diff --git a/build.gradle b/build.gradle index c969e8339462561a69d68dde7ff3ca2914982257..8b5239dea1a2e1350d6f887464281dac77154e76 100644 --- a/build.gradle +++ b/build.gradle @@ -53,7 +53,7 @@ configure(allprojects) { project -> ext.nettyVersion = "4.0.26.Final" ext.openjpaVersion = "2.2.2" // 2.3.0 not Java 8 compatible (based on ASM 4) ext.protobufVersion = "2.6.1" - ext.reactorVersion = "1.1.6.RELEASE" + ext.reactorVersion = "2.0.1.BUILD-SNAPSHOT" ext.slf4jVersion = "1.7.11" ext.snakeyamlVersion = "1.15" ext.snifferVersion = "1.14" @@ -117,6 +117,7 @@ configure(allprojects) { project -> } repositories { + mavenLocal() maven { url "https://repo.spring.io/libs-release" } maven { url "https://repo.spring.io/milestone" } } @@ -486,16 +487,17 @@ project("spring-messaging") { compile(project(":spring-beans")) compile(project(":spring-core")) compile(project(":spring-context")) - optional("org.projectreactor:reactor-core:${reactorVersion}") - optional("org.projectreactor:reactor-net:${reactorVersion}") { + optional("io.projectreactor:reactor-net:${reactorVersion}") { exclude group: "io.netty", module: "netty-all" } + optional "io.netty:netty-all:$nettyVersion" optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") { exclude group: "javax.servlet", module: "javax.servlet-api" } optional("org.eclipse.jetty.websocket:websocket-client:${jettyVersion}") optional("com.fasterxml.jackson.core:jackson-databind:${jackson2Version}") testCompile(project(":spring-test")) + testCompile('org.slf4j:slf4j-log4j12:1.7.10') testCompile("javax.inject:javax.inject-tck:1") testCompile("javax.servlet:javax.servlet-api:3.1.0") testCompile("javax.validation:validation-api:1.0.0.GA") @@ -756,8 +758,7 @@ project("spring-websocket") { testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-websocket:${tomcatVersion}") testCompile("org.apache.tomcat.embed:tomcat-embed-logging-juli:${tomcatVersion}") - testCompile("org.projectreactor:reactor-core:${reactorVersion}") - testCompile("org.projectreactor:reactor-net:${reactorVersion}") + testCompile("io.projectreactor:reactor-net:${reactorVersion}") testCompile("log4j:log4j:1.2.17") testCompile("org.slf4j:slf4j-jcl:${slf4jVersion}") } diff --git a/spring-core/src/main/java/org/springframework/core/convert/support/StreamConverter.java b/spring-core/src/main/java/org/springframework/core/convert/support/StreamConverter.java index 425c11576c8c8da06e523b43f557fb732d8e284f..06a8818995998a10a552d2da5dcf034874acfe15 100644 --- a/spring-core/src/main/java/org/springframework/core/convert/support/StreamConverter.java +++ b/spring-core/src/main/java/org/springframework/core/convert/support/StreamConverter.java @@ -99,7 +99,7 @@ public class StreamConverter implements ConditionalGenericConverter { } private Object convertFromStream(Stream source, TypeDescriptor streamType, TypeDescriptor targetType) { - List content = source.collect(Collectors.toList()); + List content = source.collect(Collectors.toList()); TypeDescriptor listType = TypeDescriptor.collection(List.class, streamType.getElementTypeDescriptor()); return this.conversionService.convert(content, listType, targetType); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java similarity index 86% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java index e6bc334bb4743d3455e344432d974f6ab529bc62..4ee2074da0348c11fe6ab226a6165f6a9031d481 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11StompCodec.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2StompCodec.java @@ -18,13 +18,13 @@ package org.springframework.messaging.simp.stomp; import java.nio.ByteBuffer; -import reactor.function.Consumer; -import reactor.function.Function; -import reactor.io.Buffer; -import reactor.io.encoding.Codec; import org.springframework.messaging.Message; import org.springframework.util.Assert; +import reactor.fn.Consumer; +import reactor.fn.Function; +import reactor.io.buffer.Buffer; +import reactor.io.codec.Codec; /** * A Reactor TCP {@link Codec} for sending and receiving STOMP messages. @@ -33,7 +33,7 @@ import org.springframework.util.Assert; * @author Rossen Stoyanchev * @since 4.0 */ -public class Reactor11StompCodec implements Codec, Message> { +public class Reactor2StompCodec extends Codec, Message> { private final StompDecoder stompDecoder; @@ -42,11 +42,11 @@ public class Reactor11StompCodec implements Codec, Messa private final Function, Buffer> encodingFunction; - public Reactor11StompCodec() { + public Reactor2StompCodec() { this(new StompEncoder(), new StompDecoder()); } - public Reactor11StompCodec(StompEncoder encoder, StompDecoder decoder) { + public Reactor2StompCodec(StompEncoder encoder, StompDecoder decoder) { Assert.notNull(encoder, "'encoder' is required"); Assert.notNull(decoder, "'decoder' is required"); this.stompEncoder = encoder; @@ -64,6 +64,10 @@ public class Reactor11StompCodec implements Codec, Messa return this.encodingFunction; } + @Override + public Buffer apply(Message message) { + return encodingFunction.apply(message); + } private static class EncodingFunction implements Function, Buffer> { diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11TcpStompClient.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java similarity index 62% rename from spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11TcpStompClient.java rename to spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java index 28962e0176e41f538091817a19d529a700295246..59abaa32a427fa4330e35ddfa81e1dde56850ae4 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor11TcpStompClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClient.java @@ -15,32 +15,30 @@ */ package org.springframework.messaging.simp.stomp; -import java.util.Arrays; -import java.util.Properties; - -import reactor.core.Environment; -import reactor.core.configuration.ConfigurationReader; -import reactor.core.configuration.DispatcherConfiguration; -import reactor.core.configuration.DispatcherType; -import reactor.core.configuration.ReactorConfiguration; -import reactor.net.netty.tcp.NettyTcpClient; -import reactor.net.tcp.TcpClient; -import reactor.net.tcp.spec.TcpClientSpec; - import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.Reactor11TcpClient; +import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; import org.springframework.util.concurrent.ListenableFuture; +import reactor.Environment; +import reactor.core.config.ConfigurationReader; +import reactor.core.config.DispatcherConfiguration; +import reactor.core.config.DispatcherType; +import reactor.core.config.ReactorConfiguration; +import reactor.fn.Function; +import reactor.io.net.Spec; + +import java.util.Arrays; +import java.util.Properties; /** * A STOMP over TCP client that uses - * {@link org.springframework.messaging.tcp.reactor.Reactor11TcpClient + * {@link Reactor2TcpClient * Reactor11TcpClient}. * * @author Rossen Stoyanchev * @since 4.2 */ -public class Reactor11TcpStompClient extends StompClientSupport { +public class Reactor2TcpStompClient extends StompClientSupport { private final TcpOperations tcpClient; @@ -48,32 +46,49 @@ public class Reactor11TcpStompClient extends StompClientSupport { /** * Create an instance with host "127.0.0.1" and port 61613. */ - public Reactor11TcpStompClient() { + public Reactor2TcpStompClient() { this("127.0.0.1", 61613); } /** * Create an instance with the given host and port. + * * @param host the host * @param port the port */ - public Reactor11TcpStompClient(String host, int port) { - this.tcpClient = new Reactor11TcpClient(createNettyTcpClient(host, port)); + public Reactor2TcpStompClient(final String host, final int port) { + this.tcpClient = new Reactor2TcpClient(createNettyTcpClientFactory(host, port)); } - private TcpClient, Message> createNettyTcpClient(String host, int port) { - return new TcpClientSpec, Message>(NettyTcpClient.class) - .env(new Environment(new StompClientDispatcherConfigReader())) - .codec(new Reactor11StompCodec(new StompEncoder(), new StompDecoder())) - .connect(host, port) - .get(); + private Function, Message>, + Spec.TcpClientSpec, Message>> createNettyTcpClientFactory( + final String host, final int port + ) { + + final Environment environment = new Environment(new StompClientDispatcherConfigReader()).assignErrorJournal(); + + return new Function, Message>, + Spec.TcpClientSpec, Message>>() { + + @Override + public Spec.TcpClientSpec, Message> apply(Spec.TcpClientSpec, + Message> spec) { + + return spec + .codec(new Reactor2StompCodec(new StompEncoder(), new StompDecoder())) + .env(environment) + .dispatcher(environment.getCachedDispatchers("StompClient").get()) + .connect(host, port); + } + }; } /** * Create an instance with a pre-configured TCP client. + * * @param tcpClient the client to use */ - public Reactor11TcpStompClient(TcpOperations tcpClient) { + public Reactor2TcpStompClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; } @@ -81,6 +96,7 @@ public class Reactor11TcpStompClient extends StompClientSupport { /** * Connect and notify the given {@link StompSessionHandler} when connected * on the STOMP level, + * * @param handler the handler for the STOMP session * @return ListenableFuture for access to the session when ready for use */ @@ -91,8 +107,9 @@ public class Reactor11TcpStompClient extends StompClientSupport { /** * An overloaded version of {@link #connect(StompSessionHandler)} that * accepts headers to use for the STOMP CONNECT frame. + * * @param connectHeaders headers to add to the CONNECT frame - * @param handler the handler for the STOMP session + * @param handler the handler for the STOMP session * @return ListenableFuture for access to the session when ready for use */ public ListenableFuture connect(StompHeaders connectHeaders, StompSessionHandler handler) { @@ -117,9 +134,10 @@ public class Reactor11TcpStompClient extends StompClientSupport { @Override public ReactorConfiguration read() { String dispatcherName = "StompClient"; - DispatcherType dispatcherType = DispatcherType.THREAD_POOL_EXECUTOR; + DispatcherType dispatcherType = DispatcherType.DISPATCHER_GROUP; DispatcherConfiguration config = new DispatcherConfiguration(dispatcherName, dispatcherType, 128, 0); - return new ReactorConfiguration(Arrays.asList(config), dispatcherName, new Properties()); + return new ReactorConfiguration(Arrays.asList(config), dispatcherName, new Properties + ()); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index dd30ba706b4fdda70ce4bf12c3106de8d87166a0..076626737505596f948492366ff422a9e4044277 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -40,7 +40,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.messaging.tcp.reactor.Reactor11TcpClient; +import org.springframework.messaging.tcp.reactor.Reactor2TcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -327,7 +327,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler /** * Configure a TCP client for managing TCP connections to the STOMP broker. - * By default {@link org.springframework.messaging.tcp.reactor.Reactor11TcpClient} is used. + * By default {@link Reactor2TcpClient} is used. */ public void setTcpClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; @@ -379,7 +379,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (this.tcpClient == null) { StompDecoder decoder = new StompDecoder(); decoder.setHeaderInitializer(getHeaderInitializer()); - Reactor11StompCodec codec = new Reactor11StompCodec(new StompEncoder(), decoder); + Reactor2StompCodec codec = new Reactor2StompCodec(new StompEncoder(), decoder); this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort, codec); } @@ -956,8 +956,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private static class StompTcpClientFactory { - public TcpOperations create(String relayHost, int relayPort, Reactor11StompCodec codec) { - return new Reactor11TcpClient(relayHost, relayPort, codec); + public TcpOperations create(String relayHost, int relayPort, Reactor2StompCodec codec) { + return new Reactor2TcpClient(relayHost, relayPort, codec); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java index 9874dfd1e6e41677e097c875764ad5188959cd70..e70b0974cd6481cfcbedf540d2961972626be88a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/AbstractPromiseToListenableFutureAdapter.java @@ -20,15 +20,14 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import reactor.core.composable.Promise; -import reactor.function.Consumer; - import org.springframework.util.Assert; import org.springframework.util.concurrent.FailureCallback; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; import org.springframework.util.concurrent.SuccessCallback; +import reactor.fn.Consumer; +import reactor.rx.Promise; /** * Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting @@ -55,8 +54,7 @@ abstract class AbstractPromiseToListenableFutureAdapter implements Listena public void accept(S result) { try { registry.success(adapt(result)); - } - catch (Throwable t) { + } catch (Throwable t) { registry.failure(t); } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java index cc4267f9d9ea0d4602eae4adb4bd84ed6e03b79b..d880cdd71d1a5ae4aadf90eb7ff60994220d3c18 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/PassThroughPromiseToListenableFutureAdapter.java @@ -16,7 +16,8 @@ package org.springframework.messaging.tcp.reactor; -import reactor.core.composable.Promise; + +import reactor.rx.Promise; /** * A Promise-to-ListenableFutureAdapter where the source and the target from the Promise and diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java deleted file mode 100644 index a38eb324ff5cc1c6833c0f4240917db58fc03d87..0000000000000000000000000000000000000000 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpClient.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Copyright 2002-2014 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.messaging.tcp.reactor; - -import java.net.InetSocketAddress; -import java.util.Arrays; -import java.util.Properties; - -import reactor.core.Environment; -import reactor.core.composable.Composable; -import reactor.core.composable.Promise; -import reactor.core.composable.Stream; -import reactor.core.composable.spec.Promises; -import reactor.core.configuration.ConfigurationReader; -import reactor.core.configuration.DispatcherConfiguration; -import reactor.core.configuration.ReactorConfiguration; -import reactor.function.Consumer; -import reactor.function.Function; -import reactor.io.Buffer; -import reactor.io.encoding.Codec; -import reactor.net.NetChannel; -import reactor.net.Reconnect; -import reactor.net.netty.tcp.NettyTcpClient; -import reactor.net.tcp.TcpClient; -import reactor.net.tcp.spec.TcpClientSpec; -import reactor.tuple.Tuple; -import reactor.tuple.Tuple2; - -import org.springframework.messaging.Message; -import org.springframework.messaging.tcp.ReconnectStrategy; -import org.springframework.messaging.tcp.TcpConnectionHandler; -import org.springframework.messaging.tcp.TcpOperations; -import org.springframework.util.Assert; -import org.springframework.util.concurrent.ListenableFuture; - - -/** - * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} - * based on the TCP client support of the Reactor project. - * - * @author Rossen Stoyanchev - * @since 4.0 - */ -public class Reactor11TcpClient

implements TcpOperations

{ - - @SuppressWarnings("rawtypes") - public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - - - private final TcpClient, Message

> tcpClient; - - private final Environment environment; - - - /** - * A constructor that creates a {@link reactor.net.netty.tcp.NettyTcpClient} with - * a {@link reactor.event.dispatch.SynchronousDispatcher} as a result of which - * network I/O is handled in Netty threads. - * - *

Also see the constructor accepting a pre-configured Reactor - * {@link reactor.net.tcp.TcpClient}. - * - * @param host the host to connect to - * @param port the port to connect to - * @param codec the codec to use for encoding and decoding the TCP stream - */ - public Reactor11TcpClient(String host, int port, Codec, Message

> codec) { - - // Revisit in 1.1: is Environment still required w/ sync dispatcher? - this.environment = new Environment(new SynchronousDispatcherConfigReader()); - - this.tcpClient = new TcpClientSpec, Message

>(REACTOR_TCP_CLIENT_TYPE) - .env(this.environment) - .codec(codec) - .connect(host, port) - .get(); - } - - /** - * A constructor with a pre-configured {@link reactor.net.tcp.TcpClient}. - * - *

NOTE: if the client is configured with a thread-creating - * dispatcher, you are responsible for shutting down the {@link reactor.core.Environment} - * instance with which the client is configured. - * - * @param tcpClient the TcpClient to use - */ - public Reactor11TcpClient(TcpClient, Message

> tcpClient) { - Assert.notNull(tcpClient, "'tcpClient' must not be null"); - this.tcpClient = tcpClient; - this.environment = null; - } - - - @Override - public ListenableFuture connect(TcpConnectionHandler

connectionHandler) { - - Promise, Message

>> promise = this.tcpClient.open(); - composeConnectionHandling(promise, connectionHandler); - - return new AbstractPromiseToListenableFutureAdapter, Message

>, Void>(promise) { - @Override - protected Void adapt(NetChannel, Message

> result) { - return null; - } - }; - } - - @Override - public ListenableFuture connect(final TcpConnectionHandler

connectionHandler, - final ReconnectStrategy reconnectStrategy) { - - Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null"); - - Reconnect reconnect = new Reconnect() { - @Override - public Tuple2 reconnect(InetSocketAddress address, int attempt) { - return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt)); - } - }; - - Stream, Message

>> stream = this.tcpClient.open(reconnect); - composeConnectionHandling(stream, connectionHandler); - - Promise promise = Promises.next(stream).map( - new Function, Message

>, Void>() { - @Override - public Void apply(NetChannel, Message

> ch) { - return null; - } - }); - return new PassThroughPromiseToListenableFutureAdapter(promise); - } - - private void composeConnectionHandling(Composable, Message

>> composable, - final TcpConnectionHandler

connectionHandler) { - - composable - .when(Throwable.class, new Consumer() { - @Override - public void accept(Throwable ex) { - connectionHandler.afterConnectFailure(ex); - } - }) - .consume(new Consumer, Message

>>() { - @Override - public void accept(NetChannel, Message

> connection) { - connection - .when(Throwable.class, new Consumer() { - @Override - public void accept(Throwable t) { - connectionHandler.handleFailure(t); - } - }) - .consume(new Consumer>() { - @Override - public void accept(Message

message) { - connectionHandler.handleMessage(message); - } - }) - .on() - .close(new Runnable() { - @Override - public void run() { - connectionHandler.afterConnectionClosed(); - } - }); - connectionHandler.afterConnected(new Reactor11TcpConnection

(connection)); - } - }); - } - - @Override - public ListenableFuture shutdown() { - try { - Promise promise = this.tcpClient.close(); - return new AbstractPromiseToListenableFutureAdapter(promise) { - @Override - protected Boolean adapt(Boolean result) { - return result; - } - }; - } - finally { - if (this.environment != null) { - this.environment.shutdown(); - } - - } - } - - /** - * A ConfigurationReader that enforces the use of a SynchronousDispatcher. - * - *

The {@link reactor.core.configuration.PropertiesConfigurationReader} used by - * default automatically creates other dispatchers with thread pools that are - * not needed. - */ - private static class SynchronousDispatcherConfigReader implements ConfigurationReader { - - @Override - public ReactorConfiguration read() { - return new ReactorConfiguration(Arrays.asList(), "sync", new Properties()); - } - } - -} \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java new file mode 100644 index 0000000000000000000000000000000000000000..d71d5f1e1d33ed03055555e67a89de3cd17625a8 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java @@ -0,0 +1,269 @@ +/* + * Copyright 2002-2014 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.tcp.reactor; + +import io.netty.channel.nio.NioEventLoopGroup; +import org.springframework.messaging.Message; +import org.springframework.messaging.tcp.ReconnectStrategy; +import org.springframework.messaging.tcp.TcpConnectionHandler; +import org.springframework.messaging.tcp.TcpOperations; +import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import reactor.core.support.NamedDaemonThreadFactory; +import reactor.fn.BiFunction; +import reactor.fn.Consumer; +import reactor.fn.Function; +import reactor.fn.tuple.Tuple; +import reactor.fn.tuple.Tuple2; +import reactor.io.buffer.Buffer; +import reactor.io.codec.Codec; +import reactor.io.net.ChannelStream; +import reactor.io.net.NetStreams; +import reactor.io.net.Reconnect; +import reactor.io.net.Spec; +import reactor.io.net.impl.netty.NettyClientSocketOptions; +import reactor.io.net.impl.netty.tcp.NettyTcpClient; +import reactor.io.net.tcp.TcpClient; +import reactor.rx.Promise; +import reactor.rx.Stream; +import reactor.rx.Streams; +import reactor.rx.action.Signal; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; + + +/** + * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} + * based on the TCP client support of the Reactor project. + *

+ * This client will wrap N number of clients for N {@link #connect} calls (one client by connection). + * + * @author Rossen Stoyanchev + * @author Stephane Maldini + * @since 4.2 + */ +public class Reactor2TcpClient

implements TcpOperations

{ + + @SuppressWarnings("rawtypes") + public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; + + private final Function, Message

>, + Spec.TcpClientSpec, Message

>> tcpClientSpec; + + private final List, Message

>> activeClients = + new ArrayList, Message

>>(); + + /** + * A constructor that creates a {@link reactor.io.net.Spec.TcpClientSpec} factory with + * a default {@link reactor.core.dispatch.SynchronousDispatcher} as a result of which + * network I/O is handled in Netty threads. Number of Netty threads can be tweaked with + * the {@code reactor.tcp.ioThreadCount} System property. + *

+ * The network I/O threads will be shared amongst the active clients. + *

+ *

+ *

Also see the constructor accepting a ready Reactor + * {@link reactor.io.net.Spec.TcpClientSpec} {@link Function} factory. + * + * @param host the host to connect to + * @param port the port to connect to + * @param codec the codec to use for encoding and decoding the TCP stream + */ + public Reactor2TcpClient(final String host, final int port, final Codec, Message

> codec) { + + //FIXME Should it be exposed in Spring ? + int ioThreadCount; + try { + ioThreadCount = Integer.parseInt(System.getProperty("reactor.tcp.ioThreadCount")); + } catch (Exception i) { + ioThreadCount = -1; + } + if (ioThreadCount <= 0l) { + ioThreadCount = Runtime.getRuntime().availableProcessors(); + } + + final NioEventLoopGroup eventLoopGroup = + new NioEventLoopGroup(ioThreadCount, new NamedDaemonThreadFactory("reactor-tcp-io")); + + this.tcpClientSpec = new Function, Message

>, + Spec.TcpClientSpec, Message

>>() { + + @Override + public Spec.TcpClientSpec, Message

> apply(Spec.TcpClientSpec, Message

> + messageMessageTcpClientSpec) { + return messageMessageTcpClientSpec + .codec(codec) + //make connect dynamic or use reconnect strategy to LB onto cluster + .connect(host, port) + .options(new NettyClientSocketOptions().eventLoopGroup(eventLoopGroup)); + } + }; + } + + /** + * A constructor with a pre-configured {@link reactor.io.net.Spec.TcpClientSpec} {@link Function} factory. + * This might be used to add SSL or specific network parameters to the generated client configuration. + *

+ *

NOTE: if the client is configured with a thread-creating + * dispatcher, you are responsible for cleaning them, e.g. using {@link reactor.core.Dispatcher#shutdown}. + * + * @param tcpClientSpecFactory the TcpClientSpec {@link Function} to use for each client creation. + */ + public Reactor2TcpClient(Function, Message

>, + Spec.TcpClientSpec, Message

>> tcpClientSpecFactory) { + Assert.notNull(tcpClientSpecFactory, "'tcpClientClientFactory' must not be null"); + this.tcpClientSpec = tcpClientSpecFactory; + } + + + @Override + public ListenableFuture connect(TcpConnectionHandler

connectionHandler) { + + //create a new client + TcpClient, Message

> tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, tcpClientSpec); + + //attach connection handler + composeConnectionHandling(tcpClient, connectionHandler); + + //open connection + Promise promise = tcpClient.open(); + + //adapt to ListenableFuture + return new AbstractPromiseToListenableFutureAdapter(promise) { + @Override + protected Void adapt(Boolean result) { + return null; + } + }; + } + + @Override + public ListenableFuture connect(final TcpConnectionHandler

connectionHandler, + final ReconnectStrategy reconnectStrategy) { + + Assert.notNull(reconnectStrategy, "ReconnectStrategy must not be null"); + + Reconnect reconnect = new Reconnect() { + @Override + public Tuple2 reconnect(InetSocketAddress address, int attempt) { + return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt)); + } + }; + + //create a new client + TcpClient, Message

> tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, tcpClientSpec); + + //attach connection handler + composeConnectionHandling(tcpClient, connectionHandler); + + //open connection + Stream stream = tcpClient.open(reconnect); + + //adapt to ListenableFuture with the next available connection + Promise promise = stream.next().map( + new Function() { + @Override + public Void apply(Boolean ch) { + return null; + } + }); + + return new PassThroughPromiseToListenableFutureAdapter(promise); + } + + private void composeConnectionHandling(final TcpClient, Message

> tcpClient, + final TcpConnectionHandler

connectionHandler) { + + synchronized (activeClients){ + activeClients.add(tcpClient); + } + + tcpClient + .finallyDo(new Consumer,Message

>>>() { + @Override + public void accept(Signal,Message

>> signal) { + synchronized (activeClients) { + activeClients.remove(tcpClient); + } + if(signal.isOnError()) { + connectionHandler.afterConnectFailure(signal.getThrowable()); + } + } + }) + .log("reactor.client") + .consume(new Consumer, Message

>>() { + @Override + public void accept(ChannelStream, Message

> connection) { + connection + .log("reactor.connection") + .finallyDo(new Consumer>>() { + @Override + public void accept(Signal> signal) { + if (signal.isOnError()) { + connectionHandler.handleFailure(signal.getThrowable()); + } else if (signal.isOnComplete()) { + connectionHandler.afterConnectionClosed(); + } + } + }) + .consume(new Consumer>() { + @Override + public void accept(Message

message) { + connectionHandler.handleMessage(message); + } + }); + + connectionHandler.afterConnected(new Reactor2TcpConnection

(connection)); + } + }); + } + + @Override + public ListenableFuture shutdown() { + final List, Message

>> clients; + + synchronized (activeClients){ + clients = new ArrayList, Message

>>(activeClients); + //should be cleared individually in tcpClient#finallyDo() + //activeClients.clear(); + } + + Promise promise = Streams.from(clients) + .flatMap(new Function, Message

>, Promise>() { + @Override + public Promise apply(TcpClient, Message

> client) { + return client.close(); + } + }) + .reduce(new BiFunction() { + @Override + public Boolean apply(Boolean prev, Boolean next) { + return prev && next; + } + }) + .next(); + + return new AbstractPromiseToListenableFutureAdapter(promise) { + @Override + protected Boolean adapt(Boolean result) { + return result; + } + }; + } +} \ No newline at end of file diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java similarity index 58% rename from spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java rename to spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java index e9aad6cef1b2f577a99760ea9cd3f47e59460187..2f9df2b432913784205dee19fc0662080a3386f6 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor11TcpConnection.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpConnection.java @@ -16,12 +16,20 @@ package org.springframework.messaging.tcp.reactor; -import reactor.core.composable.Promise; -import reactor.net.NetChannel; - +import org.reactivestreams.Publisher; import org.springframework.messaging.Message; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.util.concurrent.ListenableFuture; +import reactor.fn.Consumer; +import reactor.fn.Function; +import reactor.fn.Functions; +import reactor.io.net.ChannelStream; +import reactor.rx.Promise; +import reactor.rx.Promises; +import reactor.rx.Stream; +import reactor.rx.broadcast.Broadcaster; + +import java.lang.reflect.Constructor; /** * An implementation of {@link org.springframework.messaging.tcp.TcpConnection} @@ -32,35 +40,40 @@ import org.springframework.util.concurrent.ListenableFuture; * @param

the payload type of Spring Message's read from and written to * the TCP stream */ -public class Reactor11TcpConnection

implements TcpConnection

{ +public class Reactor2TcpConnection

implements TcpConnection

{ - private final NetChannel, Message

> channel; + private final ChannelStream, Message

> channel; + private final Broadcaster> sink; - public Reactor11TcpConnection(NetChannel, Message

> connection) { + public Reactor2TcpConnection(ChannelStream, Message

> connection) { this.channel = connection; + this.sink = Broadcaster.create(); + + channel.sink(sink); } @Override public ListenableFuture send(Message

message) { - Promise promise = this.channel.send(message); - return new PassThroughPromiseToListenableFutureAdapter(promise); + sink.onNext(message); + //FIXME need to align Reactor with Reactive IPC to have publish/confirm receipt + return new PassThroughPromiseToListenableFutureAdapter(Promises.success(null)); } @Override public void onReadInactivity(Runnable runnable, long inactivityDuration) { - this.channel.on().readIdle(inactivityDuration, runnable); + this.channel.on().readIdle(inactivityDuration, Functions.consumer(runnable)); } @Override public void onWriteInactivity(Runnable runnable, long inactivityDuration) { - this.channel.on().writeIdle(inactivityDuration, runnable); + this.channel.on().writeIdle(inactivityDuration, Functions.consumer(runnable)); } @Override public void close() { - this.channel.close(); + sink.onComplete(); } } diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor11TcpStompClientTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java similarity index 95% rename from spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor11TcpStompClientTests.java rename to spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java index 97f453287590b2d1976ae690e0804ccf344da68b..9d8a82e72d2879d6854913780e52e4c117b6959f 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor11TcpStompClientTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/Reactor2TcpStompClientTests.java @@ -42,20 +42,20 @@ import org.springframework.util.SocketUtils; import org.springframework.util.concurrent.ListenableFuture; /** - * Integration tests for {@link Reactor11TcpStompClient}. + * Integration tests for {@link Reactor2TcpStompClient}. * * @author Rossen Stoyanchev */ -public class Reactor11TcpStompClientTests { +public class Reactor2TcpStompClientTests { - private static final Log logger = LogFactory.getLog(Reactor11TcpStompClientTests.class); + private static final Log logger = LogFactory.getLog(Reactor2TcpStompClientTests.class); @Rule public final TestName testName = new TestName(); private BrokerService activeMQBroker; - private Reactor11TcpStompClient client; + private Reactor2TcpStompClient client; @Before @@ -77,7 +77,7 @@ public class Reactor11TcpStompClientTests { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.afterPropertiesSet(); - this.client = new Reactor11TcpStompClient("127.0.0.1", port); + this.client = new Reactor2TcpStompClient("127.0.0.1", port); this.client.setMessageConverter(new StringMessageConverter()); this.client.setTaskScheduler(taskScheduler); } @@ -86,8 +86,7 @@ public class Reactor11TcpStompClientTests { public void tearDown() throws Exception { try { this.client.shutdown(); - } - catch (Throwable ex) { + } catch (Throwable ex) { logger.error("Failed to shut client", ex); } final CountDownLatch latch = new CountDownLatch(1); diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java index 1275afa801eda9a5e2422687ce55d52e23a9910a..dce4e85f646e7e1614fe285c1d62bb7660b25b47 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompCodecTests.java @@ -21,19 +21,19 @@ import java.util.ArrayList; import java.util.List; import org.junit.Test; -import reactor.function.Consumer; -import reactor.function.Function; -import reactor.io.Buffer; import org.springframework.messaging.Message; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.InvalidMimeTypeException; +import reactor.fn.Consumer; +import reactor.fn.Function; +import reactor.io.buffer.Buffer; import static org.junit.Assert.*; /** - * Test fixture for {@link Reactor11StompCodec}. + * Test fixture for {@link Reactor2StompCodec}. * * @author Andy Wilkinson */ @@ -41,7 +41,7 @@ public class StompCodecTests { private final ArgumentCapturingConsumer> consumer = new ArgumentCapturingConsumer>(); - private final Function> decoder = new Reactor11StompCodec().decoder(consumer); + private final Function> decoder = new Reactor2StompCodec().decoder(consumer); @Test public void decodeFrameWithCrLfEols() { @@ -176,7 +176,7 @@ public class StompCodecTests { Buffer buffer = Buffer.wrap(frame1 + frame2); final List> messages = new ArrayList>(); - new Reactor11StompCodec().decoder(new Consumer>() { + new Reactor2StompCodec().decoder(new Consumer>() { @Override public void accept(Message message) { messages.add(message); @@ -234,7 +234,7 @@ public class StompCodecTests { Buffer buffer = Buffer.wrap(frame); final List> messages = new ArrayList>(); - new Reactor11StompCodec().decoder(new Consumer>() { + new Reactor2StompCodec().decoder(new Consumer>() { @Override public void accept(Message message) { messages.add(message); @@ -251,7 +251,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - assertEquals("DISCONNECT\n\n\0", new Reactor11StompCodec().encoder().apply(frame).asString()); + assertEquals("DISCONNECT\n\n\0", new Reactor2StompCodec().encoder().apply(frame).asString()); } @Test @@ -262,7 +262,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); - String frameString = new Reactor11StompCodec().encoder().apply(frame).asString(); + String frameString = new Reactor2StompCodec().encoder().apply(frame).asString(); assertTrue(frameString.equals("CONNECT\naccept-version:1.2\nhost:github.org\n\n\0") || frameString.equals("CONNECT\nhost:github.org\naccept-version:1.2\n\n\0")); @@ -276,7 +276,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders()); assertEquals("DISCONNECT\na\\c\\r\\n\\\\b:alpha\\cbravo\\r\\n\\\\\n\n\0", - new Reactor11StompCodec().encoder().apply(frame).asString()); + new Reactor2StompCodec().encoder().apply(frame).asString()); } @Test @@ -287,7 +287,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); assertEquals("SEND\na:alpha\ncontent-length:12\n\nMessage body\0", - new Reactor11StompCodec().encoder().apply(frame).asString()); + new Reactor2StompCodec().encoder().apply(frame).asString()); } @Test @@ -298,7 +298,7 @@ public class StompCodecTests { Message frame = MessageBuilder.createMessage("Message body".getBytes(), headers.getMessageHeaders()); assertEquals("SEND\ncontent-length:12\n\nMessage body\0", - new Reactor11StompCodec().encoder().apply(frame).asString()); + new Reactor2StompCodec().encoder().apply(frame).asString()); } private void assertIncompleteDecode(String partialFrame) { diff --git a/spring-messaging/src/test/resources/log4j.properties b/spring-messaging/src/test/resources/log4j.properties index 06597aa7b588dd0a3101b477b91e6d6511168d53..d8c61d7307ee0d1b9bff65a4893ceac2d72a9555 100644 --- a/spring-messaging/src/test/resources/log4j.properties +++ b/spring-messaging/src/test/resources/log4j.properties @@ -4,10 +4,10 @@ log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} [%c] - %m%n log4j.rootCategory=WARN, console log4j.logger.org.springframework.messaging=DEBUG -log4j.logger.org.apache.activemq=TRACE +log4j.logger.org.apache.activemq=INFO # Enable TRACE level to chase integration test issues on CI servers log4j.logger.org.springframework.messaging.simp.stomp=TRACE -log4j.logger.reactor.net=TRACE -log4j.logger.io.netty=TRACE +log4j.logger.reactor=DEBUG +log4j.logger.io.netty=INFO