From 29934d7c0249e54897cb06c1d779dcaec4d6f4ee Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Thu, 17 Oct 2013 22:12:06 -0400 Subject: [PATCH] Add TCP abstractions to spring-messaging This change adds abstractions for opening and managing TCP connections primarily for use with the STOMP broker support. As one immediate benefit the change makes the StompBrokerRelayMessageHandler more easy to test. --- .../messaging/MessageDeliveryException.java | 4 + .../stomp/StompBrokerRelayMessageHandler.java | 509 ++++++++---------- .../tcp/FixedIntervalReconnectStrategy.java | 43 ++ .../tcp/PromiseToListenableFutureAdapter.java | 110 ++++ .../support/tcp/ReactorNettyTcpClient.java | 127 +++++ .../support/tcp/ReactorTcpConnection.java | 134 +++++ .../support/tcp/ReconnectStrategy.java | 36 ++ .../messaging/support/tcp/TcpConnection.java | 58 ++ .../support/tcp/TcpConnectionHandler.java | 55 ++ .../messaging/support/tcp/TcpOperations.java | 51 ++ .../messaging/support/tcp/package-info.java | 9 + ...erRelayMessageHandlerIntegrationTests.java | 15 +- 12 files changed, 845 insertions(+), 306 deletions(-) create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/FixedIntervalReconnectStrategy.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/PromiseToListenableFutureAdapter.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorTcpConnection.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReconnectStrategy.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnectionHandler.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java create mode 100644 spring-messaging/src/main/java/org/springframework/messaging/support/tcp/package-info.java diff --git a/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java b/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java index d92d93de35..e208fabe6e 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java @@ -37,6 +37,10 @@ public class MessageDeliveryException extends MessagingException { super(undeliveredMessage, description); } + public MessageDeliveryException(Message message, Throwable cause) { + super(message, cause); + } + public MessageDeliveryException(Message undeliveredMessage, String description, Throwable cause) { super(undeliveredMessage, description, cause); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 5b6447e71a..cc0f1be16d 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -16,11 +16,10 @@ package org.springframework.messaging.simp.stomp; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -29,21 +28,15 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.tcp.FixedIntervalReconnectStrategy; +import org.springframework.messaging.support.tcp.ReactorNettyTcpClient; +import org.springframework.messaging.support.tcp.TcpConnection; +import org.springframework.messaging.support.tcp.TcpConnectionHandler; +import org.springframework.messaging.support.tcp.TcpOperations; import org.springframework.util.Assert; - -import reactor.core.Environment; -import reactor.core.composable.Composable; -import reactor.core.composable.Deferred; -import reactor.core.composable.Promise; -import reactor.core.composable.spec.DeferredPromiseSpec; -import reactor.function.Consumer; -import reactor.tcp.Reconnect; -import reactor.tcp.TcpClient; -import reactor.tcp.TcpConnection; -import reactor.tcp.netty.NettyTcpClient; -import reactor.tcp.spec.TcpClientSpec; -import reactor.tuple.Tuple; -import reactor.tuple.Tuple2; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.ListenableFutureTask; /** @@ -61,7 +54,7 @@ import reactor.tuple.Tuple2; * opposed to from a client). Such messages are recognized because they are not associated * with any client and therefore do not have a session id header. The "system" connection * is effectively shared and cannot be used to receive messages. Several properties are - * provided to configure the "system" session including the the + * provided to configure the "system" connection including the the * {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode}, * heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and * {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals. @@ -72,6 +65,13 @@ import reactor.tuple.Tuple2; */ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler { + private static final byte[] EMPTY_PAYLOAD = new byte[0]; + + private static final Message HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).build(); + + private static final long HEARTBEAT_MULTIPLIER = 3; + + private final MessageChannel messageChannel; private String relayHost = "127.0.0.1"; @@ -88,11 +88,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private String virtualHost; - private Environment environment; + private TcpOperations tcpClient; - private TcpClient, Message> tcpClient; - - private final Map relaySessions = new ConcurrentHashMap(); + private final Map connectionHandlers = + new ConcurrentHashMap(); /** @@ -137,20 +136,31 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the interval, in milliseconds, at which the "system" relay session will, in the + * Configure the TCP client to for managing STOMP over TCP connections to the message + * broker. This is an optional property that can be used to replace the default + * implementation used for example for testing purposes. + *

+ * By default an instance of {@link ReactorNettyTcpClient} is used. + */ + public void setTcpClient(TcpOperations tcpClient) { + this.tcpClient = tcpClient; + } + + /** + * Set the interval, in milliseconds, at which the "system" connection will, in the * absence of any other data being sent, send a heartbeat to the STOMP broker. A value * of zero will prevent heartbeats from being sent to the broker. *

* The default value is 10000. *

- * See class-level documentation for more information on the "system" session. + * See class-level documentation for more information on the "system" connection. */ public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; } /** - * @return The interval, in milliseconds, at which the "system" relay session will + * @return The interval, in milliseconds, at which the "system" connection will * send heartbeats to the STOMP broker. */ public long getSystemHeartbeatSendInterval() { @@ -158,21 +168,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the maximum interval, in milliseconds, at which the "system" relay session + * Set the maximum interval, in milliseconds, at which the "system" connection * expects, in the absence of any other data, to receive a heartbeat from the STOMP - * broker. A value of zero will configure the relay session to expect not to receive + * broker. A value of zero will configure the connection to expect not to receive * heartbeats from the broker. *

* The default value is 10000. *

- * See class-level documentation for more information on the "system" session. + * See class-level documentation for more information on the "system" connection. */ public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; } /** - * @return The interval, in milliseconds, at which the "system" relay session expects + * @return The interval, in milliseconds, at which the "system" connection expects * to receive heartbeats from the STOMP broker. */ public long getSystemHeartbeatReceiveInterval() { @@ -180,10 +190,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the login for the "system" relay session used to send messages to the STOMP + * Set the login for the "system" connection used to send messages to the STOMP * broker without having a client session (e.g. REST/HTTP request handling method). *

- * See class-level documentation for more information on the "system" session. + * See class-level documentation for more information on the "system" connection. */ public void setSystemLogin(String systemLogin) { Assert.hasText(systemLogin, "systemLogin must not be empty"); @@ -191,24 +201,24 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the login used by the "system" relay session to connect to the STOMP broker + * @return the login used by the "system" connection to connect to the STOMP broker */ public String getSystemLogin() { return this.systemLogin; } /** - * Set the passcode for the "system" relay session used to send messages to the STOMP + * Set the passcode for the "system" connection used to send messages to the STOMP * broker without having a client session (e.g. REST/HTTP request handling method). *

- * See class-level documentation for more information on the "system" session. + * See class-level documentation for more information on the "system" connection. */ public void setSystemPasscode(String systemPasscode) { this.systemPasscode = systemPasscode; } /** - * @return the passcode used by the "system" relay session to connect to the STOMP broker + * @return the passcode used by the "system" connection to connect to the STOMP broker */ public String getSystemPasscode() { return this.systemPasscode; @@ -237,12 +247,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override protected void startInternal() { - this.environment = new Environment(); - this.tcpClient = new TcpClientSpec, Message>(NettyTcpClient.class) - .env(this.environment) - .codec(new StompCodec()) - .connect(this.relayHost, this.relayPort) - .get(); + + this.tcpClient = new ReactorNettyTcpClient(this.relayHost, this.relayPort, new StompCodec()); if (logger.isDebugEnabled()) { logger.debug("Initializing \"system\" TCP connection"); @@ -254,30 +260,28 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler headers.setPasscode(this.systemPasscode); headers.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval); headers.setHost(getVirtualHost()); - Message message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); - SystemStompRelaySession session = new SystemStompRelaySession(); - session.connect(message); + SystemStompConnectionHandler handler = new SystemStompConnectionHandler(headers); + this.connectionHandlers.put(handler.getSessionId(), handler); - this.relaySessions.put(session.getId(), session); + this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000)); } @Override protected void stopInternal() { - for (StompRelaySession session: this.relaySessions.values()) { - session.disconnect(); - } - try { - this.tcpClient.close().await(); - } - catch (Throwable t) { - logger.error("Failed to close reactor TCP client", t); + for (StompConnectionHandler handler : this.connectionHandlers.values()) { + try { + handler.resetTcpConnection(); + } + catch (Throwable t) { + logger.error("Failed to close STOMP connection " + t.getMessage()); + } } try { - this.environment.shutdown(); + this.tcpClient.shutdown(); } catch (Throwable t) { - logger.error("Failed to shut down reactor Environment", t); + logger.error("Error while shutting down TCP client", t); } } @@ -291,7 +295,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler SimpMessageType messageType = headers.getMessageType(); if (SimpMessageType.MESSAGE.equals(messageType)) { - sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId; + sessionId = (sessionId == null) ? SystemStompConnectionHandler.SESSION_ID : sessionId; headers.setSessionId(sessionId); command = headers.updateStompCommandAsClientMessage(); message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); @@ -309,138 +313,120 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (SimpMessageType.CONNECT.equals(messageType)) { if (getVirtualHost() != null) { headers.setHost(getVirtualHost()); - message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); } - StompRelaySession session = new StompRelaySession(sessionId); - session.connect(message); - this.relaySessions.put(sessionId, session); + StompConnectionHandler handler = new StompConnectionHandler(sessionId, headers); + this.connectionHandlers.put(sessionId, handler); + this.tcpClient.connect(handler); } else if (SimpMessageType.DISCONNECT.equals(messageType)) { - StompRelaySession session = this.relaySessions.remove(sessionId); - if (session == null) { + StompConnectionHandler handler = removeConnectionHandler(sessionId); + if (handler == null) { if (logger.isTraceEnabled()) { - logger.trace("Session already removed, sessionId=" + sessionId); + logger.trace("Connection already removed for sessionId=" + sessionId); } return; } - session.forward(message); + handler.forward(message); } else { - StompRelaySession session = this.relaySessions.get(sessionId); - if (session == null) { - logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message); + StompConnectionHandler handler = this.connectionHandlers.get(sessionId); + if (handler == null) { + logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message: " + message); return; } - session.forward(message); + handler.forward(message); } } + private StompConnectionHandler removeConnectionHandler(String sessionId) { + return SystemStompConnectionHandler.SESSION_ID.equals(sessionId) + ? null : this.connectionHandlers.remove(sessionId); + } - private class StompRelaySession { - private static final long HEARTBEAT_MULTIPLIER = 3; + private class StompConnectionHandler implements TcpConnectionHandler { private final String sessionId; private final boolean isRemoteClientSession; - private final long reconnectInterval; - - private volatile StompConnection stompConnection = new StompConnection(); + private final StompHeaderAccessor connectHeaders; - private volatile StompHeaderAccessor connectHeaders; + private volatile TcpConnection tcpConnection; - private volatile StompHeaderAccessor connectedHeaders; + private volatile boolean isStompConnected; - private StompRelaySession(String sessionId) { - this(sessionId, true, 0); + private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) { + this(sessionId, connectHeaders, true); } - private StompRelaySession(String sessionId, boolean isRemoteClientSession, long reconnectInterval) { + private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, + boolean isRemoteClientSession) { + Assert.notNull(sessionId, "sessionId is required"); + Assert.notNull(connectHeaders, "connectHeaders is required"); + this.sessionId = sessionId; + this.connectHeaders = connectHeaders; this.isRemoteClientSession = isRemoteClientSession; - this.reconnectInterval = reconnectInterval; } - - public String getId() { + public String getSessionId() { return this.sessionId; } - public void connect(final Message connectMessage) { - - Assert.notNull(connectMessage, "connectMessage is required"); - this.connectHeaders = StompHeaderAccessor.wrap(connectMessage); - - Composable, Message>> promise; - if (this.reconnectInterval > 0) { - promise = tcpClient.open(new Reconnect() { - @Override - public Tuple2 reconnect(InetSocketAddress address, int attempt) { - return Tuple.of(address, 5000L); - } - }); - } - else { - promise = tcpClient.open(); - } - - promise.consume(new Consumer, Message>>() { - @Override - public void accept(TcpConnection, Message> connection) { - handleConnectionReady(connection, connectMessage); - } - }); - promise.when(Throwable.class, new Consumer() { - @Override - public void accept(Throwable ex) { - relaySessions.remove(sessionId); - handleTcpClientFailure("Failed to connect to message broker", ex); - } - }); + @Override + public void afterConnected(TcpConnection connection) { + this.tcpConnection = connection; + connection.send(MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(this.connectHeaders).build()); } - public void disconnect() { - this.stompConnection.setDisconnected(); + @Override + public void afterConnectFailure(Throwable ex) { + handleTcpConnectionFailure("Failed to connect to message broker", ex); } - protected void handleConnectionReady( - TcpConnection, Message> tcpConn, final Message connectMessage) { + /** + * Invoked when any TCP connectivity issue is detected, i.e. failure to establish + * the TCP connection, failure to send a message, missed heartbeat. + */ + protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) { + if (logger.isErrorEnabled()) { + logger.error(errorMessage + ", sessionId=" + this.sessionId, ex); + } + resetTcpConnection(); + sendStompErrorToClient(errorMessage); + } - this.stompConnection.setTcpConnection(tcpConn); - tcpConn.on().close(new Runnable() { - @Override - public void run() { - connectionClosed(); - } - }); - tcpConn.in().consume(new Consumer>() { - @Override - public void accept(Message message) { - readStompFrame(message); + private void sendStompErrorToClient(String errorText) { + if (this.isRemoteClientSession) { + StompConnectionHandler removed = removeConnectionHandler(this.sessionId); + if (removed != null) { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); + headers.setSessionId(this.sessionId); + headers.setMessage(errorText); + Message errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build(); + sendMessageToClient(errorMessage); } - }); - forwardInternal(connectMessage, tcpConn); + } } - protected void connectionClosed() { - relaySessions.remove(this.sessionId); - if (this.stompConnection.isReady()) { - sendError("Lost connection to the broker"); + protected void sendMessageToClient(Message message) { + if (this.isRemoteClientSession) { + StompBrokerRelayMessageHandler.this.messageChannel.send(message); } } - private void readStompFrame(Message message) { + @Override + public void handleMessage(Message message) { if (logger.isTraceEnabled()) { - logger.trace("Reading message for sessionId=" + sessionId + ", " + message); + logger.trace("Reading message for sessionId=" + this.sessionId + ", " + message); } StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); if (StompCommand.CONNECTED == headers.getCommand()) { - this.connectedHeaders = headers; - connected(); + afterStompConnected(headers); } headers.setSessionId(this.sessionId); @@ -448,230 +434,157 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler sendMessageToClient(message); } - private void initHeartbeats() { + /** + * Invoked after the STOMP CONNECTED frame is received. At this point the + * connection is ready for sending STOMP messages to the broker. + */ + protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { + this.isStompConnected = true; + initHeartbeats(connectedHeaders); + } + + private void initHeartbeats(StompHeaderAccessor connectedHeaders) { + + // Remote clients do their own heartbeat management + if (this.isRemoteClientSession) { + return; + } long clientSendInterval = this.connectHeaders.getHeartbeat()[0]; long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1]; - long serverSendInterval = this.connectedHeaders.getHeartbeat()[0]; - long serverReceiveInterval = this.connectedHeaders.getHeartbeat()[1]; + long serverSendInterval = connectedHeaders.getHeartbeat()[0]; + long serverReceiveInterval = connectedHeaders.getHeartbeat()[1]; if ((clientSendInterval > 0) && (serverReceiveInterval > 0)) { long interval = Math.max(clientSendInterval, serverReceiveInterval); - stompConnection.connection.on().writeIdle(interval, new Runnable() { - + this.tcpConnection.onWriteInactivity(new Runnable() { @Override public void run() { - TcpConnection, Message> tcpConn = stompConnection.connection; - if (tcpConn != null) { - tcpConn.send(MessageBuilder.withPayload(new byte[] {'\n'}).build(), - new Consumer() { - @Override - public void accept(Boolean result) { - if (!result) { - handleTcpClientFailure("Failed to send heartbeat to the broker", null); + TcpConnection conn = tcpConnection; + if (conn != null) { + conn.send(HEARTBEAT_MESSAGE).addCallback( + new ListenableFutureCallback() { + public void onFailure(Throwable t) { + handleTcpConnectionFailure("Failed to send heartbeat", null); } - } - }); + public void onSuccess(Boolean result) {} + }); } } - }); + }, interval); } if (clientReceiveInterval > 0 && serverSendInterval > 0) { final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER; - stompConnection.connection.on().readIdle(interval, new Runnable() { - + this.tcpConnection.onReadInactivity(new Runnable() { @Override public void run() { - String message = "Broker hearbeat missed: connection idle for more than " + interval + "ms"; - if (logger.isWarnEnabled()) { - logger.warn(message); - } - disconnected(message); + handleTcpConnectionFailure("No hearbeat from broker for more than " + + interval + "ms, closing connection", null); } - }); - } - } - - protected void connected() { - if (!this.isRemoteClientSession) { - initHeartbeats(); - } - this.stompConnection.setReady(); - } - - protected void handleTcpClientFailure(String message, Throwable ex) { - if (logger.isErrorEnabled()) { - logger.error(message + ", sessionId=" + this.sessionId, ex); + }, interval); } - disconnected(message); } - protected void disconnected(String errorMessage) { - this.stompConnection.setDisconnected(); - sendError(errorMessage); + @Override + public void afterConnectionClosed() { + sendStompErrorToClient("Connection to broker closed"); } - private void sendError(String errorText) { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); - headers.setSessionId(this.sessionId); - headers.setMessage(errorText); - Message errorMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); - sendMessageToClient(errorMessage); - } + public ListenableFuture forward(final Message message) { - protected void sendMessageToClient(Message message) { - if (this.isRemoteClientSession) { - messageChannel.send(message); - } - else { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); - if (StompCommand.ERROR.equals(headers.getCommand())) { - if (logger.isErrorEnabled()) { - logger.error("STOMP ERROR on sessionId=" + this.sessionId + ": " + message); - } + if (!this.isStompConnected) { + if (logger.isWarnEnabled()) { + logger.warn("Connection to broker inactive or not ready, ignoring message=" + message); } - // ignore otherwise - } - } - - private void forward(Message message) { - TcpConnection, Message> tcpConnection = this.stompConnection.getReadyConnection(); - if (tcpConnection == null) { - logger.warn("Connection to STOMP broker is not active"); - handleForwardFailure(message); - } - else if (!forwardInternal(message, tcpConnection)) { - handleForwardFailure(message); - } - } - - protected void handleForwardFailure(Message message) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to forward message to the broker. message=" + message); + return new ListenableFutureTask(new Callable() { + @Override + public Boolean call() throws Exception { + return Boolean.FALSE; + } + }); } - } - - private boolean forwardInternal( - Message message, TcpConnection, Message> tcpConnection) { - - Assert.isInstanceOf(byte[].class, message.getPayload(), "Message's payload must be a byte[]"); - @SuppressWarnings("unchecked") - Message byteMessage = (Message) message; if (logger.isTraceEnabled()) { - logger.trace("Forwarding to STOMP broker, message: " + message); + logger.trace("Forwarding message to broker: " + message); } - StompCommand command = StompHeaderAccessor.wrap(message).getCommand(); + @SuppressWarnings("unchecked") + ListenableFuture future = this.tcpConnection.send((Message) message); - final Deferred> deferred = new DeferredPromiseSpec().get(); - tcpConnection.send(byteMessage, new Consumer() { + future.addCallback(new ListenableFutureCallback() { @Override - public void accept(Boolean success) { - deferred.accept(success); - } - }); - - Boolean success = null; - try { - success = deferred.compose().await(); - if (success == null) { - handleTcpClientFailure("Timed out waiting for message to be forwarded to the broker", null); - } - else if (!success) { - handleTcpClientFailure("Failed to forward message to the broker", null); - } - else { + public void onSuccess(Boolean result) { + StompCommand command = StompHeaderAccessor.wrap(message).getCommand(); if (command == StompCommand.DISCONNECT) { - this.stompConnection.setDisconnected(); + resetTcpConnection(); } } - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - handleTcpClientFailure("Interrupted while forwarding message to the broker", ex); - } - return (success != null) ? success : false; - } - } - - private static class StompConnection { - - private volatile TcpConnection, Message> connection; - - private AtomicReference, Message>> readyConnection = - new AtomicReference, Message>>(); - - - public void setTcpConnection(TcpConnection, Message> connection) { - Assert.notNull(connection, "connection must not be null"); - this.connection = connection; - } - - /** - * Return the underlying {@link TcpConnection} but only after the CONNECTED STOMP - * frame is received. - */ - public TcpConnection, Message> getReadyConnection() { - return this.readyConnection.get(); - } - - public void setReady() { - this.readyConnection.set(this.connection); - } + @Override + public void onFailure(Throwable t) { + handleTcpConnectionFailure("Failed to send message " + message, t); + } + }); - public boolean isReady() { - return (this.readyConnection.get() != null); + return future; } - public void setDisconnected() { - this.readyConnection.set(null); - - TcpConnection, Message> localConnection = this.connection; - if (localConnection != null) { - localConnection.close(); - this.connection = null; + public void resetTcpConnection() { + TcpConnection conn = this.tcpConnection; + this.isStompConnected = false; + this.tcpConnection = null; + if (conn != null) { + try { + this.tcpConnection.close(); + } + catch (Throwable t) { + // ignore + } } } - @Override - public String toString() { - return "StompConnection [ready=" + isReady() + "]"; - } } - private class SystemStompRelaySession extends StompRelaySession { + private class SystemStompConnectionHandler extends StompConnectionHandler { - public static final String ID = "stompRelaySystemSessionId"; + public static final String SESSION_ID = "stompRelaySystemSessionId"; - public SystemStompRelaySession() { - super(ID, false, 5000); + public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { + super(SESSION_ID, connectHeaders, false); } @Override - protected void connected() { - super.connected(); + protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { + super.afterStompConnected(connectedHeaders); publishBrokerAvailableEvent(); } @Override - protected void disconnected(String errorMessage) { - super.disconnected(errorMessage); + protected void handleTcpConnectionFailure(String errorMessage, Throwable t) { + super.handleTcpConnectionFailure(errorMessage, t); publishBrokerUnavailableEvent(); } @Override - protected void connectionClosed() { + public void afterConnectionClosed() { + super.afterConnectionClosed(); publishBrokerUnavailableEvent(); } @Override - protected void handleForwardFailure(Message message) { - super.handleForwardFailure(message); - throw new MessageDeliveryException(message); + public ListenableFuture forward(Message message) { + try { + ListenableFuture future = super.forward(message); + if (!future.get()) { + throw new MessageDeliveryException(message); + } + return future; + } + catch (Throwable t) { + throw new MessageDeliveryException(message, t); + } } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/FixedIntervalReconnectStrategy.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/FixedIntervalReconnectStrategy.java new file mode 100644 index 0000000000..63e1193639 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/FixedIntervalReconnectStrategy.java @@ -0,0 +1,43 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + + +/** + * A simple strategy for making reconnect attempts at a fixed interval. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class FixedIntervalReconnectStrategy implements ReconnectStrategy { + + private final long interval; + + + /** + * @param interval the frequency, in millisecond, at which to try to reconnect + */ + public FixedIntervalReconnectStrategy(long interval) { + this.interval = interval; + } + + @Override + public Long getTimeToNextAttempt(int attemptCount) { + return this.interval; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/PromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/PromiseToListenableFutureAdapter.java new file mode 100644 index 0000000000..ac0f5951e0 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/PromiseToListenableFutureAdapter.java @@ -0,0 +1,110 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; + +import reactor.core.composable.Promise; +import reactor.function.Consumer; + +/** + * Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting + * the result Object type {@code } to the expected target type {@code }. + * + * @param the type of object expected from the {@link Promise} + * @param the type of object expected from the {@link ListenableFuture} + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +abstract class PromiseToListenableFutureAdapter implements ListenableFuture { + + private final Promise promise; + + private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry(); + + + protected PromiseToListenableFutureAdapter(Promise promise) { + + Assert.notNull(promise, "promise is required"); + this.promise = promise; + + this.promise.onSuccess(new Consumer() { + @Override + public void accept(S result) { + try { + registry.success(adapt(result)); + } + catch (Throwable t) { + registry.failure(t); + } + } + }); + + this.promise.onError(new Consumer() { + @Override + public void accept(Throwable t) { + registry.failure(t); + } + }); + } + + protected abstract T adapt(S adapteeResult); + + @Override + public T get() { + S result = this.promise.get(); + return adapt(result); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + S result = this.promise.await(timeout, unit); + if (result == null) { + throw new TimeoutException(); + } + return adapt(result); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return this.promise.isComplete(); + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + this.registry.addCallback(callback); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java new file mode 100644 index 0000000000..459258ece2 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java @@ -0,0 +1,127 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import java.net.InetSocketAddress; + +import org.springframework.messaging.Message; +import org.springframework.util.concurrent.ListenableFuture; + +import reactor.core.Environment; +import reactor.core.composable.Composable; +import reactor.core.composable.Promise; +import reactor.function.Consumer; +import reactor.io.Buffer; +import reactor.tcp.Reconnect; +import reactor.tcp.TcpClient; +import reactor.tcp.TcpConnection; +import reactor.tcp.encoding.Codec; +import reactor.tcp.netty.NettyTcpClient; +import reactor.tcp.spec.TcpClientSpec; +import reactor.tuple.Tuple; +import reactor.tuple.Tuple2; + +/** + * A Reactor/Netty implementation of {@link TcpOperations}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class ReactorNettyTcpClient

implements TcpOperations

{ + + private Environment environment; + + private TcpClient, Message

> tcpClient; + + + public ReactorNettyTcpClient(String host, int port, Codec, Message

> codec) { + this.environment = new Environment(); + this.tcpClient = new TcpClientSpec, Message

>(NettyTcpClient.class) + .env(this.environment) + .codec(codec) + .connect(host, port) + .get(); + } + + + @Override + public void connect(TcpConnectionHandler

connectionHandler) { + this.connect(connectionHandler, null); + } + + @Override + public void connect(final TcpConnectionHandler

connectionHandler, + final ReconnectStrategy reconnectStrategy) { + + Composable, Message

>> composable; + + if (reconnectStrategy != null) { + composable = this.tcpClient.open(new Reconnect() { + @Override + public Tuple2 reconnect(InetSocketAddress address, int attempt) { + return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt)); + } + }); + } + else { + composable = this.tcpClient.open(); + } + + composable.when(Throwable.class, new Consumer() { + @Override + public void accept(Throwable ex) { + connectionHandler.afterConnectFailure(ex); + } + }); + + composable.consume(new Consumer, Message

>>() { + @Override + public void accept(TcpConnection, Message

> connection) { + connection.on().close(new Runnable() { + @Override + public void run() { + connectionHandler.afterConnectionClosed(); + } + }); + connection.in().consume(new Consumer>() { + @Override + public void accept(Message

message) { + connectionHandler.handleMessage(message); + } + }); + connectionHandler.afterConnected(new ReactorTcpConnection

(connection)); + } + }); + } + + @Override + public ListenableFuture shutdown() { + try { + Promise promise = this.tcpClient.close(); + return new PromiseToListenableFutureAdapter(promise) { + @Override + protected Void adapt(Void result) { + return result; + } + }; + } + finally { + this.environment.shutdown(); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorTcpConnection.java new file mode 100644 index 0000000000..81e99db187 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorTcpConnection.java @@ -0,0 +1,134 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.messaging.Message; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; + +import reactor.core.composable.Deferred; +import reactor.core.composable.Promise; +import reactor.core.composable.spec.DeferredPromiseSpec; +import reactor.function.Consumer; + + +public class ReactorTcpConnection

implements TcpConnection

{ + + private final reactor.tcp.TcpConnection, Message

> reactorTcpConnection; + + + public ReactorTcpConnection(reactor.tcp.TcpConnection, Message

> connection) { + this.reactorTcpConnection = connection; + } + + @Override + public ListenableFuture send(Message

message) { + ConsumerListenableFuture future = new ConsumerListenableFuture(); + this.reactorTcpConnection.send(message, future); + return future; + } + + @Override + public void onReadInactivity(Runnable runnable, long inactivityDuration) { + this.reactorTcpConnection.on().readIdle(inactivityDuration, runnable); + } + + @Override + public void onWriteInactivity(Runnable runnable, long inactivityDuration) { + this.reactorTcpConnection.on().writeIdle(inactivityDuration, runnable); + } + + @Override + public void close() { + this.reactorTcpConnection.close(); + } + + + // Use this temporarily until reactor provides a send method returning a Promise + + + private static class ConsumerListenableFuture implements ListenableFuture, Consumer { + + final Deferred> deferred = new DeferredPromiseSpec().get(); + + private final ListenableFutureCallbackRegistry registry = + new ListenableFutureCallbackRegistry(); + + @Override + public void accept(Boolean result) { + + this.deferred.accept(result); + + if (result == null) { + this.registry.failure(new TimeoutException()); + } + else if (result) { + this.registry.success(result); + } + else { + this.registry.failure(new Exception("Failed send message")); + } + } + + @Override + public Boolean get() { + try { + return this.deferred.compose().await(); + } + catch (InterruptedException e) { + return Boolean.FALSE; + } + } + + @Override + public Boolean get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + Boolean result = this.deferred.compose().await(timeout, unit); + if (result == null) { + throw new TimeoutException(); + } + return result; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return this.deferred.compose().isComplete(); + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + this.registry.addCallback(callback); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReconnectStrategy.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReconnectStrategy.java new file mode 100644 index 0000000000..b81be7c79b --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReconnectStrategy.java @@ -0,0 +1,36 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + + +/** + * A contract to determine the frequency of reconnect attempts after connection failure. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface ReconnectStrategy { + + /** + * Return the time to the next attempt to reconnect. + * + * @param attemptCount how many reconnect attempts have been made already + * @return the amount of time in milliseconds or {@code null} to stop + */ + Long getTimeToNextAttempt(int attemptCount); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java new file mode 100644 index 0000000000..57f89036ab --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import org.springframework.messaging.Message; +import org.springframework.util.concurrent.ListenableFuture; + +/** + * A contract for sending messages and managing a TCP connection. + * + * @param

the type of payload for outbound {@link Message}s + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface TcpConnection

{ + + /** + * Send the given message. + * @param message the message + * @return whether the send succeeded or not + */ + ListenableFuture send(Message

message); + + /** + * Register a task to invoke after a period of of read inactivity. + * @param runnable the task to invoke + * @param duration the amount of inactive time in milliseconds + */ + void onReadInactivity(Runnable runnable, long duration); + + /** + * Register a task to invoke after a period of of write inactivity. + * @param runnable the task to invoke + * @param duration the amount of inactive time in milliseconds + */ + void onWriteInactivity(Runnable runnable, long duration); + + /** + * Close the connection. + */ + void close(); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnectionHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnectionHandler.java new file mode 100644 index 0000000000..43b37487bb --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnectionHandler.java @@ -0,0 +1,55 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import org.springframework.messaging.Message; + +/** + * A contract for managing lifecycle events for a TCP connection including + * the handling of incoming messages. + * + * @param

the type of payload for in and outbound messages + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface TcpConnectionHandler

{ + + /** + * Invoked after a connection is successfully established. + * @param connection the connection + */ + void afterConnected(TcpConnection

connection); + + /** + * Invoked after a connection failure. + * @param ex the exception + */ + void afterConnectFailure(Throwable ex); + + /** + * Handle a message received from the remote host. + * @param message the message + */ + void handleMessage(Message

message); + + /** + * Invoked after the connection is closed. + */ + void afterConnectionClosed(); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java new file mode 100644 index 0000000000..f4b12e40c9 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java @@ -0,0 +1,51 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import org.springframework.util.concurrent.ListenableFuture; + +/** + * A contract for establishing TCP connections. + * + * @param

the type of payload for in and outbound messages + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface TcpOperations

{ + + /** + * Open a new connection. + * + * @param connectionHandler a handler to manage the connection + */ + void connect(TcpConnectionHandler

connectionHandler); + + /** + * Open a new connection and a strategy for reconnecting if the connection fails. + * + * @param connectionHandler a handler to manage the connection + * @param reconnectStrategy a strategy for reconnecting + */ + void connect(TcpConnectionHandler

connectionHandler, ReconnectStrategy reconnectStrategy); + + /** + * Shut down and close any open connections. + */ + ListenableFuture shutdown(); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/package-info.java new file mode 100644 index 0000000000..56fb3227dc --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/package-info.java @@ -0,0 +1,9 @@ +/** + * Contains abstractions and implementation classes for establishing TCP connections via + * {@link org.springframework.messaging.support.tcp.TcpOperations TcpOperations}, + * handling messages via + * {@link org.springframework.messaging.support.tcp.TcpConnectionHandler TcpConnectionHandler}, + * as well as sending messages via + * {@link org.springframework.messaging.support.tcp.TcpConnection TcpConnection}. + */ +package org.springframework.messaging.support.tcp; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 40eb1a5636..d542ec8160 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -231,14 +231,13 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void disconnectClosesRelaySessionCleanly() throws Exception { - String sess1 = "sess1"; - MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); - this.responseHandler.expect(conn1); - this.relay.handleMessage(conn1.message); + MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); + this.responseHandler.expect(connect); + this.relay.handleMessage(connect.message); this.responseHandler.awaitAndAssert(); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - headers.setSessionId(sess1); + headers.setSessionId("sess1"); this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); @@ -330,9 +329,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { StringBuilder sb = new StringBuilder("\n"); synchronized (this.monitor) { - sb.append("INCOMPLETE:\n").append(this.expected).append("\n"); - sb.append("COMPLETE:\n").append(this.actual).append("\n"); - sb.append("UNMATCHED MESSAGES:\n").append(this.unexpected).append("\n"); + sb.append("UNMATCHED EXPECTATIONS:\n").append(this.expected).append("\n"); + sb.append("MATCHED EXPECTATIONS:\n").append(this.actual).append("\n"); + sb.append("UNEXPECTED MESSAGES:\n").append(this.unexpected).append("\n"); } return sb.toString(); -- GitLab