diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index e5475cde2e12713829dfd1a0f7984b54dc600f96..ca2fe69d3a2c8f0f5cd088e4e65e3b9dae40e663 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -32,6 +32,7 @@ import org.springframework.messaging.tcp.FixedIntervalReconnectStrategy; import org.springframework.messaging.tcp.TcpConnection; import org.springframework.messaging.tcp.TcpConnectionHandler; import org.springframework.messaging.tcp.TcpOperations; +import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient; import org.springframework.util.Assert; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; @@ -67,10 +68,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private static final byte[] EMPTY_PAYLOAD = new byte[0]; - private static final Message HEARTBEAT_MESSAGE; - + // STOMP recommends error of margin for receiving heartbeats private static final long HEARTBEAT_MULTIPLIER = 3; + private static final Message HEARTBEAT_MESSAGE; + static { SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.HEARTBEAT); HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).setHeaders(headers).build(); @@ -299,8 +301,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Configure a TCP client for managing TCP connections to the STOMP broker. By default - * {@link org.springframework.messaging.simp.stomp.StompReactorNettyTcpClient} is used. + * Configure a TCP client for managing TCP connections to the STOMP broker. + * By default {@link org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient} is used. */ public void setTcpClient(TcpOperations tcpClient) { this.tcpClient = tcpClient; @@ -323,7 +325,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.brokerChannel.subscribe(this); if (this.tcpClient == null) { - this.tcpClient = new StompReactorNettyTcpClient(this.relayHost, this.relayPort); + this.tcpClient = new StompTcpClientFactory().create(this.relayHost, this.relayPort); } if (logger.isDebugEnabled()) { @@ -351,15 +353,6 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler this.clientInboundChannel.unsubscribe(this); this.brokerChannel.unsubscribe(this); - for (StompConnectionHandler handler : this.connectionHandlers.values()) { - try { - handler.clearConnection(); - } - catch (Throwable t) { - logger.error("Failed to close connection in session " + handler.getSessionId() + ": " + t.getMessage()); - } - } - try { this.tcpClient.shutdown(); } @@ -523,7 +516,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); headers.setSessionId(this.sessionId); headers.setMessage(errorText); - Message errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build(); + Message errorMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); sendMessageToClient(errorMessage); } } @@ -754,4 +747,11 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } } + private static class StompTcpClientFactory { + + public TcpOperations create(String relayHost, int relayPort) { + return new ReactorNettyTcpClient(relayHost, relayPort, new StompCodec()); + } + } + } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java index 3346e57388d07ac26304e1a5fcc923e1e2e1d046..59f320334d3a98951dca8bb8a270eccb6a018890 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2013 the original author or authors. + * Copyright 2002-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,30 +45,59 @@ import reactor.tcp.spec.TcpClientSpec; import reactor.tuple.Tuple; import reactor.tuple.Tuple2; + /** - * A Reactor/Netty implementation of {@link org.springframework.messaging.tcp.TcpOperations}. + * An implementation of {@link org.springframework.messaging.tcp.TcpOperations} + * based on the Reactor project. * * @author Rossen Stoyanchev * @since 4.0 */ public class ReactorNettyTcpClient

implements TcpOperations

{ - private final static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class); + public static final Class REACTOR_TCP_CLIENT_TYPE = NettyTcpClient.class; - private Environment environment; + + private final static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class); private TcpClient, Message

> tcpClient; + /** + * A constructor that creates a {@link reactor.tcp.netty.NettyTcpClient} with + * a {@link reactor.event.dispatch.SynchronousDispatcher} as a result of which + * network I/O is handled in Netty threads. + * + *

Also see the constructor accepting a pre-configured Reactor + * {@link reactor.tcp.TcpClient}. + * + * @param host the host to connect to + * @param port the port to connect to + * @param codec the codec to use for encoding and decoding the TCP stream + */ public ReactorNettyTcpClient(String host, int port, Codec, Message

> codec) { - this.environment = new Environment(); - this.tcpClient = new TcpClientSpec, Message

>(NettyTcpClient.class) - .env(this.environment) + this.tcpClient = new TcpClientSpec, Message

>(REACTOR_TCP_CLIENT_TYPE) + .env(new Environment()) .codec(codec) .connect(host, port) + .synchronousDispatcher() .get(); } + /** + * A constructor with a pre-configured {@link reactor.tcp.TcpClient}. + * + *

NOTE: if the client is configured with a thread-creating + * dispatcher, you are responsible for shutting down the {@link reactor.core.Environment} + * instance with which the client is configured. + * + * @param tcpClient the TcpClient to use + */ + public ReactorNettyTcpClient(TcpClient, Message

> tcpClient) { + Assert.notNull(tcpClient, "'tcpClient' must not be null"); + this.tcpClient = tcpClient; + } + @Override public ListenableFuture connect(TcpConnectionHandler

connectionHandler) { @@ -121,7 +150,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ connectionHandler.afterConnectionClosed(); } }); - connection.in().consume(new Consumer>() { + connection.consume(new Consumer>() { @Override public void accept(Message

message) { connectionHandler.handleMessage(message); @@ -130,7 +159,7 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ connection.when(Throwable.class, new Consumer() { @Override public void accept(Throwable t) { - logger.error("Exception on connection " + connectionHandler, t); + logger.error("Exception on connection " + connectionHandler, t); } }); connectionHandler.afterConnected(new ReactorTcpConnection

(connection)); @@ -161,18 +190,13 @@ public class ReactorNettyTcpClient

implements TcpOperations

{ @Override public ListenableFuture shutdown() { - try { - Promise promise = this.tcpClient.close(); - return new AbstractPromiseToListenableFutureAdapter(promise) { - @Override - protected Void adapt(Void result) { - return result; - } - }; - } - finally { - this.environment.shutdown(); - } + Promise promise = this.tcpClient.close(); + return new AbstractPromiseToListenableFutureAdapter(promise) { + @Override + protected Void adapt(Void result) { + return result; + } + }; } -} +} \ No newline at end of file diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java index e39effeb970d93facd74213ea48665f62701b50f..2f4660dfb623f49e81ef5517e7e3d5990ffde643 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/broker/BrokerMessageHandlerTests.java @@ -68,10 +68,10 @@ public class BrokerMessageHandlerTests { } @Test - public void stopShouldPublishBrokerAvailabilityEvent() { + public void startAndStopShouldNotPublishBrokerAvailabilityEvents() { this.handler.start(); this.handler.stop(); - assertEquals(Arrays.asList(true, false), this.handler.availabilityEvents); + assertEquals(Collections.emptyList(), this.handler.availabilityEvents); } @Test @@ -136,11 +136,6 @@ public class BrokerMessageHandlerTests { setApplicationEventPublisher(this); } - @Override - protected void startInternal() { - publishBrokerAvailableEvent(); - } - @Override protected void handleMessageInternal(Message message) { this.messages.add(message);