From 2e13bf8b817d57b976178a5d8682fbb6597ec351 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Sun, 16 Mar 2014 18:02:27 -0400 Subject: [PATCH] Refine Reactor-based TCP client implementation Configure explicitly use of SynchronousDispatcher instead of the one used otherwise by default (RingBufferDispatcher). As a result TCP optations are now scoped to Netty's threads. Remove Environment field. It is no longer required to shut it down since we're now using SynchronousDispatcher by default. Replace connection.in() with connection.consume() when composing connection handling. The former creates a Stream for further composing, e.g. via map(), filter() but all we need is to read a message. Provide additional constructor that aceepts a pre-configured Reactor TcpClient instance. Issue: SPR-11531 --- .../stomp/StompBrokerRelayMessageHandler.java | 30 ++++---- .../tcp/reactor/ReactorNettyTcpClient.java | 68 +++++++++++++------ .../broker/BrokerMessageHandlerTests.java | 9 +-- 3 files changed, 63 insertions(+), 44 deletions(-) diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index e5475cde2e..ca2fe69d3a 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -32,6 +32,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; +import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -67,10 +68,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private static final byte[] EMPTY_PAYLOAD = new byte[0]; - private static final Message HEARTBEAT_MESSAGE; - + // STOMP recommends error of margin for receiving heartbeats private static final long HEARTBEAT_MULTIPLIER = 3; + private static final Message HEARTBEAT_MESSAGE; + static { SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT); HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).setHeaders(headers).build(); @@ -299,8 +301,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Configure a TCP client for managing TCP connections to the STOMP broker. By default - * {@link org.springframework.messaging.simp.stomp.StompReactorNettyTcpClient} is used. + * Configure a TCP client for managing TCP connections to the STOMP broker. + * By default {@link org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient} is used. */ public void setTcpClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; @@ -323,7 +325,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.brokerChannel.subscribe(this); if (this.tcpClient == null) { - this.tcpClient = new StompReactorNettyTcpClient(this.relayHost, this.relayPort); + this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort); } if (logger.isDebugEnabled()) { @@ -351,15 +353,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.clientInboundChannel.unsubscribe(this); this.brokerChannel.unsubscribe(this); - for (StompConnectionHandler handler : this.connectionHandlers.values()) { - try { - handler.clearConnection(); - } - catch (Throwable t) { - logger.error("Failed to close connection in session " + handler.getSessionId() + ": " + t.getMessage()); - } - } - try { this.tcpClient.shutdown(); } @@ -523,7 +516,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); headers.setSessionId(this.sessionId); headers.setMessage(errorText); - Message errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build(); + Message errorMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); sendMessageToClient(errorMessage); } } @@ -754,4 +747,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private static class StompTcpClientFactory { + + public TcpOperations create(String relayHost, int relayPort) { + return new ReactorNettyTcpClient(relayHost, relayPort, new StompCodec()); + } + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 3346e57388..59f320334d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,30 +45,59 @@ import reactor.tcp.spec.TcpClientSpec; import reactor.tuple.Tuple; import reactor.tuple.Tuple2; + /** - * A Reactor/Netty implementation of {@link org.springframework.messaging.tcp.TcpOperations}. + * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} + * based on the Reactor project. * * @author Rossen Stoyanchev * @since 4.0 */ public class ReactorNettyTcpClient

implements TcpOperations

{ - private final static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class); + public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - private Environment environment; + + private final static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class); private TcpClient, Message

> tcpClient; + /** + * A constructor that creates a {@link reactor.tcp.netty.NettyTcpClient} with + * a {@link reactor.event.dispatch.SynchronousDispatcher} as a result of which + * network I/O is handled in Netty threads. + * + *

Also see the constructor accepting a pre-configured Reactor + * {@link reactor.tcp.TcpClient}. + * + * @param host the host to connect to + * @param port the port to connect to + * @param codec the codec to use for encoding and decoding the TCP stream + */ public ReactorNettyTcpClient(String host, int port, Codec, Message

> codec) { - this.environment = new Environment(); - this.tcpClient = new TcpClientSpec, Message

>(NettyTcpClient.class) - .env(this.environment) + this.tcpClient = new TcpClientSpec, Message

>(REACTOR_TCP_CLIENT_TYPE) + .env(new Environment()) .codec(codec) .connect(host, port) + .synchronousDispatcher() .get(); } + /** + * A constructor with a pre-configured {@link reactor.tcp.TcpClient}. + * + *

NOTE: if the client is configured with a thread-creating + * dispatcher, you are responsible for shutting down the {@link reactor.core.Environment} + * instance with which the client is configured. + * + * @param tcpClient the TcpClient to use + */ + public ReactorNettyTcpClient(TcpClient, Message

> tcpClient) { + Assert.notNull(tcpClient, "'tcpClient' must not be null"); + this.tcpClient = tcpClient; + } + @Override public ListenableFuture connect(TcpConnectionHandler

connectionHandler) { @@ -121,7 +150,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ connectionHandler.afterConnectionClosed(); } }); - connection.in().consume(new Consumer>() { + connection.consume(new Consumer>() { @Override public void accept(Message

message) { connectionHandler.handleMessage(message); @@ -130,7 +159,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ connection.when(Throwable.class, new Consumer() { @Override public void accept(Throwable t) { - logger.error("Exception on connection " + connectionHandler, t); + logger.error("Exception on connection " + connectionHandler, t); } }); connectionHandler.afterConnected(new ReactorTcpConnection

(connection)); @@ -161,18 +190,13 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override public ListenableFuture shutdown() { - try { - Promise promise = this.tcpClient.close(); - return new AbstractPromiseToListenableFutureAdapter(promise) { - @Override - protected Void adapt(Void result) { - return result; - } - }; - } - finally { - this.environment.shutdown(); - } + Promise promise = this.tcpClient.close(); + return new AbstractPromiseToListenableFutureAdapter(promise) { + @Override + protected Void adapt(Void result) { + return result; + } + }; } -} +} \ No newline at end of file diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java index e39effeb97..2f4660dfb6 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java @@ -68,10 +68,10 @@ public class BrokerMessageHandlerTests { } @Test - public void stopShouldPublishBrokerAvailabilityEvent() { + public void startAndStopShouldNotPublishBrokerAvailabilityEvents() { this.handler.start(); this.handler.stop(); - assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); + assertEquals(Collections.emptyList(), this.handler.availabilityEvents); } @Test @@ -136,11 +136,6 @@ public class BrokerMessageHandlerTests { setApplicationEventPublisher(this); } - @Override - protected void startInternal() { - publishBrokerAvailableEvent(); - } - @Override protected void handleMessageInternal(Message message) { this.messages.add(message); -- GitLab