提交 17aea892 编写于 作者: R Rossen Stoyanchev

Fix issue with forwarding messaging to STOMP broker

上级 4e3390ae
...@@ -283,7 +283,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -283,7 +283,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (SimpMessageType.MESSAGE.equals(messageType)) { if (SimpMessageType.MESSAGE.equals(messageType)) {
sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId; sessionId = (sessionId == null) ? SystemStompRelaySession.ID : sessionId;
headers.setSessionId(sessionId); headers.setSessionId(sessionId);
headers.updateStompCommandAsClientMessage(); command = headers.updateStompCommandAsClientMessage();
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build(); message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
} }
...@@ -292,7 +292,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -292,7 +292,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return; return;
} }
if (command != null && command.requiresDestination() && !checkDestinationPrefix(destination)) { if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) {
return; return;
} }
...@@ -386,7 +386,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -386,7 +386,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
readStompFrame(message); readStompFrame(message);
} }
}); });
forwardInternal(tcpConn, connectMessage); forwardInternal(connectMessage, tcpConn);
} }
protected void connectionClosed() { protected void connectionClosed() {
...@@ -445,7 +445,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -445,7 +445,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.warn("Connection to STOMP broker is not active"); logger.warn("Connection to STOMP broker is not active");
handleForwardFailure(message); handleForwardFailure(message);
} }
else if (!forwardInternal(tcpConnection, message)) { else if (!forwardInternal(message, tcpConnection)) {
handleForwardFailure(message); handleForwardFailure(message);
} }
} }
...@@ -457,7 +457,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -457,7 +457,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
private boolean forwardInternal( private boolean forwardInternal(
TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection, Message<?> message) { Message<?> message, TcpConnection<Message<byte[]>, Message<byte[]>> tcpConnection) {
Assert.isInstanceOf(byte[].class, message.getPayload(), "Message's payload must be a byte[]"); Assert.isInstanceOf(byte[].class, message.getPayload(), "Message's payload must be a byte[]");
......
...@@ -220,7 +220,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor { ...@@ -220,7 +220,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
return toNativeHeaderMap(); return toNativeHeaderMap();
} }
public void updateStompCommandAsClientMessage() { public StompCommand updateStompCommandAsClientMessage() {
Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()), Assert.state(SimpMessageType.MESSAGE.equals(getMessageType()),
"Unexpected message type " + getMessage()); "Unexpected message type " + getMessage());
...@@ -231,6 +231,8 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor { ...@@ -231,6 +231,8 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
else if (!getCommand().equals(StompCommand.SEND)) { else if (!getCommand().equals(StompCommand.SEND)) {
throw new IllegalStateException("Unexpected STOMP command " + getCommand()); throw new IllegalStateException("Unexpected STOMP command " + getCommand());
} }
return getCommand();
} }
public void updateStompCommandAsServerMessage() { public void updateStompCommandAsServerMessage() {
......
...@@ -113,7 +113,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests { ...@@ -113,7 +113,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
} }
} }
// test "host" header (virtualHost property) when TCP client is behind interface and configurable // When TCP client is behind interface and configurable:
// test "host" header (virtualHost property)
// test "/user/.." destination is excluded
@Test @Test
public void publishSubscribe() throws Exception { public void publishSubscribe() throws Exception {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册