diff --git a/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java b/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java index d92d93de3557a2d13ddd94716af918e29755b22c..e208fabe6e7e39daa6ff6d1ffdd8f009341e4466 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/MessageDeliveryException.java @@ -37,6 +37,10 @@ public class MessageDeliveryException extends MessagingException { super(undeliveredMessage, description); } + public MessageDeliveryException(Message message, Throwable cause) { + super(message, cause); + } + public MessageDeliveryException(Message undeliveredMessage, String description, Throwable cause) { super(undeliveredMessage, description, cause); } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java index 5b6447e71ac305525c6abea00cda77bed8da1da0..cc0f1be16d72c37ab6038a4c2d0b898e6e18910b 100644 --- a/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java +++ b/spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java @@ -16,11 +16,10 @@ package org.springframework.messaging.simp.stomp; -import java.net.InetSocketAddress; import java.util.Collection; import java.util.Map; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicReference; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; @@ -29,21 +28,15 @@ import org.springframework.messaging.MessageHandler; import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.handler.AbstractBrokerMessageHandler; import org.springframework.messaging.support.MessageBuilder; +import org.springframework.messaging.support.tcp.FixedIntervalReconnectStrategy; +import org.springframework.messaging.support.tcp.ReactorNettyTcpClient; +import org.springframework.messaging.support.tcp.TcpConnection; +import org.springframework.messaging.support.tcp.TcpConnectionHandler; +import org.springframework.messaging.support.tcp.TcpOperations; import org.springframework.util.Assert; - -import reactor.core.Environment; -import reactor.core.composable.Composable; -import reactor.core.composable.Deferred; -import reactor.core.composable.Promise; -import reactor.core.composable.spec.DeferredPromiseSpec; -import reactor.function.Consumer; -import reactor.tcp.Reconnect; -import reactor.tcp.TcpClient; -import reactor.tcp.TcpConnection; -import reactor.tcp.netty.NettyTcpClient; -import reactor.tcp.spec.TcpClientSpec; -import reactor.tuple.Tuple; -import reactor.tuple.Tuple2; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.ListenableFutureTask; /** @@ -61,7 +54,7 @@ import reactor.tuple.Tuple2; * opposed to from a client). Such messages are recognized because they are not associated * with any client and therefore do not have a session id header. The "system" connection * is effectively shared and cannot be used to receive messages. Several properties are - * provided to configure the "system" session including the the + * provided to configure the "system" connection including the the * {@link #setSystemLogin(String) login} {@link #setSystemPasscode(String) passcode}, * heartbeat {@link #setSystemHeartbeatSendInterval(long) send} and * {@link #setSystemHeartbeatReceiveInterval(long) receive} intervals. @@ -72,6 +65,13 @@ import reactor.tuple.Tuple2; */ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler { + private static final byte[] EMPTY_PAYLOAD = new byte[0]; + + private static final Message HEARTBEAT_MESSAGE = MessageBuilder.withPayload(new byte[] {'\n'}).build(); + + private static final long HEARTBEAT_MULTIPLIER = 3; + + private final MessageChannel messageChannel; private String relayHost = "127.0.0.1"; @@ -88,11 +88,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler private String virtualHost; - private Environment environment; + private TcpOperations tcpClient; - private TcpClient, Message> tcpClient; - - private final Map relaySessions = new ConcurrentHashMap(); + private final Map connectionHandlers = + new ConcurrentHashMap(); /** @@ -137,20 +136,31 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the interval, in milliseconds, at which the "system" relay session will, in the + * Configure the TCP client to for managing STOMP over TCP connections to the message + * broker. This is an optional property that can be used to replace the default + * implementation used for example for testing purposes. + *

+ * By default an instance of {@link ReactorNettyTcpClient} is used. + */ + public void setTcpClient(TcpOperations tcpClient) { + this.tcpClient = tcpClient; + } + + /** + * Set the interval, in milliseconds, at which the "system" connection will, in the * absence of any other data being sent, send a heartbeat to the STOMP broker. A value * of zero will prevent heartbeats from being sent to the broker. *

* The default value is 10000. *

- * See class-level documentation for more information on the "system" session. + * See class-level documentation for more information on the "system" connection. */ public void setSystemHeartbeatSendInterval(long systemHeartbeatSendInterval) { this.systemHeartbeatSendInterval = systemHeartbeatSendInterval; } /** - * @return The interval, in milliseconds, at which the "system" relay session will + * @return The interval, in milliseconds, at which the "system" connection will * send heartbeats to the STOMP broker. */ public long getSystemHeartbeatSendInterval() { @@ -158,21 +168,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the maximum interval, in milliseconds, at which the "system" relay session + * Set the maximum interval, in milliseconds, at which the "system" connection * expects, in the absence of any other data, to receive a heartbeat from the STOMP - * broker. A value of zero will configure the relay session to expect not to receive + * broker. A value of zero will configure the connection to expect not to receive * heartbeats from the broker. *

* The default value is 10000. *

- * See class-level documentation for more information on the "system" session. + * See class-level documentation for more information on the "system" connection. */ public void setSystemHeartbeatReceiveInterval(long heartbeatReceiveInterval) { this.systemHeartbeatReceiveInterval = heartbeatReceiveInterval; } /** - * @return The interval, in milliseconds, at which the "system" relay session expects + * @return The interval, in milliseconds, at which the "system" connection expects * to receive heartbeats from the STOMP broker. */ public long getSystemHeartbeatReceiveInterval() { @@ -180,10 +190,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * Set the login for the "system" relay session used to send messages to the STOMP + * Set the login for the "system" connection used to send messages to the STOMP * broker without having a client session (e.g. REST/HTTP request handling method). *

- * See class-level documentation for more information on the "system" session. + * See class-level documentation for more information on the "system" connection. */ public void setSystemLogin(String systemLogin) { Assert.hasText(systemLogin, "systemLogin must not be empty"); @@ -191,24 +201,24 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler } /** - * @return the login used by the "system" relay session to connect to the STOMP broker + * @return the login used by the "system" connection to connect to the STOMP broker */ public String getSystemLogin() { return this.systemLogin; } /** - * Set the passcode for the "system" relay session used to send messages to the STOMP + * Set the passcode for the "system" connection used to send messages to the STOMP * broker without having a client session (e.g. REST/HTTP request handling method). *

- * See class-level documentation for more information on the "system" session. + * See class-level documentation for more information on the "system" connection. */ public void setSystemPasscode(String systemPasscode) { this.systemPasscode = systemPasscode; } /** - * @return the passcode used by the "system" relay session to connect to the STOMP broker + * @return the passcode used by the "system" connection to connect to the STOMP broker */ public String getSystemPasscode() { return this.systemPasscode; @@ -237,12 +247,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler @Override protected void startInternal() { - this.environment = new Environment(); - this.tcpClient = new TcpClientSpec, Message>(NettyTcpClient.class) - .env(this.environment) - .codec(new StompCodec()) - .connect(this.relayHost, this.relayPort) - .get(); + + this.tcpClient = new ReactorNettyTcpClient(this.relayHost, this.relayPort, new StompCodec()); if (logger.isDebugEnabled()) { logger.debug("Initializing \"system\" TCP connection"); @@ -254,30 +260,28 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler headers.setPasscode(this.systemPasscode); headers.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval); headers.setHost(getVirtualHost()); - Message message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); - SystemStompRelaySession session = new SystemStompRelaySession(); - session.connect(message); + SystemStompConnectionHandler handler = new SystemStompConnectionHandler(headers); + this.connectionHandlers.put(handler.getSessionId(), handler); - this.relaySessions.put(session.getId(), session); + this.tcpClient.connect(handler, new FixedIntervalReconnectStrategy(5000)); } @Override protected void stopInternal() { - for (StompRelaySession session: this.relaySessions.values()) { - session.disconnect(); - } - try { - this.tcpClient.close().await(); - } - catch (Throwable t) { - logger.error("Failed to close reactor TCP client", t); + for (StompConnectionHandler handler : this.connectionHandlers.values()) { + try { + handler.resetTcpConnection(); + } + catch (Throwable t) { + logger.error("Failed to close STOMP connection " + t.getMessage()); + } } try { - this.environment.shutdown(); + this.tcpClient.shutdown(); } catch (Throwable t) { - logger.error("Failed to shut down reactor Environment", t); + logger.error("Error while shutting down TCP client", t); } } @@ -291,7 +295,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler SimpMessageType messageType = headers.getMessageType(); if (SimpMessageType.MESSAGE.equals(messageType)) { - sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId; + sessionId = (sessionId == null) ? SystemStompConnectionHandler.SESSION_ID : sessionId; headers.setSessionId(sessionId); command = headers.updateStompCommandAsClientMessage(); message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); @@ -309,138 +313,120 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler if (SimpMessageType.CONNECT.equals(messageType)) { if (getVirtualHost() != null) { headers.setHost(getVirtualHost()); - message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); } - StompRelaySession session = new StompRelaySession(sessionId); - session.connect(message); - this.relaySessions.put(sessionId, session); + StompConnectionHandler handler = new StompConnectionHandler(sessionId, headers); + this.connectionHandlers.put(sessionId, handler); + this.tcpClient.connect(handler); } else if (SimpMessageType.DISCONNECT.equals(messageType)) { - StompRelaySession session = this.relaySessions.remove(sessionId); - if (session == null) { + StompConnectionHandler handler = removeConnectionHandler(sessionId); + if (handler == null) { if (logger.isTraceEnabled()) { - logger.trace("Session already removed, sessionId=" + sessionId); + logger.trace("Connection already removed for sessionId=" + sessionId); } return; } - session.forward(message); + handler.forward(message); } else { - StompRelaySession session = this.relaySessions.get(sessionId); - if (session == null) { - logger.warn("Session id=" + sessionId + " not found. Ignoring message: " + message); + StompConnectionHandler handler = this.connectionHandlers.get(sessionId); + if (handler == null) { + logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message: " + message); return; } - session.forward(message); + handler.forward(message); } } + private StompConnectionHandler removeConnectionHandler(String sessionId) { + return SystemStompConnectionHandler.SESSION_ID.equals(sessionId) + ? null : this.connectionHandlers.remove(sessionId); + } - private class StompRelaySession { - private static final long HEARTBEAT_MULTIPLIER = 3; + private class StompConnectionHandler implements TcpConnectionHandler { private final String sessionId; private final boolean isRemoteClientSession; - private final long reconnectInterval; - - private volatile StompConnection stompConnection = new StompConnection(); + private final StompHeaderAccessor connectHeaders; - private volatile StompHeaderAccessor connectHeaders; + private volatile TcpConnection tcpConnection; - private volatile StompHeaderAccessor connectedHeaders; + private volatile boolean isStompConnected; - private StompRelaySession(String sessionId) { - this(sessionId, true, 0); + private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders) { + this(sessionId, connectHeaders, true); } - private StompRelaySession(String sessionId, boolean isRemoteClientSession, long reconnectInterval) { + private StompConnectionHandler(String sessionId, StompHeaderAccessor connectHeaders, + boolean isRemoteClientSession) { + Assert.notNull(sessionId, "sessionId is required"); + Assert.notNull(connectHeaders, "connectHeaders is required"); + this.sessionId = sessionId; + this.connectHeaders = connectHeaders; this.isRemoteClientSession = isRemoteClientSession; - this.reconnectInterval = reconnectInterval; } - - public String getId() { + public String getSessionId() { return this.sessionId; } - public void connect(final Message connectMessage) { - - Assert.notNull(connectMessage, "connectMessage is required"); - this.connectHeaders = StompHeaderAccessor.wrap(connectMessage); - - Composable, Message>> promise; - if (this.reconnectInterval > 0) { - promise = tcpClient.open(new Reconnect() { - @Override - public Tuple2 reconnect(InetSocketAddress address, int attempt) { - return Tuple.of(address, 5000L); - } - }); - } - else { - promise = tcpClient.open(); - } - - promise.consume(new Consumer, Message>>() { - @Override - public void accept(TcpConnection, Message> connection) { - handleConnectionReady(connection, connectMessage); - } - }); - promise.when(Throwable.class, new Consumer() { - @Override - public void accept(Throwable ex) { - relaySessions.remove(sessionId); - handleTcpClientFailure("Failed to connect to message broker", ex); - } - }); + @Override + public void afterConnected(TcpConnection connection) { + this.tcpConnection = connection; + connection.send(MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(this.connectHeaders).build()); } - public void disconnect() { - this.stompConnection.setDisconnected(); + @Override + public void afterConnectFailure(Throwable ex) { + handleTcpConnectionFailure("Failed to connect to message broker", ex); } - protected void handleConnectionReady( - TcpConnection, Message> tcpConn, final Message connectMessage) { + /** + * Invoked when any TCP connectivity issue is detected, i.e. failure to establish + * the TCP connection, failure to send a message, missed heartbeat. + */ + protected void handleTcpConnectionFailure(String errorMessage, Throwable ex) { + if (logger.isErrorEnabled()) { + logger.error(errorMessage + ", sessionId=" + this.sessionId, ex); + } + resetTcpConnection(); + sendStompErrorToClient(errorMessage); + } - this.stompConnection.setTcpConnection(tcpConn); - tcpConn.on().close(new Runnable() { - @Override - public void run() { - connectionClosed(); - } - }); - tcpConn.in().consume(new Consumer>() { - @Override - public void accept(Message message) { - readStompFrame(message); + private void sendStompErrorToClient(String errorText) { + if (this.isRemoteClientSession) { + StompConnectionHandler removed = removeConnectionHandler(this.sessionId); + if (removed != null) { + StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); + headers.setSessionId(this.sessionId); + headers.setMessage(errorText); + Message errorMessage = MessageBuilder.withPayload(EMPTY_PAYLOAD).setHeaders(headers).build(); + sendMessageToClient(errorMessage); } - }); - forwardInternal(connectMessage, tcpConn); + } } - protected void connectionClosed() { - relaySessions.remove(this.sessionId); - if (this.stompConnection.isReady()) { - sendError("Lost connection to the broker"); + protected void sendMessageToClient(Message message) { + if (this.isRemoteClientSession) { + StompBrokerRelayMessageHandler.this.messageChannel.send(message); } } - private void readStompFrame(Message message) { + @Override + public void handleMessage(Message message) { if (logger.isTraceEnabled()) { - logger.trace("Reading message for sessionId=" + sessionId + ", " + message); + logger.trace("Reading message for sessionId=" + this.sessionId + ", " + message); } StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); if (StompCommand.CONNECTED == headers.getCommand()) { - this.connectedHeaders = headers; - connected(); + afterStompConnected(headers); } headers.setSessionId(this.sessionId); @@ -448,230 +434,157 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler sendMessageToClient(message); } - private void initHeartbeats() { + /** + * Invoked after the STOMP CONNECTED frame is received. At this point the + * connection is ready for sending STOMP messages to the broker. + */ + protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { + this.isStompConnected = true; + initHeartbeats(connectedHeaders); + } + + private void initHeartbeats(StompHeaderAccessor connectedHeaders) { + + // Remote clients do their own heartbeat management + if (this.isRemoteClientSession) { + return; + } long clientSendInterval = this.connectHeaders.getHeartbeat()[0]; long clientReceiveInterval = this.connectHeaders.getHeartbeat()[1]; - long serverSendInterval = this.connectedHeaders.getHeartbeat()[0]; - long serverReceiveInterval = this.connectedHeaders.getHeartbeat()[1]; + long serverSendInterval = connectedHeaders.getHeartbeat()[0]; + long serverReceiveInterval = connectedHeaders.getHeartbeat()[1]; if ((clientSendInterval > 0) && (serverReceiveInterval > 0)) { long interval = Math.max(clientSendInterval, serverReceiveInterval); - stompConnection.connection.on().writeIdle(interval, new Runnable() { - + this.tcpConnection.onWriteInactivity(new Runnable() { @Override public void run() { - TcpConnection, Message> tcpConn = stompConnection.connection; - if (tcpConn != null) { - tcpConn.send(MessageBuilder.withPayload(new byte[] {'\n'}).build(), - new Consumer() { - @Override - public void accept(Boolean result) { - if (!result) { - handleTcpClientFailure("Failed to send heartbeat to the broker", null); + TcpConnection conn = tcpConnection; + if (conn != null) { + conn.send(HEARTBEAT_MESSAGE).addCallback( + new ListenableFutureCallback() { + public void onFailure(Throwable t) { + handleTcpConnectionFailure("Failed to send heartbeat", null); } - } - }); + public void onSuccess(Boolean result) {} + }); } } - }); + }, interval); } if (clientReceiveInterval > 0 && serverSendInterval > 0) { final long interval = Math.max(clientReceiveInterval, serverSendInterval) * HEARTBEAT_MULTIPLIER; - stompConnection.connection.on().readIdle(interval, new Runnable() { - + this.tcpConnection.onReadInactivity(new Runnable() { @Override public void run() { - String message = "Broker hearbeat missed: connection idle for more than " + interval + "ms"; - if (logger.isWarnEnabled()) { - logger.warn(message); - } - disconnected(message); + handleTcpConnectionFailure("No hearbeat from broker for more than " + + interval + "ms, closing connection", null); } - }); - } - } - - protected void connected() { - if (!this.isRemoteClientSession) { - initHeartbeats(); - } - this.stompConnection.setReady(); - } - - protected void handleTcpClientFailure(String message, Throwable ex) { - if (logger.isErrorEnabled()) { - logger.error(message + ", sessionId=" + this.sessionId, ex); + }, interval); } - disconnected(message); } - protected void disconnected(String errorMessage) { - this.stompConnection.setDisconnected(); - sendError(errorMessage); + @Override + public void afterConnectionClosed() { + sendStompErrorToClient("Connection to broker closed"); } - private void sendError(String errorText) { - StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.ERROR); - headers.setSessionId(this.sessionId); - headers.setMessage(errorText); - Message errorMessage = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build(); - sendMessageToClient(errorMessage); - } + public ListenableFuture forward(final Message message) { - protected void sendMessageToClient(Message message) { - if (this.isRemoteClientSession) { - messageChannel.send(message); - } - else { - StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); - if (StompCommand.ERROR.equals(headers.getCommand())) { - if (logger.isErrorEnabled()) { - logger.error("STOMP ERROR on sessionId=" + this.sessionId + ": " + message); - } + if (!this.isStompConnected) { + if (logger.isWarnEnabled()) { + logger.warn("Connection to broker inactive or not ready, ignoring message=" + message); } - // ignore otherwise - } - } - - private void forward(Message message) { - TcpConnection, Message> tcpConnection = this.stompConnection.getReadyConnection(); - if (tcpConnection == null) { - logger.warn("Connection to STOMP broker is not active"); - handleForwardFailure(message); - } - else if (!forwardInternal(message, tcpConnection)) { - handleForwardFailure(message); - } - } - - protected void handleForwardFailure(Message message) { - if (logger.isWarnEnabled()) { - logger.warn("Failed to forward message to the broker. message=" + message); + return new ListenableFutureTask(new Callable() { + @Override + public Boolean call() throws Exception { + return Boolean.FALSE; + } + }); } - } - - private boolean forwardInternal( - Message message, TcpConnection, Message> tcpConnection) { - - Assert.isInstanceOf(byte[].class, message.getPayload(), "Message's payload must be a byte[]"); - @SuppressWarnings("unchecked") - Message byteMessage = (Message) message; if (logger.isTraceEnabled()) { - logger.trace("Forwarding to STOMP broker, message: " + message); + logger.trace("Forwarding message to broker: " + message); } - StompCommand command = StompHeaderAccessor.wrap(message).getCommand(); + @SuppressWarnings("unchecked") + ListenableFuture future = this.tcpConnection.send((Message) message); - final Deferred> deferred = new DeferredPromiseSpec().get(); - tcpConnection.send(byteMessage, new Consumer() { + future.addCallback(new ListenableFutureCallback() { @Override - public void accept(Boolean success) { - deferred.accept(success); - } - }); - - Boolean success = null; - try { - success = deferred.compose().await(); - if (success == null) { - handleTcpClientFailure("Timed out waiting for message to be forwarded to the broker", null); - } - else if (!success) { - handleTcpClientFailure("Failed to forward message to the broker", null); - } - else { + public void onSuccess(Boolean result) { + StompCommand command = StompHeaderAccessor.wrap(message).getCommand(); if (command == StompCommand.DISCONNECT) { - this.stompConnection.setDisconnected(); + resetTcpConnection(); } } - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - handleTcpClientFailure("Interrupted while forwarding message to the broker", ex); - } - return (success != null) ? success : false; - } - } - - private static class StompConnection { - - private volatile TcpConnection, Message> connection; - - private AtomicReference, Message>> readyConnection = - new AtomicReference, Message>>(); - - - public void setTcpConnection(TcpConnection, Message> connection) { - Assert.notNull(connection, "connection must not be null"); - this.connection = connection; - } - - /** - * Return the underlying {@link TcpConnection} but only after the CONNECTED STOMP - * frame is received. - */ - public TcpConnection, Message> getReadyConnection() { - return this.readyConnection.get(); - } - - public void setReady() { - this.readyConnection.set(this.connection); - } + @Override + public void onFailure(Throwable t) { + handleTcpConnectionFailure("Failed to send message " + message, t); + } + }); - public boolean isReady() { - return (this.readyConnection.get() != null); + return future; } - public void setDisconnected() { - this.readyConnection.set(null); - - TcpConnection, Message> localConnection = this.connection; - if (localConnection != null) { - localConnection.close(); - this.connection = null; + public void resetTcpConnection() { + TcpConnection conn = this.tcpConnection; + this.isStompConnected = false; + this.tcpConnection = null; + if (conn != null) { + try { + this.tcpConnection.close(); + } + catch (Throwable t) { + // ignore + } } } - @Override - public String toString() { - return "StompConnection [ready=" + isReady() + "]"; - } } - private class SystemStompRelaySession extends StompRelaySession { + private class SystemStompConnectionHandler extends StompConnectionHandler { - public static final String ID = "stompRelaySystemSessionId"; + public static final String SESSION_ID = "stompRelaySystemSessionId"; - public SystemStompRelaySession() { - super(ID, false, 5000); + public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) { + super(SESSION_ID, connectHeaders, false); } @Override - protected void connected() { - super.connected(); + protected void afterStompConnected(StompHeaderAccessor connectedHeaders) { + super.afterStompConnected(connectedHeaders); publishBrokerAvailableEvent(); } @Override - protected void disconnected(String errorMessage) { - super.disconnected(errorMessage); + protected void handleTcpConnectionFailure(String errorMessage, Throwable t) { + super.handleTcpConnectionFailure(errorMessage, t); publishBrokerUnavailableEvent(); } @Override - protected void connectionClosed() { + public void afterConnectionClosed() { + super.afterConnectionClosed(); publishBrokerUnavailableEvent(); } @Override - protected void handleForwardFailure(Message message) { - super.handleForwardFailure(message); - throw new MessageDeliveryException(message); + public ListenableFuture forward(Message message) { + try { + ListenableFuture future = super.forward(message); + if (!future.get()) { + throw new MessageDeliveryException(message); + } + return future; + } + catch (Throwable t) { + throw new MessageDeliveryException(message, t); + } } } diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/FixedIntervalReconnectStrategy.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/FixedIntervalReconnectStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..63e119363922fce8a315b4b716c0732c91882a85 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/FixedIntervalReconnectStrategy.java @@ -0,0 +1,43 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + + +/** + * A simple strategy for making reconnect attempts at a fixed interval. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class FixedIntervalReconnectStrategy implements ReconnectStrategy { + + private final long interval; + + + /** + * @param interval the frequency, in millisecond, at which to try to reconnect + */ + public FixedIntervalReconnectStrategy(long interval) { + this.interval = interval; + } + + @Override + public Long getTimeToNextAttempt(int attemptCount) { + return this.interval; + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/PromiseToListenableFutureAdapter.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/PromiseToListenableFutureAdapter.java new file mode 100644 index 0000000000000000000000000000000000000000..ac0f5951e0a7455b92b3cee250a0021d7b14b37d --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/PromiseToListenableFutureAdapter.java @@ -0,0 +1,110 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.util.Assert; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; + +import reactor.core.composable.Promise; +import reactor.function.Consumer; + +/** + * Adapts a reactor {@link Promise} to {@link ListenableFuture} optionally converting + * the result Object type {@code } to the expected target type {@code }. + * + * @param the type of object expected from the {@link Promise} + * @param the type of object expected from the {@link ListenableFuture} + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +abstract class PromiseToListenableFutureAdapter implements ListenableFuture { + + private final Promise promise; + + private final ListenableFutureCallbackRegistry registry = new ListenableFutureCallbackRegistry(); + + + protected PromiseToListenableFutureAdapter(Promise promise) { + + Assert.notNull(promise, "promise is required"); + this.promise = promise; + + this.promise.onSuccess(new Consumer() { + @Override + public void accept(S result) { + try { + registry.success(adapt(result)); + } + catch (Throwable t) { + registry.failure(t); + } + } + }); + + this.promise.onError(new Consumer() { + @Override + public void accept(Throwable t) { + registry.failure(t); + } + }); + } + + protected abstract T adapt(S adapteeResult); + + @Override + public T get() { + S result = this.promise.get(); + return adapt(result); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + S result = this.promise.await(timeout, unit); + if (result == null) { + throw new TimeoutException(); + } + return adapt(result); + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return this.promise.isComplete(); + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + this.registry.addCallback(callback); + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java new file mode 100644 index 0000000000000000000000000000000000000000..459258ece2b4d025ec2aa573d2f6accc484a3d0f --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorNettyTcpClient.java @@ -0,0 +1,127 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import java.net.InetSocketAddress; + +import org.springframework.messaging.Message; +import org.springframework.util.concurrent.ListenableFuture; + +import reactor.core.Environment; +import reactor.core.composable.Composable; +import reactor.core.composable.Promise; +import reactor.function.Consumer; +import reactor.io.Buffer; +import reactor.tcp.Reconnect; +import reactor.tcp.TcpClient; +import reactor.tcp.TcpConnection; +import reactor.tcp.encoding.Codec; +import reactor.tcp.netty.NettyTcpClient; +import reactor.tcp.spec.TcpClientSpec; +import reactor.tuple.Tuple; +import reactor.tuple.Tuple2; + +/** + * A Reactor/Netty implementation of {@link TcpOperations}. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public class ReactorNettyTcpClient

implements TcpOperations

{ + + private Environment environment; + + private TcpClient, Message

> tcpClient; + + + public ReactorNettyTcpClient(String host, int port, Codec, Message

> codec) { + this.environment = new Environment(); + this.tcpClient = new TcpClientSpec, Message

>(NettyTcpClient.class) + .env(this.environment) + .codec(codec) + .connect(host, port) + .get(); + } + + + @Override + public void connect(TcpConnectionHandler

connectionHandler) { + this.connect(connectionHandler, null); + } + + @Override + public void connect(final TcpConnectionHandler

connectionHandler, + final ReconnectStrategy reconnectStrategy) { + + Composable, Message

>> composable; + + if (reconnectStrategy != null) { + composable = this.tcpClient.open(new Reconnect() { + @Override + public Tuple2 reconnect(InetSocketAddress address, int attempt) { + return Tuple.of(address, reconnectStrategy.getTimeToNextAttempt(attempt)); + } + }); + } + else { + composable = this.tcpClient.open(); + } + + composable.when(Throwable.class, new Consumer() { + @Override + public void accept(Throwable ex) { + connectionHandler.afterConnectFailure(ex); + } + }); + + composable.consume(new Consumer, Message

>>() { + @Override + public void accept(TcpConnection, Message

> connection) { + connection.on().close(new Runnable() { + @Override + public void run() { + connectionHandler.afterConnectionClosed(); + } + }); + connection.in().consume(new Consumer>() { + @Override + public void accept(Message

message) { + connectionHandler.handleMessage(message); + } + }); + connectionHandler.afterConnected(new ReactorTcpConnection

(connection)); + } + }); + } + + @Override + public ListenableFuture shutdown() { + try { + Promise promise = this.tcpClient.close(); + return new PromiseToListenableFutureAdapter(promise) { + @Override + protected Void adapt(Void result) { + return result; + } + }; + } + finally { + this.environment.shutdown(); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorTcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorTcpConnection.java new file mode 100644 index 0000000000000000000000000000000000000000..81e99db1877084322e334120bbb58c3675447f84 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReactorTcpConnection.java @@ -0,0 +1,134 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.messaging.Message; +import org.springframework.util.concurrent.ListenableFuture; +import org.springframework.util.concurrent.ListenableFutureCallback; +import org.springframework.util.concurrent.ListenableFutureCallbackRegistry; + +import reactor.core.composable.Deferred; +import reactor.core.composable.Promise; +import reactor.core.composable.spec.DeferredPromiseSpec; +import reactor.function.Consumer; + + +public class ReactorTcpConnection

implements TcpConnection

{ + + private final reactor.tcp.TcpConnection, Message

> reactorTcpConnection; + + + public ReactorTcpConnection(reactor.tcp.TcpConnection, Message

> connection) { + this.reactorTcpConnection = connection; + } + + @Override + public ListenableFuture send(Message

message) { + ConsumerListenableFuture future = new ConsumerListenableFuture(); + this.reactorTcpConnection.send(message, future); + return future; + } + + @Override + public void onReadInactivity(Runnable runnable, long inactivityDuration) { + this.reactorTcpConnection.on().readIdle(inactivityDuration, runnable); + } + + @Override + public void onWriteInactivity(Runnable runnable, long inactivityDuration) { + this.reactorTcpConnection.on().writeIdle(inactivityDuration, runnable); + } + + @Override + public void close() { + this.reactorTcpConnection.close(); + } + + + // Use this temporarily until reactor provides a send method returning a Promise + + + private static class ConsumerListenableFuture implements ListenableFuture, Consumer { + + final Deferred> deferred = new DeferredPromiseSpec().get(); + + private final ListenableFutureCallbackRegistry registry = + new ListenableFutureCallbackRegistry(); + + @Override + public void accept(Boolean result) { + + this.deferred.accept(result); + + if (result == null) { + this.registry.failure(new TimeoutException()); + } + else if (result) { + this.registry.success(result); + } + else { + this.registry.failure(new Exception("Failed send message")); + } + } + + @Override + public Boolean get() { + try { + return this.deferred.compose().await(); + } + catch (InterruptedException e) { + return Boolean.FALSE; + } + } + + @Override + public Boolean get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + + Boolean result = this.deferred.compose().await(timeout, unit); + if (result == null) { + throw new TimeoutException(); + } + return result; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return false; + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public boolean isDone() { + return this.deferred.compose().isComplete(); + } + + @Override + public void addCallback(ListenableFutureCallback callback) { + this.registry.addCallback(callback); + } + } + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReconnectStrategy.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReconnectStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..b81be7c79b731caf46fc3ee22fddeb009a6ab1f3 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/ReconnectStrategy.java @@ -0,0 +1,36 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + + +/** + * A contract to determine the frequency of reconnect attempts after connection failure. + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface ReconnectStrategy { + + /** + * Return the time to the next attempt to reconnect. + * + * @param attemptCount how many reconnect attempts have been made already + * @return the amount of time in milliseconds or {@code null} to stop + */ + Long getTimeToNextAttempt(int attemptCount); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java new file mode 100644 index 0000000000000000000000000000000000000000..57f89036ab7626649684af4c7e5213052dd78d50 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnection.java @@ -0,0 +1,58 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import org.springframework.messaging.Message; +import org.springframework.util.concurrent.ListenableFuture; + +/** + * A contract for sending messages and managing a TCP connection. + * + * @param

the type of payload for outbound {@link Message}s + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface TcpConnection

{ + + /** + * Send the given message. + * @param message the message + * @return whether the send succeeded or not + */ + ListenableFuture send(Message

message); + + /** + * Register a task to invoke after a period of of read inactivity. + * @param runnable the task to invoke + * @param duration the amount of inactive time in milliseconds + */ + void onReadInactivity(Runnable runnable, long duration); + + /** + * Register a task to invoke after a period of of write inactivity. + * @param runnable the task to invoke + * @param duration the amount of inactive time in milliseconds + */ + void onWriteInactivity(Runnable runnable, long duration); + + /** + * Close the connection. + */ + void close(); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnectionHandler.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnectionHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..43b37487bbc21e0c764d339402c036c36ab0640d --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpConnectionHandler.java @@ -0,0 +1,55 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import org.springframework.messaging.Message; + +/** + * A contract for managing lifecycle events for a TCP connection including + * the handling of incoming messages. + * + * @param

the type of payload for in and outbound messages + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface TcpConnectionHandler

{ + + /** + * Invoked after a connection is successfully established. + * @param connection the connection + */ + void afterConnected(TcpConnection

connection); + + /** + * Invoked after a connection failure. + * @param ex the exception + */ + void afterConnectFailure(Throwable ex); + + /** + * Handle a message received from the remote host. + * @param message the message + */ + void handleMessage(Message

message); + + /** + * Invoked after the connection is closed. + */ + void afterConnectionClosed(); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java new file mode 100644 index 0000000000000000000000000000000000000000..f4b12e40c9d02c20b4047dc1e13663721351d072 --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/TcpOperations.java @@ -0,0 +1,51 @@ +/* + * Copyright 2002-2013 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.support.tcp; + +import org.springframework.util.concurrent.ListenableFuture; + +/** + * A contract for establishing TCP connections. + * + * @param

the type of payload for in and outbound messages + * + * @author Rossen Stoyanchev + * @since 4.0 + */ +public interface TcpOperations

{ + + /** + * Open a new connection. + * + * @param connectionHandler a handler to manage the connection + */ + void connect(TcpConnectionHandler

connectionHandler); + + /** + * Open a new connection and a strategy for reconnecting if the connection fails. + * + * @param connectionHandler a handler to manage the connection + * @param reconnectStrategy a strategy for reconnecting + */ + void connect(TcpConnectionHandler

connectionHandler, ReconnectStrategy reconnectStrategy); + + /** + * Shut down and close any open connections. + */ + ListenableFuture shutdown(); + +} diff --git a/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/package-info.java b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/package-info.java new file mode 100644 index 0000000000000000000000000000000000000000..56fb3227dc7ca122b8750892dd9d64b7b7c3710d --- /dev/null +++ b/spring-messaging/src/main/java/org/springframework/messaging/support/tcp/package-info.java @@ -0,0 +1,9 @@ +/** + * Contains abstractions and implementation classes for establishing TCP connections via + * {@link org.springframework.messaging.support.tcp.TcpOperations TcpOperations}, + * handling messages via + * {@link org.springframework.messaging.support.tcp.TcpConnectionHandler TcpConnectionHandler}, + * as well as sending messages via + * {@link org.springframework.messaging.support.tcp.TcpConnection TcpConnection}. + */ +package org.springframework.messaging.support.tcp; diff --git a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java index 40eb1a5636a59872a8bd2181e668f45a7967dc8c..d542ec8160af8974401ec7d6356ce4da09ba2bcc 100644 --- a/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java +++ b/spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerIntegrationTests.java @@ -231,14 +231,13 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { @Test public void disconnectClosesRelaySessionCleanly() throws Exception { - String sess1 = "sess1"; - MessageExchange conn1 = MessageExchangeBuilder.connect(sess1).build(); - this.responseHandler.expect(conn1); - this.relay.handleMessage(conn1.message); + MessageExchange connect = MessageExchangeBuilder.connect("sess1").build(); + this.responseHandler.expect(connect); + this.relay.handleMessage(connect.message); this.responseHandler.awaitAndAssert(); StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.DISCONNECT); - headers.setSessionId(sess1); + headers.setSessionId("sess1"); this.relay.handleMessage(MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build()); @@ -330,9 +329,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { StringBuilder sb = new StringBuilder("\n"); synchronized (this.monitor) { - sb.append("INCOMPLETE:\n").append(this.expected).append("\n"); - sb.append("COMPLETE:\n").append(this.actual).append("\n"); - sb.append("UNMATCHED MESSAGES:\n").append(this.unexpected).append("\n"); + sb.append("UNMATCHED EXPECTATIONS:\n").append(this.expected).append("\n"); + sb.append("MATCHED EXPECTATIONS:\n").append(this.actual).append("\n"); + sb.append("UNEXPECTED MESSAGES:\n").append(this.unexpected).append("\n"); } return sb.toString();