提交 df5d22e1 编写于 作者: R Rossen Stoyanchev

Improve logging in spring-messaging

Before this change the amount of logging was too little or too much
with TRACE turned on. This change separates useful debugging
information and logs it under DEBUG and leaves more detailed
information to be logged under TRACE.
上级 72dec7d0
...@@ -318,17 +318,21 @@ public abstract class AbstractMethodMessageHandler<T> ...@@ -318,17 +318,21 @@ public abstract class AbstractMethodMessageHandler<T>
public void handleMessage(Message<?> message) throws MessagingException { public void handleMessage(Message<?> message) throws MessagingException {
String destination = getDestination(message); String destination = getDestination(message);
String lookupDestination = getLookupDestination(destination); if (destination == null) {
logger.trace("Ignoring message, no destination");
return;
}
String lookupDestination = getLookupDestination(destination);
if (lookupDestination == null) { if (lookupDestination == null) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Ignoring message with destination=" + destination); logger.trace("Ignoring message to destination=" + destination);
} }
return; return;
} }
if (logger.isTraceEnabled()) { if (logger.isDebugEnabled()) {
logger.trace("Handling message " + message); logger.debug("Handling message, lookupDestination=" + lookupDestination);
} }
message = MessageBuilder.fromMessage(message).setHeader(LOOKUP_DESTINATION_HEADER, lookupDestination).build(); message = MessageBuilder.fromMessage(message).setHeader(LOOKUP_DESTINATION_HEADER, lookupDestination).build();
...@@ -438,6 +442,10 @@ public abstract class AbstractMethodMessageHandler<T> ...@@ -438,6 +442,10 @@ public abstract class AbstractMethodMessageHandler<T>
protected void handleMatch(T mapping, HandlerMethod handlerMethod, String lookupDestination, Message<?> message) { protected void handleMatch(T mapping, HandlerMethod handlerMethod, String lookupDestination, Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("Message matched to " + handlerMethod);
}
handlerMethod = handlerMethod.createWithResolvedBean(); handlerMethod = handlerMethod.createWithResolvedBean();
InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod); InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod);
invocable.setMessageMethodArgumentResolvers(this.argumentResolvers); invocable.setMessageMethodArgumentResolvers(this.argumentResolvers);
...@@ -495,7 +503,11 @@ public abstract class AbstractMethodMessageHandler<T> ...@@ -495,7 +503,11 @@ public abstract class AbstractMethodMessageHandler<T>
protected abstract AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(Class<?> beanType); protected abstract AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(Class<?> beanType);
protected abstract void handleNoMatch(Set<T> ts, String lookupDestination, Message<?> message); protected void handleNoMatch(Set<T> ts, String lookupDestination, Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("No matching method found");
}
}
/** /**
......
...@@ -96,7 +96,7 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String ...@@ -96,7 +96,7 @@ public class SimpMessagingTemplate extends AbstractMessageSendingTemplate<String
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
String destination = headers.getDestination(); String destination = headers.getDestination();
destination = (destination != null) ? destination : getRequiredDefaultDestination(); destination = (destination != null) ? destination : getRequiredDefaultDestination();
doSend(getRequiredDefaultDestination(), message); doSend(destination, message);
} }
@Override @Override
......
...@@ -113,18 +113,25 @@ public abstract class WebSocketMessageBrokerConfigurationSupport { ...@@ -113,18 +113,25 @@ public abstract class WebSocketMessageBrokerConfigurationSupport {
@Bean @Bean
public AbstractSubscribableChannel webSocketRequestChannel() { public AbstractSubscribableChannel webSocketRequestChannel() {
return new ExecutorSubscribableChannel(webSocketChannelExecutor()); return new ExecutorSubscribableChannel(webSocketRequestChannelExecutor());
}
@Bean
public ThreadPoolTaskExecutor webSocketRequestChannelExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("WebSocketRequestChannel-");
return executor;
} }
@Bean @Bean
public AbstractSubscribableChannel webSocketResponseChannel() { public AbstractSubscribableChannel webSocketResponseChannel() {
return new ExecutorSubscribableChannel(webSocketChannelExecutor()); return new ExecutorSubscribableChannel(webSocketResponseChannelExecutor());
} }
@Bean @Bean
public ThreadPoolTaskExecutor webSocketChannelExecutor() { public ThreadPoolTaskExecutor webSocketResponseChannelExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setThreadNamePrefix("BrokerWebSocketChannel-"); executor.setThreadNamePrefix("WebSocketResponseChannel-");
return executor; return executor;
} }
......
...@@ -132,18 +132,12 @@ public abstract class AbstractBrokerMessageHandler ...@@ -132,18 +132,12 @@ public abstract class AbstractBrokerMessageHandler
@Override @Override
public final void handleMessage(Message<?> message) { public final void handleMessage(Message<?> message) {
if (!this.running) { if (!this.running) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("STOMP broker relay not running. Ignoring message id=" + message.getHeaders().getId()); logger.trace("Message broker is not running. Ignoring message id=" + message.getHeaders().getId());
} }
return; return;
} }
if (logger.isTraceEnabled()) {
logger.trace("Handling message " + message);
}
handleMessageInternal(message); handleMessageInternal(message);
} }
......
...@@ -97,20 +97,17 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist ...@@ -97,20 +97,17 @@ public abstract class AbstractSubscriptionRegistry implements SubscriptionRegist
public final MultiValueMap<String, String> findSubscriptions(Message<?> message) { public final MultiValueMap<String, String> findSubscriptions(Message<?> message) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
if (!SimpMessageType.MESSAGE.equals(headers.getMessageType())) { if (!SimpMessageType.MESSAGE.equals(headers.getMessageType())) {
logger.error("Unexpected message type: " + message); logger.trace("Ignoring message type " + headers.getMessageType());
return null; return null;
} }
String destination = headers.getDestination(); String destination = headers.getDestination();
if (destination == null) { if (destination == null) {
logger.error("Ignoring destination. No destination in message: " + message); logger.trace("Ignoring message, no destination");
return null; return null;
} }
if (logger.isTraceEnabled()) {
logger.trace("Find subscriptions, destination=" + headers.getDestination());
}
MultiValueMap<String, String> result = findSubscriptionsInternal(destination, message); MultiValueMap<String, String> result = findSubscriptionsInternal(destination, message);
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Found " + result.size() + " subscriptions"); logger.trace("Found " + result.size() + " subscriptions for destination=" + headers.getDestination());
} }
return result; return result;
} }
......
...@@ -120,10 +120,6 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { ...@@ -120,10 +120,6 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
private UserDestinationInfo getUserDestinationInfo(SimpMessageHeaderAccessor headers) { private UserDestinationInfo getUserDestinationInfo(SimpMessageHeaderAccessor headers) {
String destination = headers.getDestination(); String destination = headers.getDestination();
if (destination == null) {
logger.trace("Ignoring message, no destination");
return null;
}
String targetUser; String targetUser;
String targetDestination; String targetDestination;
...@@ -132,20 +128,18 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { ...@@ -132,20 +128,18 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
SimpMessageType messageType = headers.getMessageType(); SimpMessageType messageType = headers.getMessageType();
if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) { if (SimpMessageType.SUBSCRIBE.equals(messageType) || SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
if (user == null) { if (!checkDestination(destination, this.subscriptionDestinationPrefix)) {
logger.trace("Ignoring (un)subscribe message, no user information");
return null; return null;
} }
if (!destination.startsWith(this.subscriptionDestinationPrefix)) { if (user == null) {
logger.trace("Ignoring (un)subscribe message, not a \"user\" destination"); logger.warn("Ignoring message, no user information");
return null; return null;
} }
targetUser = user.getName(); targetUser = user.getName();
targetDestination = destination.substring(this.destinationPrefix.length()-1); targetDestination = destination.substring(this.destinationPrefix.length()-1);
} }
else if (SimpMessageType.MESSAGE.equals(messageType)) { else if (SimpMessageType.MESSAGE.equals(messageType)) {
if (!destination.startsWith(this.destinationPrefix)) { if (!checkDestination(destination, this.destinationPrefix)) {
logger.trace("Ignoring message, not a \"user\" destination");
return null; return null;
} }
int startIndex = this.destinationPrefix.length(); int startIndex = this.destinationPrefix.length();
...@@ -156,13 +150,29 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver { ...@@ -156,13 +150,29 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
} }
else { else {
logger.trace("Ignoring message, not of the right message type"); if (logger.isTraceEnabled()) {
logger.trace("Ignoring " + messageType + " message");
}
return null; return null;
} }
return new UserDestinationInfo(targetUser, targetDestination); return new UserDestinationInfo(targetUser, targetDestination);
} }
protected boolean checkDestination(String destination, String requiredPrefix) {
if (destination == null) {
logger.trace("Ignoring message, no destination");
return false;
}
if (!destination.startsWith(requiredPrefix)) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring message to " + destination + ", not a \"user\" destination");
}
return false;
}
return true;
}
private static class UserDestinationInfo { private static class UserDestinationInfo {
......
...@@ -295,13 +295,6 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan ...@@ -295,13 +295,6 @@ public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHan
super.handleMatch(mapping, handlerMethod, lookupDestination, message); super.handleMatch(mapping, handlerMethod, lookupDestination, message);
} }
@Override
protected void handleNoMatch(Set<SimpMessageMappingInfo> set, String lookupDestination, Message<?> message) {
if (logger.isTraceEnabled()) {
logger.trace("No match for " + lookupDestination);
}
}
@Override @Override
protected AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(Class<?> beanType) { protected AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(Class<?> beanType) {
return new AnnotationExceptionHandlerMethodResolver(beanType); return new AnnotationExceptionHandlerMethodResolver(beanType);
......
...@@ -83,7 +83,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { ...@@ -83,7 +83,7 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
if (!checkDestinationPrefix(destination)) { if (!checkDestinationPrefix(destination)) {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Ingoring message with destination " + destination); logger.trace("Ingoring message to destination=" + destination);
} }
return; return;
} }
...@@ -113,13 +113,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler { ...@@ -113,13 +113,15 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
protected void sendMessageToSubscribers(String destination, Message<?> message) { protected void sendMessageToSubscribers(String destination, Message<?> message) {
MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message); MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
if ((subscriptions.size() > 0) && logger.isDebugEnabled()) {
logger.debug("Sending message with destination=" + destination
+ " to " + subscriptions.size() + " subscriber(s)");
}
for (String sessionId : subscriptions.keySet()) { for (String sessionId : subscriptions.keySet()) {
for (String subscriptionId : subscriptions.get(sessionId)) { for (String subscriptionId : subscriptions.get(sessionId)) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message); SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
headers.setSessionId(sessionId); headers.setSessionId(sessionId);
headers.setSubscriptionId(subscriptionId); headers.setSubscriptionId(subscriptionId);
Object payload = message.getPayload(); Object payload = message.getPayload();
Message<?> clientMessage = MessageBuilder.withPayload(payload).setHeaders(headers).build(); Message<?> clientMessage = MessageBuilder.withPayload(payload).setHeaders(headers).build();
try { try {
......
...@@ -87,22 +87,15 @@ public class UserDestinationMessageHandler implements MessageHandler { ...@@ -87,22 +87,15 @@ public class UserDestinationMessageHandler implements MessageHandler {
@Override @Override
public void handleMessage(Message<?> message) throws MessagingException { public void handleMessage(Message<?> message) throws MessagingException {
if (logger.isTraceEnabled()) {
logger.trace("Handling message " + message);
}
Set<String> destinations = this.userDestinationResolver.resolveDestination(message); Set<String> destinations = this.userDestinationResolver.resolveDestination(message);
if (CollectionUtils.isEmpty(destinations)) { if (CollectionUtils.isEmpty(destinations)) {
return; return;
} }
for (String targetDestination : destinations) { for (String targetDestination : destinations) {
if (logger.isTraceEnabled()) { if (logger.isDebugEnabled()) {
logger.trace("Sending message to resolved user destination: " + targetDestination); logger.debug("Sending message to resolved destination=" + targetDestination);
} }
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.wrap(message);
headers.setDestination(targetDestination);
message = MessageBuilder.withPayload(message.getPayload()).setHeaders(headers).build();
this.messagingTemplate.send(targetDestination, message); this.messagingTemplate.send(targetDestination, message);
} }
} }
......
...@@ -306,15 +306,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -306,15 +306,21 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
if (sessionId == null) { if (sessionId == null) {
logger.error("No sessionId, ignoring message: " + message); if (logger.isWarnEnabled()) {
logger.warn("No sessionId, ignoring message: " + message);
}
return; return;
} }
if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) { if ((command != null) && command.requiresDestination() && !checkDestinationPrefix(destination)) {
if (logger.isTraceEnabled()) {
logger.trace("Ignoring message to destination=" + destination);
}
return; return;
} }
if (SimpMessageType.CONNECT.equals(messageType)) { if (SimpMessageType.CONNECT.equals(messageType)) {
logger.debug("Processing CONNECT in session=" + sessionId);
if (getVirtualHost() != null) { if (getVirtualHost() != null) {
headers.setHost(getVirtualHost()); headers.setHost(getVirtualHost());
} }
...@@ -335,7 +341,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -335,7 +341,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
else { else {
StompConnectionHandler handler = this.connectionHandlers.get(sessionId); StompConnectionHandler handler = this.connectionHandlers.get(sessionId);
if (handler == null) { if (handler == null) {
logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message: " + message); logger.warn("Connection for sessionId=" + sessionId + " not found. Ignoring message");
return; return;
} }
handler.forward(message); handler.forward(message);
...@@ -424,11 +430,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -424,11 +430,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override @Override
public void handleMessage(Message<byte[]> message) { public void handleMessage(Message<byte[]> message) {
if (logger.isTraceEnabled()) {
logger.trace("Reading message for sessionId=" + this.sessionId + ", " + message);
}
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) {
logger.trace("Received broker heartbeat");
}
else if (logger.isDebugEnabled()) {
logger.debug("Received broker message in session=" + this.sessionId);
}
if (StompCommand.CONNECTED == headers.getCommand()) { if (StompCommand.CONNECTED == headers.getCommand()) {
afterStompConnected(headers); afterStompConnected(headers);
} }
...@@ -500,7 +510,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -500,7 +510,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (!this.isStompConnected) { if (!this.isStompConnected) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn("Connection to broker inactive or not ready, ignoring message=" + message); logger.warn("Connection to broker inactive or not ready. Ignoring message");
} }
return new ListenableFutureTask<Void>(new Callable<Void>() { return new ListenableFutureTask<Void>(new Callable<Void>() {
@Override @Override
...@@ -510,8 +520,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -510,8 +520,14 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
}); });
} }
if (logger.isTraceEnabled()) { if (logger.isDebugEnabled()) {
logger.trace("Forwarding message to broker: " + message); StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) {
logger.trace("Forwarding heartbeat to broker");
}
else {
logger.debug("Forwarding message to broker");
}
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
...@@ -548,6 +564,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler ...@@ -548,6 +564,10 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
} }
} }
@Override
public String toString() {
return "StompConnectionHandler{" + "sessionId=" + this.sessionId + "}";
}
} }
private class SystemStompConnectionHandler extends StompConnectionHandler { private class SystemStompConnectionHandler extends StompConnectionHandler {
......
...@@ -69,14 +69,17 @@ public class StompDecoder { ...@@ -69,14 +69,17 @@ public class StompDecoder {
decodedMessage = MessageBuilder.withPayload(payload) decodedMessage = MessageBuilder.withPayload(payload)
.setHeaders(StompHeaderAccessor.create(stompCommand, headers)).build(); .setHeaders(StompHeaderAccessor.create(stompCommand, headers)).build();
if (logger.isDebugEnabled()) {
logger.debug("Decoded " + decodedMessage);
}
} }
else { else {
decodedMessage = MessageBuilder.withPayload(HEARTBEAT_PAYLOAD).setHeaders( decodedMessage = MessageBuilder.withPayload(HEARTBEAT_PAYLOAD).setHeaders(
StompHeaderAccessor.create(SimpMessageType.HEARTBEAT)).build(); StompHeaderAccessor.create(SimpMessageType.HEARTBEAT)).build();
} if (logger.isTraceEnabled()) {
logger.trace("Decoded heartbeat");
if (logger.isTraceEnabled()) { }
logger.trace("Decoded " + decodedMessage);
} }
return decodedMessage; return decodedMessage;
...@@ -101,7 +104,7 @@ public class StompDecoder { ...@@ -101,7 +104,7 @@ public class StompDecoder {
if (headerStream.size() > 0) { if (headerStream.size() > 0) {
String header = new String(headerStream.toByteArray(), UTF8_CHARSET); String header = new String(headerStream.toByteArray(), UTF8_CHARSET);
int colonIndex = header.indexOf(':'); int colonIndex = header.indexOf(':');
if (colonIndex <= 0 || colonIndex == header.length() - 1) { if ((colonIndex <= 0) || (colonIndex == header.length() - 1)) {
throw new StompConversionException( throw new StompConversionException(
"Illegal header: '" + header + "'. A header must be of the form <name>:<value"); "Illegal header: '" + header + "'. A header must be of the form <name>:<value");
} }
...@@ -148,7 +151,6 @@ public class StompDecoder { ...@@ -148,7 +151,6 @@ public class StompDecoder {
} }
} }
} }
throw new StompConversionException("Frame must be terminated with a null octect"); throw new StompConversionException("Frame must be terminated with a null octect");
} }
......
...@@ -21,6 +21,7 @@ import java.io.DataOutputStream; ...@@ -21,6 +21,7 @@ import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
...@@ -53,9 +54,6 @@ public final class StompEncoder { ...@@ -53,9 +54,6 @@ public final class StompEncoder {
*/ */
public byte[] encode(Message<byte[]> message) { public byte[] encode(Message<byte[]> message) {
try { try {
if (logger.isTraceEnabled()) {
logger.trace("Encoding " + message);
}
ByteArrayOutputStream baos = new ByteArrayOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos); DataOutputStream output = new DataOutputStream(baos);
...@@ -90,7 +88,14 @@ public final class StompEncoder { ...@@ -90,7 +88,14 @@ public final class StompEncoder {
private void writeHeaders(StompHeaderAccessor headers, Message<byte[]> message, DataOutputStream output) private void writeHeaders(StompHeaderAccessor headers, Message<byte[]> message, DataOutputStream output)
throws IOException { throws IOException {
for (Entry<String, List<String>> entry : headers.toStompHeaderMap().entrySet()) { Map<String,List<String>> stompHeaders = headers.toStompHeaderMap();
if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) {
logger.trace("Encoded heartbeat");
}
else if (logger.isDebugEnabled()) {
logger.debug("Encoded STOMP command=" + headers.getCommand() + " headers=" + stompHeaders);
}
for (Entry<String, List<String>> entry : stompHeaders.entrySet()) {
byte[] key = getUtf8BytesEscapingIfNecessary(entry.getKey(), headers); byte[] key = getUtf8BytesEscapingIfNecessary(entry.getKey(), headers);
for (String value : entry.getValue()) { for (String value : entry.getValue()) {
output.write(key); output.write(key);
...@@ -99,9 +104,9 @@ public final class StompEncoder { ...@@ -99,9 +104,9 @@ public final class StompEncoder {
output.write(LF); output.write(LF);
} }
} }
if (headers.getCommand() == StompCommand.SEND || if ((headers.getCommand() == StompCommand.SEND) || (headers.getCommand() == StompCommand.MESSAGE) ||
headers.getCommand() == StompCommand.MESSAGE || (headers.getCommand() == StompCommand.ERROR)) {
headers.getCommand() == StompCommand.ERROR) {
output.write("content-length:".getBytes(UTF8_CHARSET)); output.write("content-length:".getBytes(UTF8_CHARSET));
output.write(Integer.toString(message.getPayload().length).getBytes(UTF8_CHARSET)); output.write(Integer.toString(message.getPayload().length).getBytes(UTF8_CHARSET));
output.write(LF); output.write(LF);
......
...@@ -104,12 +104,15 @@ public class StompProtocolHandler implements SubProtocolHandler { ...@@ -104,12 +104,15 @@ public class StompProtocolHandler implements SubProtocolHandler {
return; return;
} }
if (logger.isTraceEnabled()) {
logger.trace("Message " + message);
}
try { try {
StompHeaderAccessor headers = StompHeaderAccessor.wrap(message); StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
if (SimpMessageType.HEARTBEAT.equals(headers.getMessageType())) {
logger.trace("Received heartbeat from client session=" + session.getId());
}
else {
logger.trace("Received message from client session=" + session.getId());
}
headers.setSessionId(session.getId()); headers.setSessionId(session.getId());
headers.setUser(session.getPrincipal()); headers.setUser(session.getPrincipal());
......
...@@ -102,8 +102,9 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName ...@@ -102,8 +102,9 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName
public final boolean send(Message<?> message, long timeout) { public final boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "Message must not be null"); Assert.notNull(message, "Message must not be null");
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[" + this.beanName + "] send message " + message); logger.trace("[" + this.beanName + "] sending message id=" + message.getHeaders().getId());
} }
message = this.interceptorChain.preSend(message, this); message = this.interceptorChain.preSend(message, this);
......
...@@ -59,9 +59,6 @@ class ChannelInterceptorChain { ...@@ -59,9 +59,6 @@ class ChannelInterceptorChain {
public Message<?> preSend(Message<?> message, MessageChannel channel) { public Message<?> preSend(Message<?> message, MessageChannel channel) {
UUID originalId = message.getHeaders().getId(); UUID originalId = message.getHeaders().getId();
if (logger.isTraceEnabled()) {
logger.trace("preSend message id " + originalId);
}
for (ChannelInterceptor interceptor : this.interceptors) { for (ChannelInterceptor interceptor : this.interceptors) {
message = interceptor.preSend(message, channel); message = interceptor.preSend(message, channel);
if (message == null) { if (message == null) {
...@@ -71,9 +68,9 @@ class ChannelInterceptorChain { ...@@ -71,9 +68,9 @@ class ChannelInterceptorChain {
return null; return null;
} }
} }
if (logger.isTraceEnabled()) { if (logger.isDebugEnabled()) {
if (!message.getHeaders().getId().equals(originalId)) { if (!message.getHeaders().getId().equals(originalId)) {
logger.trace("preSend returned modified message " + message); logger.debug("preSend returned modified message, new message id=" + message.getHeaders().getId());
} }
} }
return message; return message;
......
...@@ -64,11 +64,6 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel { ...@@ -64,11 +64,6 @@ public class ExecutorSubscribableChannel extends AbstractSubscribableChannel {
@Override @Override
public boolean sendInternal(final Message<?> message, long timeout) { public boolean sendInternal(final Message<?> message, long timeout) {
if (logger.isTraceEnabled()) {
logger.trace("subscribers " + this.handlers);
}
for (final MessageHandler handler : this.handlers) { for (final MessageHandler handler : this.handlers) {
if (this.executor == null) { if (this.executor == null) {
handler.handleMessage(message); handler.handleMessage(message);
......
...@@ -18,6 +18,8 @@ package org.springframework.messaging.support.tcp; ...@@ -18,6 +18,8 @@ package org.springframework.messaging.support.tcp;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.util.Assert; import org.springframework.util.Assert;
import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFuture;
...@@ -48,6 +50,8 @@ import reactor.tuple.Tuple2; ...@@ -48,6 +50,8 @@ import reactor.tuple.Tuple2;
*/ */
public class ReactorNettyTcpClient<P> implements TcpOperations<P> { public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
private final static Log logger = LogFactory.getLog(ReactorNettyTcpClient.class);
private Environment environment; private Environment environment;
private TcpClient<Message<P>, Message<P>> tcpClient; private TcpClient<Message<P>, Message<P>> tcpClient;
...@@ -120,6 +124,12 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> { ...@@ -120,6 +124,12 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
connectionHandler.handleMessage(message); connectionHandler.handleMessage(message);
} }
}); });
connection.when(Throwable.class, new Consumer<Throwable>() {
@Override
public void accept(Throwable t) {
logger.error("Exception on connection " + connectionHandler, t);
}
});
connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection)); connectionHandler.afterConnected(new ReactorTcpConnection<P>(connection));
} }
}); });
......
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
import java.util.ArrayList;
import java.util.List;
/**
* A stub MessageChannel that saves all sent messages.
*
* @author Rossen Stoyanchev
*/
public class StubMessageChannel implements MessageChannel {
private final List<Message<byte[]>> messages = new ArrayList<>();
public List<Message<byte[]>> getMessages() {
return this.messages;
}
@Override
@SuppressWarnings("unchecked")
public boolean send(Message<?> message) {
this.messages.add((Message<byte[]>) message);
return true;
}
@Override
@SuppressWarnings("unchecked")
public boolean send(Message<?> message, long timeout) {
this.messages.add((Message<byte[]>) message);
return true;
}
}
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package org.springframework.messaging.simp.handler; package org.springframework.messaging.simp.handler;
import org.apache.activemq.transport.stomp.Stomp;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
...@@ -25,6 +26,8 @@ import org.springframework.messaging.core.MessageSendingOperations; ...@@ -25,6 +26,8 @@ import org.springframework.messaging.core.MessageSendingOperations;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.simp.TestPrincipal; import org.springframework.messaging.simp.TestPrincipal;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
......
...@@ -23,7 +23,7 @@ import java.util.concurrent.Callable; ...@@ -23,7 +23,7 @@ import java.util.concurrent.Callable;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel; import org.springframework.messaging.StubMessageChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType; import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
...@@ -47,16 +47,13 @@ public class StompBrokerRelayMessageHandlerTests { ...@@ -47,16 +47,13 @@ public class StompBrokerRelayMessageHandlerTests {
private StubTcpOperations tcpClient; private StubTcpOperations tcpClient;
private StubMessageChannel responseChannel;
@Before @Before
public void setup() { public void setup() {
this.responseChannel = new StubMessageChannel();
this.tcpClient = new StubTcpOperations(); this.tcpClient = new StubTcpOperations();
this.brokerRelay = new StompBrokerRelayMessageHandler(this.responseChannel, Arrays.asList("/topic")); this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(), Arrays.asList("/topic"));
this.brokerRelay.setTcpClient(tcpClient); this.brokerRelay.setTcpClient(tcpClient);
} }
...@@ -161,25 +158,4 @@ public class StompBrokerRelayMessageHandlerTests { ...@@ -161,25 +158,4 @@ public class StompBrokerRelayMessageHandlerTests {
} }
} }
private static class StubMessageChannel implements MessageChannel {
private final List<Message<byte[]>> messages = new ArrayList<>();
@Override
@SuppressWarnings("unchecked")
public boolean send(Message<?> message) {
this.messages.add((Message<byte[]>) message);
return true;
}
@Override
@SuppressWarnings("unchecked")
public boolean send(Message<?> message, long timeout) {
this.messages.add((Message<byte[]>) message);
return true;
}
}
} }
...@@ -49,8 +49,8 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator ...@@ -49,8 +49,8 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator
@Override @Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception { public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
if (logger.isDebugEnabled()) { if (logger.isTraceEnabled()) {
logger.debug(message + ", " + session); logger.trace(message + ", " + session);
} }
super.handleMessage(session, message); super.handleMessage(session, message);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册