提交 74c02505 编写于 作者: S Stephane Maldini 提交者: Rossen Stoyanchev

Upgrade to Reactor 2

Issue: SPR-12599
上级 122d3476
......@@ -53,7 +53,7 @@ configure(allprojects) { project ->
ext.nettyVersion = "4.0.26.Final"
ext.openjpaVersion = "2.2.2" // 2.3.0 not Java 8 compatible (based on ASM 4)
ext.protobufVersion = "2.6.1"
ext.reactorVersion = "1.1.6.RELEASE"
ext.reactorVersion = "2.0.1.BUILD-SNAPSHOT"
ext.slf4jVersion = "1.7.11"
ext.snakeyamlVersion = "1.15"
ext.snifferVersion = "1.14"
......@@ -117,6 +117,7 @@ configure(allprojects) { project ->
}
repositories {
mavenLocal()
maven { url "https://repo.spring.io/libs-release" }
maven { url "https://repo.spring.io/milestone" }
}
......@@ -486,16 +487,17 @@ project("spring-messaging") {
compile(project(":spring-beans"))
compile(project(":spring-core"))
compile(project(":spring-context"))
optional("org.projectreactor:reactor-core:${reactorVersion}")
optional("org.projectreactor:reactor-net:${reactorVersion}") {
optional("io.projectreactor:reactor-net:${reactorVersion}") {
exclude group: "io.netty", module: "netty-all"
}
optional "io.netty:netty-all:$nettyVersion"
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
exclude group: "javax.servlet", module: "javax.servlet-api"
}
optional("org.eclipse.jetty.websocket:websocket-client:${jettyVersion}")
optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}")
testCompile(project(":spring-test"))
testCompile('org.slf4j:slf4j-log4j12:1.7.10')
testCompile("javax.inject:javax.inject-tck:1")
testCompile("javax.servlet:javax.servlet-api:3.1.0")
testCompile("javax.validation:validation-api:1.0.0.GA")
......@@ -756,8 +758,7 @@ project("spring-websocket") {
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}")
testCompile("org.apache.tomcat.embed:tomcat-embed-logging-juli:${tomcatVersion}")
testCompile("org.projectreactor:reactor-core:${reactorVersion}")
testCompile("org.projectreactor:reactor-net:${reactorVersion}")
testCompile("io.projectreactor:reactor-net:${reactorVersion}")
testCompile("log4j:log4j:1.2.17")
testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}")
}
......
......@@ -99,7 +99,7 @@ public class StreamConverter implements ConditionalGenericConverter {
}
private Object convertFromStream(Stream<?> source, TypeDescriptor streamType, TypeDescriptor targetType) {
List<Object> content = source.collect(Collectors.toList());
List<Object> content = source.collect(Collectors.<Object>toList());
TypeDescriptor listType = TypeDescriptor.collection(List.class, streamType.getElementTypeDescriptor());
return this.conversionService.convert(content, listType, targetType);
}
......
......@@ -18,13 +18,13 @@ package org.springframework.messaging.simp.stomp;
import java.nio.ByteBuffer;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
/**
* A Reactor TCP {@link Codec} for sending and receiving STOMP messages.
......@@ -33,7 +33,7 @@ import org.springframework.util.Assert;
* @author Rossen Stoyanchev
* @since 4.0
*/
public class Reactor11StompCodec implements Codec<Buffer, Message<byte[]>, Message<byte[]>> {
public class Reactor2StompCodec extends Codec<Buffer, Message<byte[]>, Message<byte[]>> {
private final StompDecoder stompDecoder;
......@@ -42,11 +42,11 @@ public class Reactor11StompCodec implements Codec<Buffer, Message<byte[]>, Messa
private final Function<Message<byte[]>, Buffer> encodingFunction;
public Reactor11StompCodec() {
public Reactor2StompCodec() {
this(new StompEncoder(), new StompDecoder());
}
public Reactor11StompCodec(StompEncoder encoder, StompDecoder decoder) {
public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) {
Assert.notNull(encoder, "'encoder' is required");
Assert.notNull(decoder, "'decoder' is required");
this.stompEncoder = encoder;
......@@ -64,6 +64,10 @@ public class Reactor11StompCodec implements Codec<Buffer, Message<byte[]>, Messa
return this.encodingFunction;
}
@Override
public Buffer apply(Message<byte[]> message) {
return encodingFunction.apply(message);
}
private static class EncodingFunction implements Function<Message<byte[]>, Buffer> {
......
......@@ -15,32 +15,30 @@
*/
package org.springframework.messaging.simp.stomp;
import java.util.Arrays;
import java.util.Properties;
import reactor.core.Environment;
import reactor.core.configuration.ConfigurationReader;
import reactor.core.configuration.DispatcherConfiguration;
import reactor.core.configuration.DispatcherType;
import reactor.core.configuration.ReactorConfiguration;
import reactor.net.netty.tcp.NettyTcpClient;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.spec.TcpClientSpec;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor11TcpClient;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.Environment;
import reactor.core.config.ConfigurationReader;
import reactor.core.config.DispatcherConfiguration;
import reactor.core.config.DispatcherType;
import reactor.core.config.ReactorConfiguration;
import reactor.fn.Function;
import reactor.io.net.Spec;
import java.util.Arrays;
import java.util.Properties;
/**
* A STOMP over TCP client that uses
* {@link org.springframework.messaging.tcp.reactor.Reactor11TcpClient
* {@link Reactor2TcpClient
* Reactor11TcpClient}.
*
* @author Rossen Stoyanchev
* @since 4.2
*/
public class Reactor11TcpStompClient extends StompClientSupport {
public class Reactor2TcpStompClient extends StompClientSupport {
private final TcpOperations<byte[]> tcpClient;
......@@ -48,32 +46,49 @@ public class Reactor11TcpStompClient extends StompClientSupport {
/**
* Create an instance with host "127.0.0.1" and port 61613.
*/
public Reactor11TcpStompClient() {
public Reactor2TcpStompClient() {
this("127.0.0.1", 61613);
}
/**
* Create an instance with the given host and port.
*
* @param host the host
* @param port the port
*/
public Reactor11TcpStompClient(String host, int port) {
this.tcpClient = new Reactor11TcpClient<byte[]>(createNettyTcpClient(host, port));
public Reactor2TcpStompClient(final String host, final int port) {
this.tcpClient = new Reactor2TcpClient<byte[]>(createNettyTcpClientFactory(host, port));
}
private TcpClient<Message<byte[]>, Message<byte[]>> createNettyTcpClient(String host, int port) {
return new TcpClientSpec<Message<byte[]>, Message<byte[]>>(NettyTcpClient.class)
.env(new Environment(new StompClientDispatcherConfigReader()))
.codec(new Reactor11StompCodec(new StompEncoder(), new StompDecoder()))
.connect(host, port)
.get();
private Function<Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>>,
Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>>> createNettyTcpClientFactory(
final String host, final int port
) {
final Environment environment = new Environment(new StompClientDispatcherConfigReader()).assignErrorJournal();
return new Function<Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>>,
Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>>>() {
@Override
public Spec.TcpClientSpec<Message<byte[]>, Message<byte[]>> apply(Spec.TcpClientSpec<Message<byte[]>,
Message<byte[]>> spec) {
return spec
.codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder()))
.env(environment)
.dispatcher(environment.getCachedDispatchers("StompClient").get())
.connect(host, port);
}
};
}
/**
* Create an instance with a pre-configured TCP client.
*
* @param tcpClient the client to use
*/
public Reactor11TcpStompClient(TcpOperations<byte[]> tcpClient) {
public Reactor2TcpStompClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
}
......@@ -81,6 +96,7 @@ public class Reactor11TcpStompClient extends StompClientSupport {
/**
* Connect and notify the given {@link StompSessionHandler} when connected
* on the STOMP level,
*
* @param handler the handler for the STOMP session
* @return ListenableFuture for access to the session when ready for use
*/
......@@ -91,8 +107,9 @@ public class Reactor11TcpStompClient extends StompClientSupport {
/**
* An overloaded version of {@link #connect(StompSessionHandler)} that
* accepts headers to use for the STOMP CONNECT frame.
*
* @param connectHeaders headers to add to the CONNECT frame
* @param handler the handler for the STOMP session
* @param handler the handler for the STOMP session
* @return ListenableFuture for access to the session when ready for use
*/
public ListenableFuture<StompSession> connect(StompHeaders connectHeaders, StompSessionHandler handler) {
......@@ -117,9 +134,10 @@ public class Reactor11TcpStompClient extends StompClientSupport {
@Override
public ReactorConfiguration read() {
String dispatcherName = "StompClient";
DispatcherType dispatcherType = DispatcherType.THREAD_POOL_EXECUTOR;
DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP;
DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0);
return new ReactorConfiguration(Arrays.<DispatcherConfiguration>asList(config), dispatcherName, new Properties());
return new ReactorConfiguration(Arrays.<DispatcherConfiguration>asList(config), dispatcherName, new Properties
());
}
}
......
......@@ -40,7 +40,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.messaging.tcp.reactor.Reactor11TcpClient;
import org.springframework.messaging.tcp.reactor.Reactor2TcpClient;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
......@@ -327,7 +327,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
/**
* Configure a TCP client for managing TCP connections to the STOMP broker.
* By default {@link org.springframework.messaging.tcp.reactor.Reactor11TcpClient} is used.
* By default {@link Reactor2TcpClient} is used.
*/
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
this.tcpClient = tcpClient;
......@@ -379,7 +379,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (this.tcpClient == null) {
StompDecoder decoder = new StompDecoder();
decoder.setHeaderInitializer(getHeaderInitializer());
Reactor11StompCodec codec = new Reactor11StompCodec(new StompEncoder(), decoder);
Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder);
this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec);
}
......@@ -956,8 +956,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private static class StompTcpClientFactory {
public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor11StompCodec codec) {
return new Reactor11TcpClient<byte[]>(relayHost, relayPort, codec);
public TcpOperations<byte[]> create(String relayHost, int relayPort, Reactor2StompCodec codec) {
return new Reactor2TcpClient<byte[]>(relayHost, relayPort, codec);
}
}
......
......@@ -20,15 +20,14 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import reactor.core.composable.Promise;
import reactor.function.Consumer;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.ListenableFutureCallbackRegistry;
import org.springframework.util.concurrent.SuccessCallback;
import reactor.fn.Consumer;
import reactor.rx.Promise;
/**
* Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting
......@@ -55,8 +54,7 @@ abstract class AbstractPromiseToListenableFutureAdapter<S, T> implements Listena
public void accept(S result) {
try {
registry.success(adapt(result));
}
catch (Throwable t) {
} catch (Throwable t) {
registry.failure(t);
}
}
......
......@@ -16,7 +16,8 @@
package org.springframework.messaging.tcp.reactor;
import reactor.core.composable.Promise;
import reactor.rx.Promise;
/**
* A Promise-to-ListenableFutureAdapter where the source and the target from the Promise and
......
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Properties;
import reactor.core.Environment;
import reactor.core.composable.Composable;
import reactor.core.composable.Promise;
import reactor.core.composable.Stream;
import reactor.core.composable.spec.Promises;
import reactor.core.configuration.ConfigurationReader;
import reactor.core.configuration.DispatcherConfiguration;
import reactor.core.configuration.ReactorConfiguration;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.io.Buffer;
import reactor.io.encoding.Codec;
import reactor.net.NetChannel;
import reactor.net.Reconnect;
import reactor.net.netty.tcp.NettyTcpClient;
import reactor.net.tcp.TcpClient;
import reactor.net.tcp.spec.TcpClientSpec;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the TCP client support of the Reactor project.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
public class Reactor11TcpClient<P> implements TcpOperations<P> {
@SuppressWarnings("rawtypes")
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
private final TcpClient<Message<P>, Message<P>> tcpClient;
private final Environment environment;
/**
* A constructor that creates a {@link reactor.net.netty.tcp.NettyTcpClient} with
* a {@link reactor.event.dispatch.SynchronousDispatcher} as a result of which
* network I/O is handled in Netty threads.
*
* <p>Also see the constructor accepting a pre-configured Reactor
* {@link reactor.net.tcp.TcpClient}.
*
* @param host the host to connect to
* @param port the port to connect to
* @param codec the codec to use for encoding and decoding the TCP stream
*/
public Reactor11TcpClient(String host, int port, Codec<Buffer, Message<P>, Message<P>> codec) {
// Revisit in 1.1: is Environment still required w/ sync dispatcher?
this.environment = new Environment(new SynchronousDispatcherConfigReader());
this.tcpClient = new TcpClientSpec<Message<P>, Message<P>>(REACTOR_TCP_CLIENT_TYPE)
.env(this.environment)
.codec(codec)
.connect(host, port)
.get();
}
/**
* A constructor with a pre-configured {@link reactor.net.tcp.TcpClient}.
*
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
* dispatcher, you are responsible for shutting down the {@link reactor.core.Environment}
* instance with which the client is configured.
*
* @param tcpClient the TcpClient to use
*/
public Reactor11TcpClient(TcpClient<Message<P>, Message<P>> tcpClient) {
Assert.notNull(tcpClient, "'tcpClient' must not be null");
this.tcpClient = tcpClient;
this.environment = null;
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) {
Promise<NetChannel<Message<P>, Message<P>>> promise = this.tcpClient.open();
composeConnectionHandling(promise, connectionHandler);
return new AbstractPromiseToListenableFutureAdapter<NetChannel<Message<P>, Message<P>>, Void>(promise) {
@Override
protected Void adapt(NetChannel<Message<P>, Message<P>> result) {
return null;
}
};
}
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler,
final ReconnectStrategy reconnectStrategy) {
Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null");
Reconnect reconnect = new Reconnect() {
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt));
}
};
Stream<NetChannel<Message<P>, Message<P>>> stream = this.tcpClient.open(reconnect);
composeConnectionHandling(stream, connectionHandler);
Promise<Void> promise = Promises.next(stream).map(
new Function<NetChannel<Message<P>, Message<P>>, Void>() {
@Override
public Void apply(NetChannel<Message<P>, Message<P>> ch) {
return null;
}
});
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
}
private void composeConnectionHandling(Composable<NetChannel<Message<P>, Message<P>>> composable,
final TcpConnectionHandler<P> connectionHandler) {
composable
.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void accept(Throwable ex) {
connectionHandler.afterConnectFailure(ex);
}
})
.consume(new Consumer<NetChannel<Message<P>, Message<P>>>() {
@Override
public void accept(NetChannel<Message<P>, Message<P>> connection) {
connection
.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
connectionHandler.handleFailure(t);
}
})
.consume(new Consumer<Message<P>>() {
@Override
public void accept(Message<P> message) {
connectionHandler.handleMessage(message);
}
})
.on()
.close(new Runnable() {
@Override
public void run() {
connectionHandler.afterConnectionClosed();
}
});
connectionHandler.afterConnected(new Reactor11TcpConnection<P>(connection));
}
});
}
@Override
public ListenableFuture<Boolean> shutdown() {
try {
Promise<Boolean> promise = this.tcpClient.close();
return new AbstractPromiseToListenableFutureAdapter<Boolean, Boolean>(promise) {
@Override
protected Boolean adapt(Boolean result) {
return result;
}
};
}
finally {
if (this.environment != null) {
this.environment.shutdown();
}
}
}
/**
* A ConfigurationReader that enforces the use of a SynchronousDispatcher.
*
* <p>The {@link reactor.core.configuration.PropertiesConfigurationReader} used by
* default automatically creates other dispatchers with thread pools that are
* not needed.
*/
private static class SynchronousDispatcherConfigReader implements ConfigurationReader {
@Override
public ReactorConfiguration read() {
return new ReactorConfiguration(Arrays.<DispatcherConfiguration>asList(), "sync", new Properties());
}
}
}
\ No newline at end of file
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.tcp.reactor;
import io.netty.channel.nio.NioEventLoopGroup;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.ReconnectStrategy;
import org.springframework.messaging.tcp.TcpConnectionHandler;
import org.springframework.messaging.tcp.TcpOperations;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.fn.BiFunction;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.NetStreams;
import reactor.io.net.Reconnect;
import reactor.io.net.Spec;
import reactor.io.net.impl.netty.NettyClientSocketOptions;
import reactor.io.net.impl.netty.tcp.NettyTcpClient;
import reactor.io.net.tcp.TcpClient;
import reactor.rx.Promise;
import reactor.rx.Stream;
import reactor.rx.Streams;
import reactor.rx.action.Signal;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpOperations}
* based on the TCP client support of the Reactor project.
* <p/>
* This client will wrap N number of clients for N {@link #connect} calls (one client by connection).
*
* @author Rossen Stoyanchev
* @author Stephane Maldini
* @since 4.2
*/
public class Reactor2TcpClient<P> implements TcpOperations<P> {
@SuppressWarnings("rawtypes")
public static final Class<NettyTcpClient> REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class;
private final Function<Spec.TcpClientSpec<Message<P>, Message<P>>,
Spec.TcpClientSpec<Message<P>, Message<P>>> tcpClientSpec;
private final List<TcpClient<Message<P>, Message<P>>> activeClients =
new ArrayList<TcpClient<Message<P>, Message<P>>>();
/**
* A constructor that creates a {@link reactor.io.net.Spec.TcpClientSpec} factory with
* a default {@link reactor.core.dispatch.SynchronousDispatcher} as a result of which
* network I/O is handled in Netty threads. Number of Netty threads can be tweaked with
* the {@code reactor.tcp.ioThreadCount} System property.
* <p>
* The network I/O threads will be shared amongst the active clients.
* </p>
* <p/>
* <p>Also see the constructor accepting a ready Reactor
* {@link reactor.io.net.Spec.TcpClientSpec} {@link Function} factory.
*
* @param host the host to connect to
* @param port the port to connect to
* @param codec the codec to use for encoding and decoding the TCP stream
*/
public Reactor2TcpClient(final String host, final int port, final Codec<Buffer, Message<P>, Message<P>> codec) {
//FIXME Should it be exposed in Spring ?
int ioThreadCount;
try {
ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount"));
} catch (Exception i) {
ioThreadCount = -1;
}
if (ioThreadCount <= 0l) {
ioThreadCount = Runtime.getRuntime().availableProcessors();
}
final NioEventLoopGroup eventLoopGroup =
new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io"));
this.tcpClientSpec = new Function<Spec.TcpClientSpec<Message<P>, Message<P>>,
Spec.TcpClientSpec<Message<P>, Message<P>>>() {
@Override
public Spec.TcpClientSpec<Message<P>, Message<P>> apply(Spec.TcpClientSpec<Message<P>, Message<P>>
messageMessageTcpClientSpec) {
return messageMessageTcpClientSpec
.codec(codec)
//make connect dynamic or use reconnect strategy to LB onto cluster
.connect(host, port)
.options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup));
}
};
}
/**
* A constructor with a pre-configured {@link reactor.io.net.Spec.TcpClientSpec} {@link Function} factory.
* This might be used to add SSL or specific network parameters to the generated client configuration.
* <p/>
* <p><strong>NOTE:</strong> if the client is configured with a thread-creating
* dispatcher, you are responsible for cleaning them, e.g. using {@link reactor.core.Dispatcher#shutdown}.
*
* @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation.
*/
public Reactor2TcpClient(Function<Spec.TcpClientSpec<Message<P>, Message<P>>,
Spec.TcpClientSpec<Message<P>, Message<P>>> tcpClientSpecFactory) {
Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null");
this.tcpClientSpec = tcpClientSpecFactory;
}
@Override
public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler) {
//create a new client
TcpClient<Message<P>, Message<P>> tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, tcpClientSpec);
//attach connection handler
composeConnectionHandling(tcpClient, connectionHandler);
//open connection
Promise<Boolean> promise = tcpClient.open();
//adapt to ListenableFuture
return new AbstractPromiseToListenableFutureAdapter<Boolean, Void>(promise) {
@Override
protected Void adapt(Boolean result) {
return null;
}
};
}
@Override
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler,
final ReconnectStrategy reconnectStrategy) {
Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null");
Reconnect reconnect = new Reconnect() {
@Override
public Tuple2<InetSocketAddress, Long> reconnect(InetSocketAddress address, int attempt) {
return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt));
}
};
//create a new client
TcpClient<Message<P>, Message<P>> tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, tcpClientSpec);
//attach connection handler
composeConnectionHandling(tcpClient, connectionHandler);
//open connection
Stream<Boolean> stream = tcpClient.open(reconnect);
//adapt to ListenableFuture with the next available connection
Promise<Void> promise = stream.next().map(
new Function<Boolean, Void>() {
@Override
public Void apply(Boolean ch) {
return null;
}
});
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
}
private void composeConnectionHandling(final TcpClient<Message<P>, Message<P>> tcpClient,
final TcpConnectionHandler<P> connectionHandler) {
synchronized (activeClients){
activeClients.add(tcpClient);
}
tcpClient
.finallyDo(new Consumer<Signal<ChannelStream<Message<P>,Message<P>>>>() {
@Override
public void accept(Signal<ChannelStream<Message<P>,Message<P>>> signal) {
synchronized (activeClients) {
activeClients.remove(tcpClient);
}
if(signal.isOnError()) {
connectionHandler.afterConnectFailure(signal.getThrowable());
}
}
})
.log("reactor.client")
.consume(new Consumer<ChannelStream<Message<P>, Message<P>>>() {
@Override
public void accept(ChannelStream<Message<P>, Message<P>> connection) {
connection
.log("reactor.connection")
.finallyDo(new Consumer<Signal<Message<P>>>() {
@Override
public void accept(Signal<Message<P>> signal) {
if (signal.isOnError()) {
connectionHandler.handleFailure(signal.getThrowable());
} else if (signal.isOnComplete()) {
connectionHandler.afterConnectionClosed();
}
}
})
.consume(new Consumer<Message<P>>() {
@Override
public void accept(Message<P> message) {
connectionHandler.handleMessage(message);
}
});
connectionHandler.afterConnected(new Reactor2TcpConnection<P>(connection));
}
});
}
@Override
public ListenableFuture<Boolean> shutdown() {
final List<TcpClient<Message<P>, Message<P>>> clients;
synchronized (activeClients){
clients = new ArrayList<TcpClient<Message<P>, Message<P>>>(activeClients);
//should be cleared individually in tcpClient#finallyDo()
//activeClients.clear();
}
Promise<Boolean> promise = Streams.from(clients)
.flatMap(new Function<TcpClient<Message<P>, Message<P>>, Promise<Boolean>>() {
@Override
public Promise<Boolean> apply(TcpClient<Message<P>, Message<P>> client) {
return client.close();
}
})
.reduce(new BiFunction<Boolean, Boolean, Boolean>() {
@Override
public Boolean apply(Boolean prev, Boolean next) {
return prev && next;
}
})
.next();
return new AbstractPromiseToListenableFutureAdapter<Boolean, Boolean>(promise) {
@Override
protected Boolean adapt(Boolean result) {
return result;
}
};
}
}
\ No newline at end of file
......@@ -16,12 +16,20 @@
package org.springframework.messaging.tcp.reactor;
import reactor.core.composable.Promise;
import reactor.net.NetChannel;
import org.reactivestreams.Publisher;
import org.springframework.messaging.Message;
import org.springframework.messaging.tcp.TcpConnection;
import org.springframework.util.concurrent.ListenableFuture;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.fn.Functions;
import reactor.io.net.ChannelStream;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.broadcast.Broadcaster;
import java.lang.reflect.Constructor;
/**
* An implementation of {@link org.springframework.messaging.tcp.TcpConnection}
......@@ -32,35 +40,40 @@ import org.springframework.util.concurrent.ListenableFuture;
* @param <P> the payload type of Spring Message's read from and written to
* the TCP stream
*/
public class Reactor11TcpConnection<P> implements TcpConnection<P> {
public class Reactor2TcpConnection<P> implements TcpConnection<P> {
private final NetChannel<Message<P>, Message<P>> channel;
private final ChannelStream<Message<P>, Message<P>> channel;
private final Broadcaster<Message<P>> sink;
public Reactor11TcpConnection(NetChannel<Message<P>, Message<P>> connection) {
public Reactor2TcpConnection(ChannelStream<Message<P>, Message<P>> connection) {
this.channel = connection;
this.sink = Broadcaster.create();
channel.sink(sink);
}
@Override
public ListenableFuture<Void> send(Message<P> message) {
Promise<Void> promise = this.channel.send(message);
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
sink.onNext(message);
//FIXME need to align Reactor with Reactive IPC to have publish/confirm receipt
return new PassThroughPromiseToListenableFutureAdapter<Void>(Promises.<Void>success(null));
}
@Override
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
this.channel.on().readIdle(inactivityDuration, runnable);
this.channel.on().readIdle(inactivityDuration, Functions.<Void>consumer(runnable));
}
@Override
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.channel.on().writeIdle(inactivityDuration, runnable);
this.channel.on().writeIdle(inactivityDuration, Functions.<Void>consumer(runnable));
}
@Override
public void close() {
this.channel.close();
sink.onComplete();
}
}
......@@ -42,20 +42,20 @@ import org.springframework.util.SocketUtils;
import org.springframework.util.concurrent.ListenableFuture;
/**
* Integration tests for {@link Reactor11TcpStompClient}.
* Integration tests for {@link Reactor2TcpStompClient}.
*
* @author Rossen Stoyanchev
*/
public class Reactor11TcpStompClientTests {
public class Reactor2TcpStompClientTests {
private static final Log logger = LogFactory.getLog(Reactor11TcpStompClientTests.class);
private static final Log logger = LogFactory.getLog(Reactor2TcpStompClientTests.class);
@Rule
public final TestName testName = new TestName();
private BrokerService activeMQBroker;
private Reactor11TcpStompClient client;
private Reactor2TcpStompClient client;
@Before
......@@ -77,7 +77,7 @@ public class Reactor11TcpStompClientTests {
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
this.client = new Reactor11TcpStompClient("127.0.0.1", port);
this.client = new Reactor2TcpStompClient("127.0.0.1", port);
this.client.setMessageConverter(new StringMessageConverter());
this.client.setTaskScheduler(taskScheduler);
}
......@@ -86,8 +86,7 @@ public class Reactor11TcpStompClientTests {
public void tearDown() throws Exception {
try {
this.client.shutdown();
}
catch (Throwable ex) {
} catch (Throwable ex) {
logger.error("Failed to shut client", ex);
}
final CountDownLatch latch = new CountDownLatch(1);
......
......@@ -21,19 +21,19 @@ import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.io.Buffer;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.InvalidMimeTypeException;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import static org.junit.Assert.*;
/**
* Test fixture for {@link Reactor11StompCodec}.
* Test fixture for {@link Reactor2StompCodec}.
*
* @author Andy Wilkinson
*/
......@@ -41,7 +41,7 @@ public class StompCodecTests {
private final ArgumentCapturingConsumer<Message<byte[]>> consumer = new ArgumentCapturingConsumer<Message<byte[]>>();
private final Function<Buffer, Message<byte[]>> decoder = new Reactor11StompCodec().decoder(consumer);
private final Function<Buffer, Message<byte[]>> decoder = new Reactor2StompCodec().decoder(consumer);
@Test
public void decodeFrameWithCrLfEols() {
......@@ -176,7 +176,7 @@ public class StompCodecTests {
Buffer buffer = Buffer.wrap(frame1 + frame2);
final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
new Reactor11StompCodec().decoder(new Consumer<Message<byte[]>>() {
new Reactor2StompCodec().decoder(new Consumer<Message<byte[]>>() {
@Override
public void accept(Message<byte[]> message) {
messages.add(message);
......@@ -234,7 +234,7 @@ public class StompCodecTests {
Buffer buffer = Buffer.wrap(frame);
final List<Message<byte[]>> messages = new ArrayList<Message<byte[]>>();
new Reactor11StompCodec().decoder(new Consumer<Message<byte[]>>() {
new Reactor2StompCodec().decoder(new Consumer<Message<byte[]>>() {
@Override
public void accept(Message<byte[]> message) {
messages.add(message);
......@@ -251,7 +251,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\n\n\0", new Reactor11StompCodec().encoder().apply(frame).asString());
assertEquals("DISCONNECT\n\n\0", new Reactor2StompCodec().encoder().apply(frame).asString());
}
@Test
......@@ -262,7 +262,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
String frameString = new Reactor11StompCodec().encoder().apply(frame).asString();
String frameString = new Reactor2StompCodec().encoder().apply(frame).asString();
assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") ||
frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0"));
......@@ -276,7 +276,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0",
new Reactor11StompCodec().encoder().apply(frame).asString());
new Reactor2StompCodec().encoder().apply(frame).asString());
}
@Test
......@@ -287,7 +287,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0",
new Reactor11StompCodec().encoder().apply(frame).asString());
new Reactor2StompCodec().encoder().apply(frame).asString());
}
@Test
......@@ -298,7 +298,7 @@ public class StompCodecTests {
Message<byte[]> frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders());
assertEquals("SEND\ncontent-length:12\n\nMessage body\0",
new Reactor11StompCodec().encoder().apply(frame).asString());
new Reactor2StompCodec().encoder().apply(frame).asString());
}
private void assertIncompleteDecode(String partialFrame) {
......
......@@ -4,10 +4,10 @@ log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} [%c] - %m%n
log4j.rootCategory=WARN, console
log4j.logger.org.springframework.messaging=DEBUG
log4j.logger.org.apache.activemq=TRACE
log4j.logger.org.apache.activemq=INFO
# Enable TRACE level to chase integration test issues on CI servers
log4j.logger.org.springframework.messaging.simp.stomp=TRACE
log4j.logger.reactor.net=TRACE
log4j.logger.io.netty=TRACE
log4j.logger.reactor=DEBUG
log4j.logger.io.netty=INFO
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册