提交 48236be4 编写于 作者: R Rossen Stoyanchev

STOMP and WebSocket messaging related logging updates

This change removes most logging at INFO level and also ensures the
amount of information logged at DEBUG level is useful, brief, and
not duplicated.

Also added is custom logging for STOMP frames to ensure very readable
and consise output.

Issue: SPR-11934
上级 ab4864da
......@@ -21,7 +21,6 @@ import java.lang.reflect.Method;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.core.BridgeMethodResolver;
import org.springframework.core.MethodParameter;
......@@ -44,7 +43,6 @@ import org.springframework.util.ClassUtils;
*/
public class HandlerMethod {
/** Logger that is available to subclasses */
protected final Log logger = LogFactory.getLog(HandlerMethod.class);
private final Object bean;
......@@ -238,6 +236,11 @@ public class HandlerMethod {
return this.bean.hashCode() * 31 + this.method.hashCode();
}
public String getShortLogMessage() {
int args = method.getParameterTypes().length;
return getBeanType().getName() + "#" + this.method.getName() + "[" + args + " args]";
}
@Override
public String toString() {
return this.method.toGenericString();
......
......@@ -80,9 +80,6 @@ public class HeaderMethodArgumentResolver extends AbstractNamedValueMethodArgume
if (name.startsWith("nativeHeaders.")) {
name = name.substring("nativeHeaders.".length());
if (logger.isDebugEnabled()) {
logger.debug("Looking up native header '" + name + "'");
}
}
if ((nativeHeaders == null) || !nativeHeaders.containsKey(name)) {
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -322,26 +322,23 @@ public abstract class AbstractMethodMessageHandler<T>
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String destination = getDestination(message);
if (destination == null) {
return;
}
String lookupDestination = getLookupDestination(destination);
if (lookupDestination == null) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Handling message to " + destination);
}
MessageHeaderAccessor headerAccessor = MessageHeaderAccessor.getMutableAccessor(message);
headerAccessor.setHeader(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER, lookupDestination);
headerAccessor.setLeaveMutable(true);
message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
if (logger.isDebugEnabled()) {
logger.debug("Searching methods to handle " + headerAccessor.getShortLogMessage(message.getPayload()));
}
handleMessageInternal(message, lookupDestination);
headerAccessor.setImmutable();
}
......@@ -377,24 +374,20 @@ public abstract class AbstractMethodMessageHandler<T>
if (mappingsByUrl != null) {
addMatchesToCollection(mappingsByUrl, message, matches);
}
if (matches.isEmpty()) {
// No direct hits, go through all mappings
Set<T> allMappings = this.handlerMethods.keySet();
addMatchesToCollection(allMappings, message, matches);
}
if (matches.isEmpty()) {
handleNoMatch(handlerMethods.keySet(), lookupDestination, message);
return;
}
Comparator<Match> comparator = new MatchComparator(getMappingComparator(message));
Collections.sort(matches, comparator);
if (logger.isTraceEnabled()) {
logger.trace("Found " + matches.size() + " matching mapping(s) for [" +
lookupDestination + "] : " + matches);
logger.trace("Found " + matches.size() + " methods: " + matches);
}
Match bestMatch = matches.get(0);
......@@ -440,18 +433,14 @@ public abstract class AbstractMethodMessageHandler<T>
protected void handleMatch(T mapping, HandlerMethod handlerMethod, String lookupDestination, Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("Message matched to " + handlerMethod);
logger.debug("Invoking " + handlerMethod.getShortLogMessage());
}
handlerMethod = handlerMethod.createWithResolvedBean();
InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod);
invocable.setMessageMethodArgumentResolvers(this.argumentResolvers);
try {
Object returnValue = invocable.invoke(message);
MethodParameter returnType = handlerMethod.getReturnType();
if (void.class.equals(returnType.getParameterType())) {
return;
......@@ -467,26 +456,27 @@ public abstract class AbstractMethodMessageHandler<T>
}
protected void processHandlerMethodException(HandlerMethod handlerMethod, Exception ex, Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("Searching methods to handle " + ex.getClass().getSimpleName());
}
Class<?> beanType = handlerMethod.getBeanType();
AbstractExceptionHandlerMethodResolver resolver = this.exceptionHandlerCache.get(beanType);
if (resolver == null) {
resolver = createExceptionHandlerMethodResolverFor(beanType);
this.exceptionHandlerCache.put(beanType, resolver);
}
Method method = resolver.resolveMethod(ex);
if (method == null) {
logger.error("Unhandled exception", ex);
return;
}
InvocableHandlerMethod invocable = new InvocableHandlerMethod(handlerMethod.getBean(), method);
invocable.setMessageMethodArgumentResolvers(this.argumentResolvers);
if (logger.isDebugEnabled()) {
logger.debug("Invoking " + invocable.getShortLogMessage());
}
try {
Object returnValue = invocable.invoke(message, ex);
MethodParameter returnType = invocable.getReturnType();
if (void.class.equals(returnType.getParameterType())) {
return;
......@@ -497,17 +487,21 @@ public abstract class AbstractMethodMessageHandler<T>
logger.error("Error while handling exception", t);
return;
}
}
protected abstract AbstractExceptionHandlerMethodResolver createExceptionHandlerMethodResolverFor(Class<?> beanType);
protected void handleNoMatch(Set<T> ts, String lookupDestination, Message<?> message) {
if (logger.isDebugEnabled()) {
logger.debug("No matching method found.");
logger.debug("No matching methods.");
}
}
@Override
public String toString() {
return getClass().getSimpleName() + "[prefixes=" + getDestinationPrefixes() + "]";
}
/**
* A thin wrapper around a matched HandlerMethod and its matched mapping for
......
/*
* Copyright 2002-2012 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -22,8 +22,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
......@@ -38,8 +36,6 @@ import org.springframework.util.Assert;
*/
public class HandlerMethodArgumentResolverComposite implements HandlerMethodArgumentResolver {
protected final Log logger = LogFactory.getLog(getClass());
private final List<HandlerMethodArgumentResolver> argumentResolvers = new LinkedList<HandlerMethodArgumentResolver>();
private final Map<MethodParameter, HandlerMethodArgumentResolver> argumentResolverCache =
......
......@@ -27,6 +27,8 @@ import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* A HandlerMethodReturnValueHandler that wraps and delegates to others.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
......@@ -79,9 +81,6 @@ public class HandlerMethodReturnValueHandlerComposite implements HandlerMethodRe
private HandlerMethodReturnValueHandler getReturnValueHandler(MethodParameter returnType) {
for (HandlerMethodReturnValueHandler handler : this.returnValueHandlers) {
if (handler.supportsReturnType(returnType)) {
if (logger.isTraceEnabled()) {
logger.trace("Processing return value with " + handler);
}
return handler;
}
}
......@@ -94,6 +93,9 @@ public class HandlerMethodReturnValueHandlerComposite implements HandlerMethodRe
HandlerMethodReturnValueHandler handler = getReturnValueHandler(returnType);
Assert.notNull(handler, "No handler for return value type [" + returnType.getParameterType().getName() + "]");
if (logger.isTraceEnabled()) {
logger.trace("Processing return value with " + handler);
}
handler.handleReturnValue(returnValue, returnType, message);
}
......
......@@ -96,15 +96,11 @@ public class InvocableHandlerMethod extends HandlerMethod {
public final Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Object[] args = getMethodArgumentValues(message, providedArgs);
if (logger.isTraceEnabled()) {
StringBuilder sb = new StringBuilder("Invoking [");
sb.append(getBeanType().getSimpleName()).append(".");
sb.append(getMethod().getName()).append("] method with arguments ");
sb.append(Arrays.asList(args));
logger.trace(sb.toString());
logger.trace("Resolved arguments: " + Arrays.asList(args));
}
Object returnValue = invoke(args);
if (logger.isTraceEnabled()) {
logger.trace("Method [" + getMethod().getName() + "] returned [" + returnValue + "]");
logger.trace("Returned value: " + returnValue);
}
return returnValue;
}
......@@ -136,8 +132,8 @@ public class InvocableHandlerMethod extends HandlerMethod {
}
}
if (args[i] == null) {
String msg = getArgumentResolutionErrorMessage("No suitable resolver for argument", i);
throw new IllegalStateException(msg);
String error = getArgumentResolutionErrorMessage("No suitable resolver for argument", i);
throw new IllegalStateException(error);
}
}
return args;
......
......@@ -72,9 +72,11 @@ public class SimpAttributes {
MessageHeaders headers = message.getHeaders();
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
Map<String, Object> sessionAttributes = SimpMessageHeaderAccessor.getSessionAttributes(headers);
if (sessionId == null || sessionAttributes == null) {
throw new IllegalStateException(
"Message does not contain SiMP session id or attributes: " + message);
if (sessionId == null) {
throw new IllegalStateException("No session id in " + message);
}
if (sessionAttributes == null) {
throw new IllegalStateException("No session attributes in " + message);
}
return new SimpAttributes(sessionId, sessionAttributes);
}
......
......@@ -20,11 +20,13 @@ import java.security.Principal;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.IdTimestampMessageHeaderInitializer;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
* A base class for working with message headers in simple messaging protocols that
......@@ -241,4 +243,50 @@ public class SimpMessageHeaderAccessor extends NativeMessageHeaderAccessor {
return (Principal) headers.get(USER_HEADER);
}
@Override
public String getShortLogMessage(Object payload) {
if (getMessageType() == null) {
return super.getDetailedLogMessage(payload);
}
StringBuilder sb = getBaseLogMessage();
if (!CollectionUtils.isEmpty(getSessionAttributes())) {
sb.append(" attributes[").append(getSessionAttributes().size()).append("]");
}
sb.append(getShortPayloadLogMessage(payload));
return sb.toString();
}
@SuppressWarnings("unchecked")
@Override
public String getDetailedLogMessage(Object payload) {
if (getMessageType() == null) {
return super.getDetailedLogMessage(payload);
}
StringBuilder sb = getBaseLogMessage();
if (!CollectionUtils.isEmpty(getSessionAttributes())) {
sb.append(" attributes=").append(getSessionAttributes());
}
if (!CollectionUtils.isEmpty((Map<String, List<String>>) getHeader(NATIVE_HEADERS))) {
sb.append(" nativeHeaders=").append((Map<String, List<String>>) getHeader(NATIVE_HEADERS));
}
sb.append(getDetailedPayloadLogMessage(payload));
return sb.toString();
}
private StringBuilder getBaseLogMessage() {
StringBuilder sb = new StringBuilder();
sb.append(getMessageType().name());
if (getDestination() != null) {
sb.append(" destination=").append(getDestination());
}
if (getSubscriptionId() != null) {
sb.append(" subscriptionId=").append(getSubscriptionId());
}
sb.append(" session=").append(getSessionId());
if (getUser() != null) {
sb.append(" user=").append(getUser().getName());
}
return sb;
}
}
......@@ -137,7 +137,6 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
if (returnValue == null) {
return;
}
MessageHeaders headers = message.getHeaders();
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
......@@ -161,7 +160,6 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
this.messagingTemplate.convertAndSendToUser(user, destination, returnValue, createHeaders(sessionId));
}
}
return;
}
else {
SendTo sendTo = returnType.getMethodAnnotation(SendTo.class);
......@@ -182,7 +180,6 @@ public class SendToMethodReturnValueHandler implements HandlerMethodReturnValueH
}
protected String[] getTargetDestinations(Annotation annotation, Message<?> message, String defaultPrefix) {
if (annotation != null) {
String[] value = (String[]) AnnotationUtils.getValue(annotation);
if (!ObjectUtils.isEmpty(value)) {
......
......@@ -16,6 +16,8 @@
package org.springframework.messaging.simp.annotation.support;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
......@@ -45,6 +47,9 @@ import org.springframework.util.Assert;
*/
public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
private static Log logger = LogFactory.getLog(SubscriptionMethodReturnValueHandler.class);
private final MessageSendingOperations<String> messagingTemplate;
private MessageHeaderInitializer headerInitializer;
......@@ -87,13 +92,10 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
}
@Override
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message)
throws Exception {
public void handleReturnValue(Object returnValue, MethodParameter returnType, Message<?> message) throws Exception {
if (returnValue == null) {
return;
}
MessageHeaders headers = message.getHeaders();
String destination = SimpMessageHeaderAccessor.getDestination(headers);
String sessionId = SimpMessageHeaderAccessor.getSessionId(headers);
......@@ -102,6 +104,10 @@ public class SubscriptionMethodReturnValueHandler implements HandlerMethodReturn
Assert.state(subscriptionId != null,
"No subscriptionId in message=" + message + ", method=" + returnType.getMethod());
if (logger.isDebugEnabled()) {
logger.debug("Reply to @SubscribeMapping: " + returnValue);
}
this.messagingTemplate.convertAndSend(destination, returnValue, createHeaders(sessionId, subscriptionId));
}
......
......@@ -130,15 +130,15 @@ public abstract class AbstractBrokerMessageHandler
}
@Override
public final void start() {
public void start() {
synchronized (this.lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Starting...");
if (logger.isInfoEnabled()) {
logger.info("Starting...");
}
startInternal();
this.running = true;
if (logger.isDebugEnabled()) {
logger.debug("Started.");
if (logger.isInfoEnabled()) {
logger.info("Started.");
}
}
}
......@@ -147,15 +147,15 @@ public abstract class AbstractBrokerMessageHandler
}
@Override
public final void stop() {
public void stop() {
synchronized (this.lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Stopping...");
if (logger.isInfoEnabled()) {
logger.info("Stopping...");
}
stopInternal();
this.running = false;
if (logger.isDebugEnabled()) {
logger.debug("Stopped.");
logger.info("Stopped.");
}
}
}
......@@ -172,7 +172,7 @@ public abstract class AbstractBrokerMessageHandler
}
@Override
public final void handleMessage(Message<?> message) {
public void handleMessage(Message<?> message) {
if (!this.running) {
if (logger.isTraceEnabled()) {
logger.trace(this + " not running yet. Ignoring " + message);
......@@ -200,7 +200,7 @@ public abstract class AbstractBrokerMessageHandler
boolean shouldPublish = this.brokerAvailable.compareAndSet(false, true);
if (this.eventPublisher != null && shouldPublish) {
if (logger.isInfoEnabled()) {
logger.info("Publishing " + this.availableEvent);
logger.info(this.availableEvent);
}
this.eventPublisher.publishEvent(this.availableEvent);
}
......@@ -210,7 +210,7 @@ public abstract class AbstractBrokerMessageHandler
boolean shouldPublish = this.brokerAvailable.compareAndSet(true, false);
if (this.eventPublisher != null && shouldPublish) {
if (logger.isInfoEnabled()) {
logger.info("Publishing " + this.notAvailableEvent);
logger.info(this.notAvailableEvent);
}
this.eventPublisher.publishEvent(this.notAvailableEvent);
}
......
......@@ -275,7 +275,7 @@ public class DefaultSubscriptionRegistry extends AbstractSubscriptionRegistry {
@Override
public String toString() {
return "registry[" + sessions.size() + " session(s)]";
return "registry[" + sessions.size() + " sessions]";
}
}
......
......@@ -25,6 +25,7 @@ import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.MessageHeaderInitializer;
import org.springframework.util.Assert;
import org.springframework.util.MultiValueMap;
......@@ -138,35 +139,44 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
return;
}
SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
if (accessor == null) {
throw new IllegalStateException(
"No header accessor (not using the SimpMessagingTemplate?): " + message);
}
if (SimpMessageType.MESSAGE.equals(messageType)) {
if (logger.isDebugEnabled()) {
logger.debug("Processing " + accessor.getShortLogMessage(message.getPayload()));
}
sendMessageToSubscribers(destination, message);
}
else if (SimpMessageType.CONNECT.equals(messageType)) {
if (logger.isInfoEnabled()) {
logger.info("Handling CONNECT: " + message);
if (logger.isDebugEnabled()) {
logger.debug("Processing " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
}
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
initHeaders(accessor);
accessor.setSessionId(sessionId);
accessor.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
Message<byte[]> connectAck = MessageBuilder.createMessage(EMPTY_PAYLOAD, accessor.getMessageHeaders());
this.clientOutboundChannel.send(connectAck);
SimpMessageHeaderAccessor connectAck = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT_ACK);
initHeaders(connectAck);
connectAck.setSessionId(sessionId);
connectAck.setHeader(SimpMessageHeaderAccessor.CONNECT_MESSAGE_HEADER, message);
Message<byte[]> messageOut = MessageBuilder.createMessage(EMPTY_PAYLOAD, connectAck.getMessageHeaders());
this.clientOutboundChannel.send(messageOut);
}
else if (SimpMessageType.DISCONNECT.equals(messageType)) {
if (logger.isInfoEnabled()) {
logger.info("Handling DISCONNECT: " + message);
if (logger.isDebugEnabled()) {
logger.debug("Processing " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
}
this.subscriptionRegistry.unregisterAllSubscriptions(sessionId);
}
else if (SimpMessageType.SUBSCRIBE.equals(messageType)) {
if (logger.isDebugEnabled()) {
logger.debug("Handling SUBSCRIBE: " + message);
logger.debug("Processing " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
}
this.subscriptionRegistry.registerSubscription(message);
}
else if (SimpMessageType.UNSUBSCRIBE.equals(messageType)) {
if (logger.isDebugEnabled()) {
logger.debug("Handling UNSUBSCRIBE: " + message);
logger.debug("Processing " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
}
this.subscriptionRegistry.unregisterSubscription(message);
}
......@@ -180,8 +190,8 @@ public class SimpleBrokerMessageHandler extends AbstractBrokerMessageHandler {
protected void sendMessageToSubscribers(String destination, Message<?> message) {
MultiValueMap<String,String> subscriptions = this.subscriptionRegistry.findSubscriptions(message);
if ((subscriptions.size() > 0) && logger.isTraceEnabled()) {
logger.trace("Sending to " + subscriptions.size() + " subscriber(s): " + message);
if ((subscriptions.size() > 0) && logger.isDebugEnabled()) {
logger.debug("Broadcasting to " + subscriptions.size() + " sessions.");
}
for (String sessionId : subscriptions.keySet()) {
for (String subscriptionId : subscriptions.get(sessionId)) {
......
......@@ -357,17 +357,20 @@ public abstract class AbstractMessageBrokerConfiguration implements ApplicationC
private static final AbstractBrokerMessageHandler noopBroker = new AbstractBrokerMessageHandler() {
@Override
protected void startInternal() {
public void start() {
}
@Override
protected void stopInternal() {
public void stop() {
}
@Override
protected void handleMessageInternal(Message<?> message) {
public void handleMessage(Message<?> message) {
}
@Override
protected void handleMessageInternal(Message<?> message) {
}
};
}
......@@ -72,6 +72,8 @@ import org.springframework.util.concurrent.ListenableFutureTask;
*/
public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler {
public static final String SYSTEM_SESSION_ID = "_system_";
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final ListenableFutureTask<Void> EMPTY_TASK = new ListenableFutureTask<Void>(new VoidCallable());
......@@ -380,14 +382,18 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.info("Connecting \"system\" session to " + this.relayHost + ":" + this.relayPort);
}
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.CONNECT);
headers.setAcceptVersion("1.1,1.2");
headers.setLogin(this.systemLogin);
headers.setPasscode(this.systemPasscode);
headers.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
headers.setHost(getVirtualHost());
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.CONNECT);
accessor.setAcceptVersion("1.1,1.2");
accessor.setLogin(this.systemLogin);
accessor.setPasscode(this.systemPasscode);
accessor.setHeartbeat(this.systemHeartbeatSendInterval, this.systemHeartbeatReceiveInterval);
accessor.setHost(getVirtualHost());
accessor.setSessionId(SYSTEM_SESSION_ID);
if (logger.isDebugEnabled()) {
logger.debug("Forwarding " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
}
SystemStompConnectionHandler handler = new SystemStompConnectionHandler(headers);
SystemStompConnectionHandler handler = new SystemStompConnectionHandler(accessor);
this.connectionHandlers.put(handler.getSessionId(), handler);
this.stats.incrementConnectCount();
......@@ -416,7 +422,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
String sessionId = SimpMessageHeaderAccessor.getSessionId(message.getHeaders());
if (!isBrokerAvailable()) {
if (sessionId == null || SystemStompConnectionHandler.SESSION_ID.equals(sessionId)) {
if (sessionId == null || SYSTEM_SESSION_ID.equals(sessionId)) {
throw new MessageDeliveryException("Message broker not active. Consider subscribing to " +
"receive BrokerAvailabilityEvent's from an ApplicationListener Spring bean.");
}
......@@ -459,7 +465,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
logger.error("Only STOMP SEND supported from within the server side. Ignoring " + message);
return;
}
sessionId = SystemStompConnectionHandler.SESSION_ID;
sessionId = SYSTEM_SESSION_ID;
stompAccessor.setSessionId(sessionId);
}
......@@ -470,7 +476,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
if (StompCommand.CONNECT.equals(command)) {
if (logger.isDebugEnabled()) {
logger.debug("STOMP CONNECT in session " + sessionId + " (" + getConnectionCount() + " connections).");
logger.debug(stompAccessor.getShortLogMessage(EMPTY_PAYLOAD));
}
stompAccessor = (stompAccessor.isMutable() ? stompAccessor : StompHeaderAccessor.wrap(message));
stompAccessor.setLogin(this.clientLogin);
......@@ -508,8 +514,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public String toString() {
return "StompBrokerRelay[broker=" + this.relayHost + ":" + this.relayPort +
", " + getConnectionCount() + " connection(s)]";
return "StompBrokerRelay[" + this.relayHost + ":" + this.relayPort + "]";
}
......@@ -544,8 +549,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void afterConnected(TcpConnection<byte[]> connection) {
if (logger.isInfoEnabled()) {
logger.info("TCP connection established. Forwarding: " + this.connectHeaders);
if (logger.isDebugEnabled()) {
logger.debug("TCP connection opened in session=" + getSessionId());
}
this.tcpConnection = connection;
connection.send(MessageBuilder.createMessage(EMPTY_PAYLOAD, this.connectHeaders.getMessageHeaders()));
......@@ -601,24 +606,24 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void handleMessage(Message<byte[]> message) {
StompHeaderAccessor headerAccessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
headerAccessor.setSessionId(this.sessionId);
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
accessor.setSessionId(this.sessionId);
if (StompCommand.CONNECTED.equals(headerAccessor.getCommand())) {
if (logger.isInfoEnabled()) {
logger.info("Received STOMP CONNECTED: " + headerAccessor);
StompCommand command = accessor.getCommand();
if (StompCommand.CONNECTED.equals(command)) {
if (logger.isDebugEnabled()) {
logger.debug("Received " + accessor.getShortLogMessage(EMPTY_PAYLOAD));
}
afterStompConnected(headerAccessor);
afterStompConnected(accessor);
}
else if (StompCommand.ERROR.equals(headerAccessor.getCommand()) && logger.isErrorEnabled()) {
logger.error("Received STOMP ERROR: " + message);
else if (logger.isErrorEnabled() && StompCommand.ERROR.equals(command)) {
logger.error("Received " + accessor.getShortLogMessage(message.getPayload()));
}
else if (logger.isTraceEnabled()) {
logger.trace(headerAccessor.isHeartbeat() ?
"Received heartbeat in session " + this.sessionId : "Received " + headerAccessor);
logger.trace("Received " + accessor.getDetailedLogMessage(message.getPayload()));
}
headerAccessor.setImmutable();
accessor.setImmutable();
sendMessageToClient(message);
}
......@@ -688,8 +693,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
return;
}
try {
if (logger.isInfoEnabled()) {
logger.info("TCP connection to broker closed in session " + this.sessionId);
if (logger.isDebugEnabled()) {
logger.debug("TCP connection to broker closed in session " + this.sessionId);
}
sendStompErrorFrameToClient("Connection to broker closed.");
}
......@@ -729,14 +734,16 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
* @return a future to wait for the result
*/
@SuppressWarnings("unchecked")
public ListenableFuture<Void> forward(Message<?> message, final StompHeaderAccessor accessor) {
public ListenableFuture<Void> forward(final Message<?> message, final StompHeaderAccessor accessor) {
TcpConnection<byte[]> conn = this.tcpConnection;
if (!this.isStompConnected) {
if (this.isRemoteClientSession) {
if (logger.isDebugEnabled()) {
logger.debug("TCP connection closed already, ignoring " + message);
logger.debug("TCP connection closed already, ignoring " +
accessor.getShortLogMessage((byte[]) message.getPayload()));
}
return EMPTY_TASK;
}
......@@ -744,7 +751,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
throw new IllegalStateException("Cannot forward messages " +
(conn != null ? "before STOMP CONNECTED. " : "while inactive. ") +
"Consider subscribing to receive BrokerAvailabilityEvent's from " +
"an ApplicationListener Spring bean. Dropped " + message);
"an ApplicationListener Spring bean. Dropped " +
accessor.getShortLogMessage((byte[]) message.getPayload()));
}
}
......@@ -752,20 +760,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders()) : message;
StompCommand command = accessor.getCommand();
if (accessor.isHeartbeat()) {
logger.trace("Forwarding heartbeat in session " + this.sessionId);
}
else if (StompCommand.SUBSCRIBE.equals(command) && logger.isDebugEnabled()) {
logger.debug("Forwarding SUBSCRIBE: " + messageToSend);
}
else if (StompCommand.UNSUBSCRIBE.equals(command) && logger.isDebugEnabled()) {
logger.debug("Forwarding UNSUBSCRIBE: " + messageToSend);
}
else if (StompCommand.DISCONNECT.equals(command) && logger.isInfoEnabled()) {
logger.info("Forwarding DISCONNECT: " + messageToSend);
if (logger.isDebugEnabled() &&
StompCommand.SEND.equals(command) ||
StompCommand.SUBSCRIBE.equals(command) ||
StompCommand.UNSUBSCRIBE.equals(command) ||
StompCommand.DISCONNECT.equals(command)) {
logger.debug("Forwarding " + accessor.getShortLogMessage((byte[]) message.getPayload()));
}
else if (logger.isTraceEnabled()) {
logger.trace("Forwarding " + command + ": " + messageToSend);
logger.trace("Forwarding " + accessor.getDetailedLogMessage((byte[]) message.getPayload()));
}
ListenableFuture<Void> future = conn.send((Message<byte[]>) messageToSend);
......@@ -779,10 +782,12 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
@Override
public void onFailure(Throwable t) {
if (tcpConnection != null) {
handleTcpConnectionFailure("failed to forward " + messageToSend, t);
handleTcpConnectionFailure("failed to forward " +
accessor.getShortLogMessage((byte[]) message.getPayload()), t);
}
else if (logger.isErrorEnabled()) {
logger.error("Failed to forward " + messageToSend);
logger.error("Failed to forward " +
accessor.getShortLogMessage((byte[]) message.getPayload()));
}
}
});
......@@ -810,8 +815,7 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
*/
public void clearConnection() {
if (logger.isDebugEnabled()) {
logger.debug("Cleaning up connection state for session " + sessionId + " (" +
(getConnectionCount() - 1) + " remaining connections).");
logger.debug("Cleaning up connection state for session " + this.sessionId);
}
if (this.isRemoteClientSession) {
......@@ -823,8 +827,8 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
TcpConnection<byte[]> conn = this.tcpConnection;
this.tcpConnection = null;
if (conn != null) {
if (logger.isInfoEnabled()) {
logger.info("Closing TCP connection in session " + this.sessionId);
if (logger.isDebugEnabled()) {
logger.debug("Closing TCP connection in session " + this.sessionId);
}
conn.close();
}
......@@ -838,15 +842,15 @@ public class StompBrokerRelayMessageHandler extends AbstractBrokerMessageHandler
private class SystemStompConnectionHandler extends StompConnectionHandler {
public static final String SESSION_ID = "stompRelaySystemSessionId";
public SystemStompConnectionHandler(StompHeaderAccessor connectHeaders) {
super(SESSION_ID, connectHeaders, false);
super(SYSTEM_SESSION_ID, connectHeaders, false);
}
@Override
protected void afterStompConnected(StompHeaderAccessor connectedHeaders) {
if (logger.isInfoEnabled()) {
logger.info("\"System\" session connected.");
}
super.afterStompConnected(connectedHeaders);
publishBrokerAvailableEvent();
}
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -24,6 +24,8 @@ import java.util.Map;
import org.springframework.messaging.simp.SimpMessageType;
/**
* Represents a STOMP command.
*
* @author Rossen Stoyanchev
* @since 4.0
*/
......@@ -32,15 +34,15 @@ public enum StompCommand {
// client
CONNECT,
STOMP,
SEND,
DISCONNECT,
SUBSCRIBE,
UNSUBSCRIBE,
SEND,
ACK,
NACK,
BEGIN,
COMMIT,
ABORT,
DISCONNECT,
// server
CONNECTED,
......@@ -48,13 +50,8 @@ public enum StompCommand {
RECEIPT,
ERROR;
private static Map<StompCommand, SimpMessageType> messageTypes = new HashMap<StompCommand, SimpMessageType>();
private static Collection<StompCommand> destinationRequired = Arrays.asList(SEND, SUBSCRIBE, MESSAGE);
private static Collection<StompCommand> subscriptionIdRequired = Arrays.asList(SUBSCRIBE, UNSUBSCRIBE, MESSAGE);
private static Collection<StompCommand> contentLengthRequired = Arrays.asList(SEND, MESSAGE, ERROR);
private static Collection<StompCommand> bodyAllowed = Arrays.asList(SEND, MESSAGE, ERROR);
private static Map<StompCommand, SimpMessageType> messageTypes = new HashMap<StompCommand, SimpMessageType>();
static {
messageTypes.put(StompCommand.CONNECT, SimpMessageType.CONNECT);
messageTypes.put(StompCommand.STOMP, SimpMessageType.CONNECT);
......@@ -65,6 +62,13 @@ public enum StompCommand {
messageTypes.put(StompCommand.DISCONNECT, SimpMessageType.DISCONNECT);
}
private static Collection<StompCommand> destinationRequired = Arrays.asList(SEND, SUBSCRIBE, MESSAGE);
private static Collection<StompCommand> subscriptionIdRequired = Arrays.asList(SUBSCRIBE, UNSUBSCRIBE, MESSAGE);
private static Collection<StompCommand> contentLengthRequired = Arrays.asList(SEND, MESSAGE, ERROR);
private static Collection<StompCommand> bodyAllowed = Arrays.asList(SEND, MESSAGE, ERROR);
public SimpMessageType getMessageType() {
SimpMessageType type = messageTypes.get(this);
return (type != null) ? type : SimpMessageType.OTHER;
......
......@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderInitializer;
......@@ -51,7 +50,8 @@ public class StompDecoder {
static final byte[] HEARTBEAT_PAYLOAD = new byte[] {'\n'};
private final Log logger = LogFactory.getLog(StompDecoder.class);
private static final Log logger = LogFactory.getLog(StompDecoder.class);
private MessageHeaderInitializer headerInitializer;
......@@ -157,7 +157,7 @@ public class StompDecoder {
headerAccessor.setLeaveMutable(true);
decodedMessage = MessageBuilder.createMessage(payload, headerAccessor.getMessageHeaders());
if (logger.isTraceEnabled()) {
logger.trace("Decoded " + decodedMessage);
logger.trace("Decoded " + headerAccessor.getDetailedLogMessage(payload));
}
}
else {
......@@ -176,13 +176,13 @@ public class StompDecoder {
}
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Decoded heartbeat.");
}
StompHeaderAccessor headerAccessor = StompHeaderAccessor.createForHeartbeat();
initHeaders(headerAccessor);
headerAccessor.setLeaveMutable(true);
decodedMessage = MessageBuilder.createMessage(HEARTBEAT_PAYLOAD, headerAccessor.getMessageHeaders());
if (logger.isTraceEnabled()) {
logger.trace("Decoded " + headerAccessor.getDetailedLogMessage(null));
}
}
return decodedMessage;
}
......
......@@ -26,7 +26,6 @@ import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.SimpMessageType;
......@@ -74,7 +73,6 @@ public final class StompEncoder {
DataOutputStream output = new DataOutputStream(baos);
if (SimpMessageType.HEARTBEAT.equals(SimpMessageHeaderAccessor.getMessageType(headers))) {
logger.trace("Encoded heartbeat");
output.write(StompDecoder.HEARTBEAT_PAYLOAD);
}
else {
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,6 +16,7 @@
package org.springframework.messaging.simp.stomp;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
......@@ -62,6 +63,7 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
private static final long[] DEFAULT_HEARTBEAT = new long[] {0, 0};
// STOMP header names
public static final String STOMP_ID_HEADER = "id";
......@@ -438,6 +440,71 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
setNativeHeader(STOMP_VERSION_HEADER, version);
}
// Logging related
@Override
public String getShortLogMessage(Object payload) {
if (StompCommand.SUBSCRIBE.equals(getCommand())) {
return "SUBSCRIBE " + getDestination() + " id=" + getSubscriptionId() + appendSession();
}
else if (StompCommand.UNSUBSCRIBE.equals(getCommand())) {
return "UNSUBSCRIBE id=" + getSubscriptionId() + appendSession();
}
else if (StompCommand.SEND.equals(getCommand())) {
return "SEND " + getDestination() + appendSession() + appendPayload(payload);
}
else if (StompCommand.CONNECT.equals(getCommand())) {
return "CONNECT" + (getUser() != null ? " user=" + getUser().getName() : "") + appendSession();
}
else if (StompCommand.CONNECTED.equals(getCommand())) {
return "CONNECTED heart-beat=" + Arrays.toString(getHeartbeat()) + appendSession();
}
else if (StompCommand.DISCONNECT.equals(getCommand())) {
return "DISCONNECT" + (getReceipt() != null ? " receipt=" + getReceipt() : "") + appendSession();
}
else {
return getDetailedLogMessage(payload);
}
}
@Override
public String getDetailedLogMessage(Object payload) {
if (isHeartbeat()) {
return "heart-beat in session " + getSessionId();
}
StompCommand command = getCommand();
if (command == null) {
return super.getDetailedLogMessage(payload);
}
StringBuilder sb = new StringBuilder();
sb.append(command.name()).append(" ").append(getNativeHeaders()).append(appendSession());
if (getUser() != null) {
sb.append(", user=").append(getUser().getName());
}
if (command.isBodyAllowed()) {
sb.append(appendPayload(payload));
}
return sb.toString();
}
private String appendSession() {
return " session=" + getSessionId();
}
private String appendPayload(Object payload) {
Assert.isInstanceOf(byte[].class, payload);
byte[] bytes = (byte[]) payload;
String contentType = (getContentType() != null ? " " + getContentType().toString() : "");
if (bytes.length == 0 || getContentType() == null || !isReadableContentType()) {
return contentType;
}
Charset charset = getContentType().getCharSet();
charset = (charset != null ? charset : StompDecoder.UTF8_CHARSET);
return (bytes.length < 80) ?
contentType + " payload=" + new String(bytes, charset) :
contentType + " payload=" + new String(Arrays.copyOf(bytes, 80), charset) + "...(truncated)";
}
private static class StompPasscode {
......@@ -452,4 +519,5 @@ public class StompHeaderAccessor extends SimpMessageHeaderAccessor {
return "[PROTECTED]";
}
}
}
......@@ -181,6 +181,10 @@ public class DefaultUserDestinationResolver implements UserDestinationResolver {
return sourceDestinationWithoutPrefix + "-user" + sessionId;
}
@Override
public String toString() {
return "DefaultUserDestinationResolver[prefix=" + this.destinationPrefix + "]";
}
private static class DestinationInfo {
......
......@@ -172,7 +172,7 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
Set<String> destinations = result.getTargetDestinations();
if (destinations.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("No user destinations for " + message);
logger.trace("No user destinations found for " + result.getSourceDestination());
}
return;
}
......@@ -183,10 +183,10 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
headerAccessor.setNativeHeader(header, result.getSubscribeDestination());
message = MessageBuilder.createMessage(message.getPayload(), headerAccessor.getMessageHeaders());
}
if (logger.isDebugEnabled()) {
logger.debug("Translated " + result.getSourceDestination() + " -> " + destinations);
}
for (String destination : destinations) {
if (logger.isTraceEnabled()) {
logger.trace("Sending " + message);
}
this.brokerMessagingTemplate.send(destination, message);
}
}
......@@ -197,4 +197,9 @@ public class UserDestinationMessageHandler implements MessageHandler, SmartLifec
}
}
@Override
public String toString() {
return "UserDestinationMessageHandler[" + this.userDestinationResolver + "]";
}
}
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -101,9 +101,6 @@ public abstract class AbstractMessageChannel implements MessageChannel, BeanName
@Override
public final boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "Message must not be null");
if (logger.isTraceEnabled()) {
logger.trace(this + " sending " + message);
}
message = this.interceptorChain.preSend(message, this);
if (message == null) {
return false;
......
......@@ -47,7 +47,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
boolean result = this.handlers.add(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug(this + " subscribed " + handler);
logger.debug(getBeanName() + " added " + handler);
}
}
return result;
......@@ -58,7 +58,7 @@ public abstract class AbstractSubscribableChannel extends AbstractMessageChannel
boolean result = this.handlers.remove(handler);
if (result) {
if (logger.isDebugEnabled()) {
logger.debug(this + " unsubscribed " + handler);
logger.debug(getBeanName() + " removed " + handler);
}
}
return result;
......
......@@ -56,37 +56,23 @@ class ChannelInterceptorChain {
public Message<?> preSend(Message<?> message, MessageChannel channel) {
Message<?> originalMessage = message;
for (ChannelInterceptor interceptor : this.interceptors) {
message = interceptor.preSend(message, channel);
if (message == null) {
if (logger.isTraceEnabled()) {
logger.trace("preSend returned null (precluding the send)");
}
logger.debug("preSend returned null precluding send");
return null;
}
}
if (logger.isDebugEnabled()) {
if (message != originalMessage) {
logger.debug("preSend returned modified message, new message=" + message);
}
}
return message;
}
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
if (logger.isTraceEnabled()) {
logger.trace("postSend (sent=" + sent + ")");
}
for (ChannelInterceptor interceptor : this.interceptors) {
interceptor.postSend(message, channel, sent);
}
}
public boolean preReceive(MessageChannel channel) {
if (logger.isTraceEnabled()) {
logger.trace("preReceive on channel '" + channel + "'");
}
for (ChannelInterceptor interceptor : this.interceptors) {
if (!interceptor.preReceive(channel)) {
return false;
......@@ -96,12 +82,6 @@ class ChannelInterceptorChain {
}
public Message<?> postReceive(Message<?> message, MessageChannel channel) {
if (message != null && logger.isTraceEnabled()) {
logger.trace("postReceive on channel '" + channel + "', message: " + message);
}
else if (logger.isTraceEnabled()) {
logger.trace("postReceive on channel '" + channel + "', message is null");
}
for (ChannelInterceptor interceptor : this.interceptors) {
message = interceptor.postReceive(message, channel);
if (message == null) {
......@@ -111,5 +91,4 @@ class ChannelInterceptorChain {
return message;
}
}
......@@ -94,8 +94,7 @@ public class GenericMessage<T> implements Message<T>, Serializable {
sb.append("[Payload byte[").append(((byte[]) this.payload).length).append("]]");
}
else {
sb.append("[Payload ").append(this.payload.getClass().getSimpleName());
sb.append(" content=").append(this.payload).append("]");
sb.append("[Payload=").append(this.payload).append("]");
}
sb.append("[Headers=").append(this.headers).append("]");
return sb.toString();
......
......@@ -16,7 +16,9 @@
package org.springframework.messaging.support;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -28,9 +30,11 @@ import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.stomp.StompDecoder;
import org.springframework.util.Assert;
import org.springframework.util.IdGenerator;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;
......@@ -111,6 +115,13 @@ import org.springframework.util.StringUtils;
*/
public class MessageHeaderAccessor {
private static final MimeType[] readableMimeTypes = new MimeType[] {
MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_XML,
new MimeType("text", "*"), new MimeType("application", "*+json"), new MimeType("application", "*+xml")
};
public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
protected Log logger = LogFactory.getLog(getClass());
private final MutableMessageHeaders headers;
......@@ -469,6 +480,81 @@ public class MessageHeaderAccessor {
}
/**
* Return a concise message for logging purposes.
* @param payload the payload that corresponds to the headers.
* @return the message
*/
public String getShortLogMessage(Object payload) {
return "headers=" + this.headers.toString() + getShortPayloadLogMessage(payload);
}
/**
* Return a more detailed message for logging purposes.
* @param payload the payload that corresponds to the headers.
* @return the message
*/
public String getDetailedLogMessage(Object payload) {
return "headers=" + this.headers.toString() + getDetailedPayloadLogMessage(payload);
}
protected String getShortPayloadLogMessage(Object payload) {
if (payload instanceof String) {
String payloadText = (String) payload;
return (payloadText.length() < 80) ?
" payload=" + payloadText :
" payload=" + payloadText.substring(0, 80) + "...(truncated)";
}
else if (payload instanceof byte[]) {
byte[] bytes = (byte[]) payload;
if (isReadableContentType()) {
Charset charset = getContentType().getCharSet();
charset = (charset != null ? charset : DEFAULT_CHARSET);
return (bytes.length < 80) ?
" payload=" + new String(bytes, charset) :
" payload=" + new String(Arrays.copyOf(bytes, 80), charset) + "...(truncated)";
}
else {
return " payload=byte[" + bytes.length + "]";
}
}
else {
String payloadText = payload.toString();
return (payloadText.length() < 80) ?
" payload=" + payloadText :
" payload=" + ObjectUtils.identityToString(payload);
}
}
protected String getDetailedPayloadLogMessage(Object payload) {
if (payload instanceof String) {
return " payload=" + ((String) payload);
}
else if (payload instanceof byte[]) {
byte[] bytes = (byte[]) payload;
if (isReadableContentType()) {
Charset charset = getContentType().getCharSet();
charset = (charset != null ? charset : DEFAULT_CHARSET);
return " payload=" + new String(bytes, charset);
}
else {
return " payload=byte[" + bytes.length + "]";
}
}
else {
return " payload=" + payload;
}
}
protected boolean isReadableContentType() {
for (MimeType mimeType : readableMimeTypes) {
if (mimeType.includes(getContentType())) {
return true;
}
}
return false;
}
@Override
public String toString() {
return getClass().getSimpleName() + "[headers=" + this.headers + "]";
......
......@@ -110,12 +110,22 @@ public class SimpAttributesContextHolderTests {
}
@Test
public void setAttributesFromMessageWithMissingHeaders() {
public void setAttributesFromMessageWithMissingSessionId() {
this.thrown.expect(IllegalStateException.class);
this.thrown.expectMessage(startsWith("Message does not contain SiMP session id or attributes"));
this.thrown.expectMessage(startsWith("No session id in"));
SimpAttributesContextHolder.setAttributesFromMessage(new GenericMessage<Object>(""));
}
@Test
public void setAttributesFromMessageWithMissingSessionAttributes() {
this.thrown.expect(IllegalStateException.class);
this.thrown.expectMessage(startsWith("No session attributes in"));
SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create();
headerAccessor.setSessionId("session1");
Message<?> message = MessageBuilder.createMessage("", headerAccessor.getMessageHeaders());
SimpAttributesContextHolder.setAttributesFromMessage(message);
}
@Test
public void currentAttributes() {
SimpAttributesContextHolder.setAttributes(this.simpAttributes);
......
/*
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.simp;
import org.junit.Test;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
/**
* Unit tests for SimpMessageHeaderAccessor.
*
* @author Rossen Stoyanchev
*/
public class SimpMessageHeaderAccessorTests {
@Test
public void getShortLogMessage() {
assertEquals("MESSAGE session=null payload=p", SimpMessageHeaderAccessor.create().getShortLogMessage("p"));
}
@Test
public void getLogMessageWithValuesSet() {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
accessor.setDestination("/destination");
accessor.setSubscriptionId("subscription");
accessor.setSessionId("session");
accessor.setUser(new TestPrincipal("user"));
accessor.setSessionAttributes(Collections.<String, Object>singletonMap("key", "value"));
assertEquals("MESSAGE destination=/destination subscriptionId=subscription " +
"session=session user=user attributes[1] payload=p", accessor.getShortLogMessage("p"));
}
@Test
public void getDetailedLogMessageWithValuesSet() {
SimpMessageHeaderAccessor accessor = SimpMessageHeaderAccessor.create();
accessor.setDestination("/destination");
accessor.setSubscriptionId("subscription");
accessor.setSessionId("session");
accessor.setUser(new TestPrincipal("user"));
accessor.setSessionAttributes(Collections.<String, Object>singletonMap("key", "value"));
accessor.setNativeHeader("nativeKey", "nativeValue");
assertEquals("MESSAGE destination=/destination subscriptionId=subscription " +
"session=session user=user attributes={key=value} nativeHeaders=" +
"{nativeKey=[nativeValue]} payload=p", accessor.getDetailedLogMessage("p"));
}
}
......@@ -109,7 +109,7 @@ public class SimpleBrokerMessageHandlerTests {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.DISCONNECT);
headers.setSessionId(sess1);
Message<byte[]> message = MessageBuilder.withPayload(new byte[0]).copyHeaders(headers.toMap()).build();
Message<byte[]> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
this.messageHandler.handleMessage(message);
this.messageHandler.handleMessage(createMessage("/foo", "message1"));
......@@ -141,28 +141,23 @@ public class SimpleBrokerMessageHandlerTests {
protected Message<String> createSubscriptionMessage(String sessionId, String subcriptionId, String destination) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.SUBSCRIBE);
headers.setSubscriptionId(subcriptionId);
headers.setDestination(destination);
headers.setSessionId(sessionId);
return MessageBuilder.withPayload("").copyHeaders(headers.toMap()).build();
return MessageBuilder.createMessage("", headers.getMessageHeaders());
}
protected Message<String> createConnectMessage(String sessionId) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.CONNECT);
headers.setSessionId(sessionId);
return MessageBuilder.withPayload("").setHeaders(headers).build();
return MessageBuilder.createMessage("", headers.getMessageHeaders());
}
protected Message<String> createMessage(String destination, String payload) {
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
headers.setDestination(destination);
return MessageBuilder.withPayload(payload).copyHeaders(headers.toMap()).build();
return MessageBuilder.createMessage("", headers.getMessageHeaders());
}
protected boolean assertCapturedMessage(String sessionId, String subcriptionId, String destination) {
......
......@@ -166,7 +166,7 @@ public class MessageBrokerConfigurationTests {
headers.setSessionId("sess1");
headers.setSubscriptionId("subs1");
headers.setDestination("/foo");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
// subscribe
broker.handleMessage(message);
......@@ -174,7 +174,7 @@ public class MessageBrokerConfigurationTests {
headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setSessionId("sess1");
headers.setDestination("/foo");
message = MessageBuilder.withPayload("bar".getBytes()).setHeaders(headers).build();
message = MessageBuilder.createMessage("bar".getBytes(), headers.getMessageHeaders());
// message
broker.handleMessage(message);
......@@ -235,7 +235,7 @@ public class MessageBrokerConfigurationTests {
headers.setSessionId("sess1");
headers.setSessionAttributes(new ConcurrentHashMap<>());
headers.setDestination("/foo");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
messageHandler.handleMessage(message);
......@@ -256,7 +256,7 @@ public class MessageBrokerConfigurationTests {
StompHeaderAccessor headers = StompHeaderAccessor.create(StompCommand.SEND);
headers.setDestination("/user/joe/foo");
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
Message<?> message = MessageBuilder.createMessage(new byte[0], headers.getMessageHeaders());
messageHandler.handleMessage(message);
......
......@@ -17,6 +17,7 @@
package org.springframework.messaging.simp.stomp;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
......@@ -31,6 +32,7 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.util.AlternativeJdkIdGenerator;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.MultiValueMap;
......@@ -44,6 +46,7 @@ import static org.junit.Assert.*;
*/
public class StompHeaderAccessorTests {
private static final Charset UTF_8 = Charset.forName("UTF-8");
@Test
public void createWithCommand() {
......@@ -240,4 +243,22 @@ public class StompHeaderAccessorTests {
assertSame(headerAccessor, MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class));
}
@Test
public void getShortLogMessage() {
StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.SEND);
accessor.setDestination("/foo");
accessor.setContentType(MimeTypeUtils.APPLICATION_JSON);
accessor.setSessionId("123");
String actual = accessor.getShortLogMessage("payload".getBytes(Charset.forName("UTF-8")));
assertEquals("SEND /foo session=123 application/json payload=payload", actual);
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 80; i++) {
sb.append("a");
}
final String payload = sb.toString() + " > 80";
actual = accessor.getShortLogMessage(payload.getBytes(UTF_8));
assertEquals("SEND /foo session=123 application/json payload=" + sb + "...(truncated)", actual);
}
}
......@@ -16,19 +16,23 @@
package org.springframework.messaging.support;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import org.hamcrest.CoreMatchers;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.IdGenerator;
import org.springframework.util.MimeTypeUtils;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.junit.Assert.*;
/**
......@@ -38,6 +42,8 @@ import static org.junit.Assert.*;
*/
public class MessageHeaderAccessorTests {
private static final Charset UTF_8 = Charset.forName("UTF-8");
@Rule
public final ExpectedException thrown = ExpectedException.none();
......@@ -250,6 +256,77 @@ public class MessageHeaderAccessorTests {
assertNotNull(headers.getTimestamp());
}
@Test
public void getShortLogMessagePayload() {
MessageHeaderAccessor accessor = new MessageHeaderAccessor();
accessor.setContentType(MimeTypeUtils.TEXT_PLAIN);
assertEquals("headers={contentType=text/plain} payload=p", accessor.getShortLogMessage("p"));
assertEquals("headers={contentType=text/plain} payload=p", accessor.getShortLogMessage("p".getBytes(UTF_8)));
assertEquals("headers={contentType=text/plain} payload=p", accessor.getShortLogMessage(new Object() {
@Override
public String toString() {
return "p";
}
}));
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 80; i++) {
sb.append("a");
}
final String payload = sb.toString() + " > 80";
String actual = accessor.getShortLogMessage(payload);
assertEquals("headers={contentType=text/plain} payload=" + sb + "...(truncated)", actual);
actual = accessor.getShortLogMessage(payload.getBytes(UTF_8));
assertEquals("headers={contentType=text/plain} payload=" + sb + "...(truncated)", actual);
actual = accessor.getShortLogMessage(new Object() {
@Override
public String toString() {
return payload;
}
});
assertThat(actual, startsWith("headers={contentType=text/plain} payload=" + getClass().getName() + "$"));
}
@Test
public void getDetailedLogMessagePayload() {
MessageHeaderAccessor accessor = new MessageHeaderAccessor();
accessor.setContentType(MimeTypeUtils.TEXT_PLAIN);
assertEquals("headers={contentType=text/plain} payload=p", accessor.getDetailedLogMessage("p"));
assertEquals("headers={contentType=text/plain} payload=p", accessor.getDetailedLogMessage("p".getBytes(UTF_8)));
assertEquals("headers={contentType=text/plain} payload=p", accessor.getDetailedLogMessage(new Object() {
@Override
public String toString() {
return "p";
}
}));
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 80; i++) {
sb.append("a");
}
final String payload = sb.toString() + " > 80";
String actual = accessor.getDetailedLogMessage(payload);
assertEquals("headers={contentType=text/plain} payload=" + sb + " > 80", actual);
actual = accessor.getDetailedLogMessage(payload.getBytes(UTF_8));
assertEquals("headers={contentType=text/plain} payload=" + sb + " > 80", actual);
actual = accessor.getDetailedLogMessage(new Object() {
@Override
public String toString() {
return payload;
}
});
assertEquals("headers={contentType=text/plain} payload=" + sb + " > 80", actual);
}
public static class TestMessageHeaderAccessor extends MessageHeaderAccessor {
......
......@@ -40,7 +40,8 @@ import org.springframework.web.socket.WebSocketSession;
*/
public abstract class AbstractWebSocketSession<T> implements NativeWebSocketSession {
protected final Log logger = LogFactory.getLog(getClass());
protected static final Log logger = LogFactory.getLog(NativeWebSocketSession.class);
private T nativeSession;
......@@ -133,8 +134,8 @@ public abstract class AbstractWebSocketSession<T> implements NativeWebSocketSess
@Override
public final void close(CloseStatus status) throws IOException {
checkNativeSessionInitialized();
if (logger.isInfoEnabled()) {
logger.info("Closing " + this);
if (logger.isDebugEnabled()) {
logger.debug("Closing " + this);
}
closeInternal(status);
}
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......
......@@ -122,8 +122,8 @@ public abstract class ConnectionManagerSupport implements SmartLifecycle {
protected void startInternal() {
synchronized (lifecycleMonitor) {
if (logger.isDebugEnabled()) {
logger.debug("Starting " + this.getClass().getSimpleName());
if (logger.isInfoEnabled()) {
logger.info("Starting " + this.getClass().getSimpleName());
}
this.isRunning = true;
openConnection();
......@@ -136,8 +136,8 @@ public abstract class ConnectionManagerSupport implements SmartLifecycle {
public final void stop() {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
if (logger.isDebugEnabled()) {
logger.debug("Stopping " + this.getClass().getSimpleName());
if (logger.isInfoEnabled()) {
logger.info("Stopping " + this.getClass().getSimpleName());
}
try {
stopInternal();
......
......@@ -128,8 +128,8 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Starting Jetty WebSocketClient");
if (logger.isInfoEnabled()) {
logger.info("Starting Jetty WebSocketClient");
}
this.client.start();
}
......@@ -145,8 +145,8 @@ public class JettyWebSocketClient extends AbstractWebSocketClient implements Sma
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
try {
if (logger.isDebugEnabled()) {
logger.debug("Stopping Jetty WebSocketClient");
if (logger.isInfoEnabled()) {
logger.info("Stopping Jetty WebSocketClient");
}
this.client.stop();
}
......
......@@ -176,14 +176,14 @@ public class StandardWebSocketClient extends AbstractWebSocketClient {
@Override
public void beforeRequest(Map<String, List<String>> requestHeaders) {
requestHeaders.putAll(this.headers);
if (logger.isDebugEnabled()) {
logger.debug("Handshake request headers: " + requestHeaders);
if (logger.isTraceEnabled()) {
logger.trace("Handshake request headers: " + requestHeaders);
}
}
@Override
public void afterResponse(HandshakeResponse response) {
if (logger.isDebugEnabled()) {
logger.debug("Handshake response headers: " + response.getHeaders());
if (logger.isTraceEnabled()) {
logger.trace("Handshake response headers: " + response.getHeaders());
}
}
}
......
......@@ -67,7 +67,7 @@ public class WebSocketMessageBrokerStats {
private ScheduledFuture<?> loggingTask;
private long loggingPeriod = 15 * 60 * 1000;
private long loggingPeriod = 30 * 60 * 1000;
public void setSubProtocolWebSocketHandler(SubProtocolWebSocketHandler webSocketHandler) {
......@@ -102,7 +102,7 @@ public class WebSocketMessageBrokerStats {
public void setSockJsTaskScheduler(ThreadPoolTaskScheduler sockJsTaskScheduler) {
this.sockJsTaskScheduler = sockJsTaskScheduler.getScheduledThreadPoolExecutor();
this.loggingTask = initLoggingTask(3 * 60 * 1000);
this.loggingTask = initLoggingTask(1 * 60 * 1000);
}
private ScheduledFuture<?> initLoggingTask(long initialDelay) {
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -60,9 +60,6 @@ public class BeanCreatingHandlerProvider<T> implements BeanFactoryAware {
}
public T getHandler() {
if (logger.isTraceEnabled()) {
logger.trace("Creating instance for handler type " + this.handlerType);
}
if (this.beanFactory == null) {
logger.warn("No BeanFactory available, attempting to use default constructor");
return BeanUtils.instantiate(this.handlerType);
......@@ -74,16 +71,13 @@ public class BeanCreatingHandlerProvider<T> implements BeanFactoryAware {
public void destroy(T handler) {
if (this.beanFactory != null) {
if (logger.isTraceEnabled()) {
logger.trace("Destroying handler instance " + handler);
}
this.beanFactory.destroyBean(handler);
}
}
@Override
public String toString() {
return "BeanCreatingHandlerProvider [handlerClass=" + this.handlerType + "]";
return "BeanCreatingHandlerProvider[handlerType=" + this.handlerType + "]";
}
}
......@@ -45,7 +45,7 @@ import org.springframework.web.socket.WebSocketSession;
*/
public class ConcurrentWebSocketSessionDecorator extends WebSocketSessionDecorator {
private static final Log logger = LogFactory.getLog(ConcurrentWebSocketSessionDecorator.class);
private static final Log logger = LogFactory.getLog("_" + ConcurrentWebSocketSessionDecorator.class.getName());
private final Queue<WebSocketMessage<?>> buffer = new LinkedBlockingQueue<WebSocketMessage<?>>();
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -31,7 +31,7 @@ import org.springframework.web.socket.WebSocketSession;
*/
public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator {
private final Log logger = LogFactory.getLog(LoggingWebSocketHandlerDecorator.class);
private static final Log logger = LogFactory.getLog(LoggingWebSocketHandlerDecorator.class);
public LoggingWebSocketHandlerDecorator(WebSocketHandler delegate) {
......@@ -41,8 +41,8 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
if (logger.isInfoEnabled()) {
logger.info("Connection established " + session);
if (logger.isDebugEnabled()) {
logger.debug("New " + session);
}
super.afterConnectionEstablished(session);
}
......@@ -65,8 +65,8 @@ public class LoggingWebSocketHandlerDecorator extends WebSocketHandlerDecorator
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
if (logger.isInfoEnabled()) {
logger.info("Connection closed with " + closeStatus + " in " + session + ", ");
if (logger.isDebugEnabled()) {
logger.debug(session + " closed with " + closeStatus);
}
super.afterConnectionClosed(session, closeStatus);
}
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -50,6 +50,7 @@ public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFact
private static final Log logger = LogFactory.getLog(PerConnectionWebSocketHandler.class);
private final BeanCreatingHandlerProvider<WebSocketHandler> provider;
private final Map<WebSocketSession, WebSocketHandler> handlers =
......@@ -113,7 +114,7 @@ public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFact
}
}
catch (Throwable t) {
logger.warn("Error while destroying handler", t);
logger.warn("Error while destroying " + handler, t);
}
}
......@@ -124,7 +125,7 @@ public class PerConnectionWebSocketHandler implements WebSocketHandler, BeanFact
@Override
public String toString() {
return "PerConnectionWebSocketHandlerProxy [handlerType=" + this.provider.getHandlerType() + "]";
return "PerConnectionWebSocketHandlerProxy[handlerType=" + this.provider.getHandlerType() + "]";
}
}
......@@ -198,8 +198,9 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
messages = decoder.decode(byteBuffer);
if (messages.isEmpty()) {
if (logger.isTraceEnabled()) {
logger.trace("Incomplete STOMP frame content received, bufferSize=" +
decoder.getBufferSize() + ", bufferSizeLimit=" + decoder.getBufferSizeLimit() + ".");
logger.trace("Incomplete STOMP frame content received in session " +
session + ", bufferSize=" + decoder.getBufferSize() +
", bufferSizeLimit=" + decoder.getBufferSizeLimit() + ".");
}
return;
}
......@@ -219,9 +220,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
if (logger.isTraceEnabled()) {
logger.trace(headerAccessor.isHeartbeat() ?
"Received heartbeat from broker in session " + session.getId() + "." :
"Received message from broker in session " + session.getId() + ": " + message + ".");
logger.trace("From client: " + headerAccessor.getShortLogMessage(message.getPayload()));
}
headerAccessor.setSessionId(session.getId());
......@@ -264,9 +263,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
private void publishEvent(ApplicationEvent event) {
try {
if (logger.isInfoEnabled()) {
logger.info("Publishing " + event);
}
this.eventPublisher.publishEvent(event);
}
catch (Throwable ex) {
......@@ -300,9 +296,7 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
StompCommand command = stompAccessor.getCommand();
if (StompCommand.MESSAGE.equals(command)) {
if (stompAccessor.getSubscriptionId() == null) {
if (logger.isWarnEnabled()) {
logger.warn("No STOMP \"subscription\" header in " + message);
}
logger.warn("No STOMP \"subscription\" header in " + message);
}
String origDestination = stompAccessor.getFirstNativeHeader(SimpMessageHeaderAccessor.ORIGINAL_DESTINATION);
if (origDestination != null) {
......@@ -418,10 +412,6 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
if (heartbeat[1] > 0) {
session = WebSocketSessionDecorator.unwrap(session);
if (session instanceof SockJsSession) {
if (logger.isDebugEnabled()) {
logger.debug("STOMP heartbeats enabled. " +
"Turning off SockJS heartbeats in " + session.getId() + ".");
}
((SockJsSession) session).disableHeartbeat();
}
}
......@@ -482,6 +472,10 @@ public class StompSubProtocolHandler implements SubProtocolHandler, ApplicationE
return MessageBuilder.createMessage(EMPTY_PAYLOAD, headerAccessor.getMessageHeaders());
}
@Override
public String toString() {
return "StompSubProtocolHandler" + getSupportedProtocols();
}
private class Stats {
......
......@@ -265,9 +265,6 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
this.stats.incrementSessionCount(session);
session = new ConcurrentWebSocketSessionDecorator(session, getSendTimeLimit(), getSendBufferSizeLimit());
this.sessions.put(session.getId(), new WebSocketSessionHolder(session));
if (logger.isDebugEnabled()) {
logger.debug("Started session " + session.getId() + " (" + this.sessions.size() + " sessions)");
}
findProtocolHandler(session).afterSessionStarted(session, this.clientInboundChannel);
}
......@@ -422,7 +419,7 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
private void clearSession(WebSocketSession session, CloseStatus closeStatus) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Clearing session " + session.getId() + " (" + this.sessions.size() + " remain)");
logger.debug("Clearing session " + session.getId());
}
if (this.sessions.remove(session.getId()) != null) {
this.stats.decrementSessionCount(session);
......@@ -435,6 +432,10 @@ public class SubProtocolWebSocketHandler implements WebSocketHandler,
return false;
}
@Override
public String toString() {
return "SubProtocolWebSocketHandler" + getProtocolHandlers();
}
private static class WebSocketSessionHolder {
......
......@@ -156,24 +156,23 @@ public class DefaultHandshakeHandler implements HandshakeHandler {
WebSocketHandler wsHandler, Map<String, Object> attributes) throws HandshakeFailureException {
WebSocketHttpHeaders headers = new WebSocketHttpHeaders(request.getHeaders());
if (logger.isDebugEnabled()) {
logger.debug("Initiating handshake for " + request.getURI() + ", headers=" + headers);
if (logger.isTraceEnabled()) {
logger.trace("Processing request " + request.getURI() + " with headers=" + headers);
}
try {
if (!HttpMethod.GET.equals(request.getMethod())) {
response.setStatusCode(HttpStatus.METHOD_NOT_ALLOWED);
response.getHeaders().setAllow(Collections.singleton(HttpMethod.GET));
logger.debug("Only HTTP GET is allowed, current method is " + request.getMethod());
if (logger.isErrorEnabled()) {
logger.error("Handshake failed due to unexpected HTTP method: " + request.getMethod());
}
return false;
}
if (!"WebSocket".equalsIgnoreCase(headers.getUpgrade())) {
handleInvalidUpgradeHeader(request, response);
return false;
}
if (!headers.getConnection().contains("Upgrade") &&
!headers.getConnection().contains("upgrade")) {
if (!headers.getConnection().contains("Upgrade") && !headers.getConnection().contains("upgrade")) {
handleInvalidConnectHeader(request, response);
return false;
}
......@@ -187,7 +186,9 @@ public class DefaultHandshakeHandler implements HandshakeHandler {
}
String wsKey = headers.getSecWebSocketKey();
if (wsKey == null) {
logger.debug("Missing \"Sec-WebSocket-Key\" header");
if (logger.isErrorEnabled()) {
logger.error("Missing \"Sec-WebSocket-Key\" header");
}
response.setStatusCode(HttpStatus.BAD_REQUEST);
return false;
}
......@@ -198,33 +199,30 @@ public class DefaultHandshakeHandler implements HandshakeHandler {
}
String subProtocol = selectProtocol(headers.getSecWebSocketProtocol(), wsHandler);
if (logger.isDebugEnabled()) {
logger.debug("Selected sub-protocol: '" + subProtocol + "'");
}
List<WebSocketExtension> requested = headers.getSecWebSocketExtensions();
List<WebSocketExtension> supported = this.requestUpgradeStrategy.getSupportedExtensions(request);
List<WebSocketExtension> extensions = filterRequestedExtensions(request, requested, supported);
Principal user = determineUser(request, wsHandler, attributes);
if (logger.isDebugEnabled()) {
logger.debug("Upgrading request, sub-protocol=" + subProtocol + ", extensions=" + extensions);
if (logger.isTraceEnabled()) {
logger.trace("Upgrading to WebSocket");
}
this.requestUpgradeStrategy.upgrade(request, response, subProtocol, extensions, user, wsHandler, attributes);
return true;
}
protected void handleInvalidUpgradeHeader(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
logger.debug("Invalid Upgrade header " + request.getHeaders().getUpgrade());
if (logger.isErrorEnabled()) {
logger.error("Handshake failed due to invalid Upgrade header: " + request.getHeaders().getUpgrade());
}
response.setStatusCode(HttpStatus.BAD_REQUEST);
response.getBody().write("Can \"Upgrade\" only to \"WebSocket\".".getBytes("UTF-8"));
}
protected void handleInvalidConnectHeader(ServerHttpRequest request, ServerHttpResponse response) throws IOException {
logger.debug("Invalid Connection header " + request.getHeaders().getConnection());
if (logger.isErrorEnabled()) {
logger.error("Handshake failed due to invalid Connection header " + request.getHeaders().getConnection());
}
response.setStatusCode(HttpStatus.BAD_REQUEST);
response.getBody().write("\"Connection\" must be \"upgrade\".".getBytes("UTF-8"));
}
......@@ -232,17 +230,11 @@ public class DefaultHandshakeHandler implements HandshakeHandler {
protected boolean isWebSocketVersionSupported(WebSocketHttpHeaders httpHeaders) {
String version = httpHeaders.getSecWebSocketVersion();
String[] supportedVersions = getSupportedVersions();
if (logger.isDebugEnabled()) {
logger.debug("Requested version=" + version + ", supported=" + Arrays.toString(supportedVersions));
}
for (String supportedVersion : supportedVersions) {
if (supportedVersion.trim().equals(version)) {
return true;
}
}
if (logger.isDebugEnabled()) {
logger.debug("Version " + version + " is not a supported WebSocket version");
}
return false;
}
......@@ -251,12 +243,14 @@ public class DefaultHandshakeHandler implements HandshakeHandler {
}
protected void handleWebSocketVersionNotSupported(ServerHttpRequest request, ServerHttpResponse response) {
if (logger.isDebugEnabled()) {
logger.debug("WebSocket version not supported: " + request.getHeaders().get("Sec-WebSocket-Version"));
if (logger.isErrorEnabled()) {
String version = request.getHeaders().getFirst("Sec-WebSocket-Version");
logger.error("Handshake failed due to unsupported WebSocket version: " + version +
". Supported versions: " + Arrays.toString(getSupportedVersions()));
}
response.setStatusCode(HttpStatus.UPGRADE_REQUIRED);
response.getHeaders().put(WebSocketHttpHeaders.SEC_WEBSOCKET_VERSION, Arrays.asList(
StringUtils.arrayToCommaDelimitedString(getSupportedVersions())));
response.getHeaders().put(WebSocketHttpHeaders.SEC_WEBSOCKET_VERSION,
Arrays.asList(StringUtils.arrayToCommaDelimitedString(getSupportedVersions())));
}
protected boolean isValidOrigin(ServerHttpRequest request) {
......@@ -277,11 +271,6 @@ public class DefaultHandshakeHandler implements HandshakeHandler {
protected String selectProtocol(List<String> requestedProtocols, WebSocketHandler webSocketHandler) {
if (requestedProtocols != null) {
List<String> handlerProtocols = determineHandlerSupportedProtocols(webSocketHandler);
if (logger.isDebugEnabled()) {
logger.debug("Requested sub-protocol(s): " + requestedProtocols +
", WebSocketHandler supported sub-protocol(s): " + handlerProtocols +
", configured sub-protocol(s): " + this.supportedProtocols);
}
for (String protocol : requestedProtocols) {
if (handlerProtocols.contains(protocol.toLowerCase())) {
return protocol;
......@@ -322,11 +311,6 @@ public class DefaultHandshakeHandler implements HandshakeHandler {
protected List<WebSocketExtension> filterRequestedExtensions(ServerHttpRequest request,
List<WebSocketExtension> requested, List<WebSocketExtension> supported) {
if (requested != null) {
if (logger.isDebugEnabled()) {
logger.debug("Requested extension(s): " + requested + ", supported extension(s): " + supported);
}
}
return requested;
}
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -57,6 +57,9 @@ public class HandshakeInterceptorChain {
for (int i = 0; i < this.interceptors.size(); i++) {
HandshakeInterceptor interceptor = this.interceptors.get(i);
if (!interceptor.beforeHandshake(request, response, this.wsHandler, attributes)) {
if (logger.isDebugEnabled()) {
logger.debug(interceptor + " return false precluding handshake.");
}
applyAfterHandshake(request, response, null);
return false;
}
......
......@@ -21,9 +21,6 @@ import java.util.Enumeration;
import java.util.Map;
import javax.servlet.http.HttpSession;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
......@@ -41,8 +38,6 @@ import org.springframework.web.socket.server.HandshakeInterceptor;
*/
public class HttpSessionHandshakeInterceptor implements HandshakeInterceptor {
private static final Log logger = LogFactory.getLog(HttpSessionHandshakeInterceptor.class);
private Collection<String> attributeNames;
......@@ -74,16 +69,8 @@ public class HttpSessionHandshakeInterceptor implements HandshakeInterceptor {
while (names.hasMoreElements()) {
String name = names.nextElement();
if (CollectionUtils.isEmpty(this.attributeNames) || this.attributeNames.contains(name)) {
if (logger.isTraceEnabled()) {
logger.trace("Adding HTTP session attribute to handshake attributes: " + name);
}
attributes.put(name, session.getAttribute(name));
}
else {
if (logger.isTraceEnabled()) {
logger.trace("Skipped HTTP session attribute");
}
}
}
}
}
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -25,6 +25,8 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpRequest;
......@@ -52,6 +54,9 @@ import org.springframework.web.socket.handler.LoggingWebSocketHandlerDecorator;
*/
public class WebSocketHttpRequestHandler implements HttpRequestHandler {
private static final Log logger = LogFactory.getLog(WebSocketHttpRequestHandler.class);
private final HandshakeHandler handshakeHandler;
private final WebSocketHandler wsHandler;
......@@ -113,6 +118,9 @@ public class WebSocketHttpRequestHandler implements HttpRequestHandler {
HandshakeFailureException failure = null;
try {
if (logger.isDebugEnabled()) {
logger.debug(servletRequest.getMethod() + " " + servletRequest.getRequestURI());
}
Map<String, Object> attributes = new HashMap<String, Object>();
if (!chain.applyBeforeHandshake(request, response, attributes)) {
return;
......
......@@ -157,8 +157,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
@Override
public final void close(CloseStatus status) {
Assert.isTrue(status != null && isUserSetStatus(status), "Invalid close status: " + status);
if (logger.isInfoEnabled()) {
logger.info("Closing session with " + status + " in " + this);
if (logger.isDebugEnabled()) {
logger.debug("Closing session with " + status + " in " + this);
}
closeInternal(status);
}
......@@ -213,8 +213,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
}
private void handleOpenFrame() {
if (logger.isInfoEnabled()) {
logger.info("Processing SockJS open frame in " + this);
if (logger.isDebugEnabled()) {
logger.debug("Processing SockJS open frame in " + this);
}
if (State.NEW.equals(state)) {
this.state = State.OPEN;
......@@ -280,8 +280,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
if (data.length == 2) {
closeStatus = new CloseStatus(Integer.valueOf(data[0]), data[1]);
}
if (logger.isInfoEnabled()) {
logger.info("Processing SockJS close frame with " + closeStatus + " in " + this);
if (logger.isDebugEnabled()) {
logger.debug("Processing SockJS close frame with " + closeStatus + " in " + this);
}
}
catch (IOException ex) {
......@@ -311,8 +311,8 @@ public abstract class AbstractClientSockJsSession implements WebSocketSession {
this.closeStatus = (this.closeStatus != null ? this.closeStatus : closeStatus);
Assert.state(this.closeStatus != null, "CloseStatus not available");
if (logger.isInfoEnabled()) {
logger.info("Transport closed with " + this.closeStatus + " in " + this);
if (logger.isDebugEnabled()) {
logger.debug("Transport closed with " + this.closeStatus + " in " + this);
}
this.state = State.CLOSED;
......
......@@ -104,8 +104,8 @@ public abstract class AbstractXhrTransport implements XhrTransport {
}
throw new HttpServerErrorException(response.getStatusCode());
}
if (logger.isDebugEnabled()) {
logger.debug("SockJS Info request (url=" + infoUrl + ") response: " + response);
if (logger.isTraceEnabled()) {
logger.trace("SockJS Info request (url=" + infoUrl + ") response: " + response);
}
return response.getBody();
}
......@@ -114,8 +114,8 @@ public abstract class AbstractXhrTransport implements XhrTransport {
@Override
public void executeSendRequest(URI url, TextMessage message) {
if (logger.isDebugEnabled()) {
logger.debug("Starting XHR send, url=" + url);
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR send, url=" + url);
}
ResponseEntity<String> response = executeSendRequestInternal(url, this.xhrSendRequestHeaders, message);
if (response.getStatusCode() != HttpStatus.NO_CONTENT) {
......@@ -124,8 +124,8 @@ public abstract class AbstractXhrTransport implements XhrTransport {
}
throw new HttpServerErrorException(response.getStatusCode());
}
if (logger.isDebugEnabled()) {
logger.debug("XHR send request (url=" + url + ") response: " + response);
if (logger.isTraceEnabled()) {
logger.trace("XHR send request (url=" + url + ") response: " + response);
}
}
......@@ -139,7 +139,8 @@ public abstract class AbstractXhrTransport implements XhrTransport {
URI receiveUrl = request.getTransportUrl();
if (logger.isDebugEnabled()) {
logger.debug("Opening XHR session, receive url=" + receiveUrl);
logger.debug("Starting XHR " +
(isXhrStreamingDisabled() ? "Polling" : "Streaming") + "session url=" + receiveUrl);
}
HttpHeaders handshakeHeaders = new HttpHeaders();
......
......@@ -133,8 +133,8 @@ class DefaultTransportRequest implements TransportRequest {
public void connect(WebSocketHandler handler, SettableListenableFuture<WebSocketSession> future) {
if (logger.isDebugEnabled()) {
logger.debug("Starting " + this);
if (logger.isTraceEnabled()) {
logger.trace("Starting " + this);
}
ConnectCallback connectCallback = new ConnectCallback(handler, future);
scheduleConnectTimeoutTask(connectCallback);
......@@ -144,14 +144,14 @@ class DefaultTransportRequest implements TransportRequest {
private void scheduleConnectTimeoutTask(ConnectCallback connectHandler) {
if (this.timeoutScheduler != null) {
if (logger.isDebugEnabled()) {
logger.debug("Scheduling connect to time out after " + this.timeoutValue + " milliseconds");
if (logger.isTraceEnabled()) {
logger.trace("Scheduling connect to time out after " + this.timeoutValue + " ms.");
}
Date timeoutDate = new Date(System.currentTimeMillis() + this.timeoutValue);
this.timeoutScheduler.schedule(connectHandler, timeoutDate);
}
else if (logger.isDebugEnabled()) {
logger.debug("Connect timeout task not scheduled. Is SockJsClient configured with a TaskScheduler?");
else if (logger.isTraceEnabled()) {
logger.trace("Connect timeout task not scheduled (no TaskScheduler configured).");
}
}
......
......@@ -136,8 +136,8 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp
}
private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
if (logger.isDebugEnabled()) {
logger.debug("Starting XHR receive request, url=" + url);
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request, url=" + url);
}
Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
addHttpHeaders(httpRequest, headers);
......@@ -182,9 +182,9 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp
@Override
public void onHeaders(Response response) {
if (logger.isDebugEnabled()) {
if (logger.isTraceEnabled()) {
// Convert to HttpHeaders to avoid "\n"
logger.debug("XHR receive headers: " + toHttpHeaders(response.getHeaders()));
logger.trace("XHR receive headers: " + toHttpHeaders(response.getHeaders()));
}
}
......@@ -193,7 +193,7 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp
while (true) {
if (this.sockJsSession.isDisconnected()) {
if (logger.isDebugEnabled()) {
logger.debug("SockJS sockJsSession closed. Closing ClientHttpResponse.");
logger.debug("SockJS sockJsSession closed, closing response.");
}
response.abort(new SockJsException("Session closed.", this.sockJsSession.getId(), null));
return;
......@@ -228,8 +228,8 @@ public class JettyXhrTransport extends AbstractXhrTransport implements XhrTransp
if (this.outputStream.size() > 0) {
handleFrame();
}
if (logger.isDebugEnabled()) {
logger.debug("XHR receive request completed.");
if (logger.isTraceEnabled()) {
logger.trace("XHR receive request completed.");
}
executeReceiveRequest(this.transportUrl, this.receiveHeaders, this);
}
......
......@@ -125,8 +125,8 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh
break;
}
try {
if (logger.isDebugEnabled()) {
logger.debug("Starting XHR receive request, url=" + receiveUrl);
if (logger.isTraceEnabled()) {
logger.trace("Starting XHR receive request, url=" + receiveUrl);
}
getRestTemplate().execute(receiveUrl, HttpMethod.POST, requestCallback, responseExtractor);
requestCallback = requestCallbackAfterHandshake;
......@@ -215,15 +215,15 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh
if (!HttpStatus.OK.equals(response.getStatusCode())) {
throw new HttpServerErrorException(response.getStatusCode());
}
if (logger.isDebugEnabled()) {
logger.debug("XHR receive headers: " + response.getHeaders());
if (logger.isTraceEnabled()) {
logger.trace("XHR receive headers: " + response.getHeaders());
}
InputStream is = response.getBody();
ByteArrayOutputStream os = new ByteArrayOutputStream();
while (true) {
if (this.sockJsSession.isDisconnected()) {
if (logger.isDebugEnabled()) {
logger.debug("SockJS sockJsSession closed. Closing ClientHttpResponse.");
logger.debug("SockJS sockJsSession closed, closing response.");
}
response.close();
break;
......@@ -233,8 +233,8 @@ public class RestTemplateXhrTransport extends AbstractXhrTransport implements Xh
if (os.size() > 0) {
handleFrame(os);
}
if (logger.isDebugEnabled()) {
logger.debug("XHR receive completed");
if (logger.isTraceEnabled()) {
logger.trace("XHR receive completed");
}
break;
}
......
......@@ -70,7 +70,7 @@ public class WebSocketTransport implements Transport {
URI url = request.getTransportUrl();
WebSocketHttpHeaders headers = new WebSocketHttpHeaders(request.getHandshakeHeaders());
if (logger.isDebugEnabled()) {
logger.debug("Opening WebSocket connection, url=" + url);
logger.debug("Starting WebSocket session url=" + url);
}
this.webSocketClient.doHandshake(handler, headers, url).addCallback(
new ListenableFutureCallback<WebSocketSession>() {
......
......@@ -268,54 +268,64 @@ public abstract class AbstractSockJsService implements SockJsService {
String sockJsPath, WebSocketHandler wsHandler) throws SockJsException {
if (sockJsPath == null) {
if (logger.isErrorEnabled()) {
logger.error("Expected SockJS path. Failing request: " + request.getURI());
}
logger.error("Expected SockJS path. Failing request: " + request.getURI());
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
if (logger.isTraceEnabled()) {
logger.trace(request.getMethod() + " with SockJS path [" + sockJsPath + "]");
}
try {
request.getHeaders();
}
catch (InvalidMediaTypeException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Invalid media type ignored: " + ex.getMediaType());
}
// As per SockJS protocol content-type can be ignored (it's always json)
}
String requestInfo = logger.isDebugEnabled() ? request.getMethod() + " " + request.getURI() : "";
try {
if (sockJsPath.equals("") || sockJsPath.equals("/")) {
logger.debug(requestInfo);
response.getHeaders().setContentType(new MediaType("text", "plain", Charset.forName("UTF-8")));
response.getBody().write("Welcome to SockJS!\n".getBytes("UTF-8"));
}
else if (sockJsPath.equals("/info")) {
logger.debug(requestInfo);
this.infoHandler.handle(request, response);
}
else if (sockJsPath.matches("/iframe[0-9-.a-z_]*.html")) {
logger.debug(requestInfo);
this.iframeHandler.handle(request, response);
}
else if (sockJsPath.equals("/websocket")) {
if (isWebSocketEnabled()) {
logger.debug(requestInfo);
handleRawWebSocketRequest(request, response, wsHandler);
}
else if (logger.isDebugEnabled()) {
logger.debug("WebSocket disabled, ignoring " + requestInfo);
}
}
else {
String[] pathSegments = StringUtils.tokenizeToStringArray(sockJsPath.substring(1), "/");
if (pathSegments.length != 3) {
if (logger.isErrorEnabled()) {
logger.error("Expected \"/{server}/{session}/{transport}\" but got \"" + sockJsPath + "\"");
}
logger.error("Ignoring invalid transport request " + requestInfo);
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
String serverId = pathSegments[0];
String sessionId = pathSegments[1];
String transport = pathSegments[2];
<<<<<<< HEAD
if (!validateRequest(serverId, sessionId, transport)) {
=======
if (!isWebSocketEnabled() && transport.equals("websocket")) {
logger.debug("WebSocket transport is disabled, ignoring " + requestInfo);
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
else if (!validateRequest(serverId, sessionId, transport)) {
logger.error("Ignoring transport request " + requestInfo);
>>>>>>> STOMP and WebSocket messaging related logging updates
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
......@@ -330,21 +340,23 @@ public abstract class AbstractSockJsService implements SockJsService {
protected boolean validateRequest(String serverId, String sessionId, String transport) {
if (!StringUtils.hasText(serverId) || !StringUtils.hasText(sessionId) || !StringUtils.hasText(transport)) {
logger.error("Empty server, session, or transport value");
logger.error("No server, session, or transport path segment");
return false;
}
// Server and session id's must not contain "."
if (serverId.contains(".") || sessionId.contains(".")) {
<<<<<<< HEAD
logger.error("Server or session contain a \".\"");
return false;
}
if (!isWebSocketEnabled() && transport.equals("websocket")) {
logger.debug("Ignoring WebSocket request (transport disabled via SockJsService property)");
=======
logger.error("Either server or session contains a \".\" which is not allowed by SockJS protocol.");
>>>>>>> STOMP and WebSocket messaging related logging updates
return false;
}
return true;
}
......@@ -368,7 +380,6 @@ public abstract class AbstractSockJsService implements SockJsService {
try {
// Perhaps a CORS Filter has already added this?
if (!CollectionUtils.isEmpty(responseHeaders.get("Access-Control-Allow-Origin"))) {
logger.trace("Skip adding CORS headers, response already contains \"Access-Control-Allow-Origin\"");
return;
}
}
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -43,6 +43,8 @@ import org.springframework.web.socket.sockjs.SockJsService;
*/
public class SockJsHttpRequestHandler implements HttpRequestHandler {
// No logging: HTTP transports too verbose and we don't know enough to log anything of value
private final SockJsService sockJsService;
private final WebSocketHandler webSocketHandler;
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.ServerHttpRequest;
......@@ -192,16 +194,14 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem
TransportType transportType = TransportType.fromValue(transport);
if (transportType == null) {
if (logger.isErrorEnabled()) {
logger.error("Unknown transport type: " + transportType);
}
logger.error("Unknown transport type for " + request.getURI());
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
TransportHandler transportHandler = this.handlers.get(transportType);
if (transportHandler == null) {
logger.error("Transport handler not found");
logger.error("No TransportHandler for " + request.getURI());
response.setStatusCode(HttpStatus.NOT_FOUND);
return;
}
......@@ -239,7 +239,9 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem
else {
response.setStatusCode(HttpStatus.NOT_FOUND);
if (logger.isDebugEnabled()) {
logger.debug("Session not found, sessionId=" + sessionId);
logger.debug("Session not found, sessionId=" + sessionId +
". The session may have been closed " +
"(e.g. missed heart-beat) while a message was coming in.");
}
return;
}
......@@ -277,17 +279,11 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem
if (session != null) {
return session;
}
if (this.sessionCleanupTask == null) {
scheduleSessionTask();
}
if (logger.isDebugEnabled()) {
logger.debug("Creating new SockJS session, sessionId=" + sessionId);
}
session = sessionFactory.createSession(sessionId, handler, attributes);
this.sessions.put(sessionId, session);
return session;
}
......@@ -297,31 +293,24 @@ public class TransportHandlingSockJsService extends AbstractSockJsService implem
if (this.sessionCleanupTask != null) {
return;
}
final List<String> removedSessionIds = new ArrayList<String>();
this.sessionCleanupTask = getTaskScheduler().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
int count = sessions.size();
if (logger.isTraceEnabled() && (count != 0)) {
logger.trace("Checking " + count + " session(s) for timeouts [" + getName() + "]");
}
for (SockJsSession session : sessions.values()) {
for (SockJsSession session : sessions.values()) {
try {
if (session.getTimeSinceLastActive() > getDisconnectDelay()) {
if (logger.isTraceEnabled()) {
logger.trace("Removing " + session + " for [" + getName() + "]");
}
session.close();
sessions.remove(session.getId());
session.close();
}
}
if (logger.isTraceEnabled() && count > 0) {
logger.trace(sessions.size() + " remaining session(s) [" + getName() + "]");
catch (Throwable ex) {
logger.error("Failed to close " + session, ex);
}
}
catch (Throwable ex) {
if (logger.isErrorEnabled()) {
logger.error("Failed to complete session timeout checks for [" + getName() + "]", ex);
}
if (logger.isDebugEnabled() && !removedSessionIds.isEmpty()) {
logger.debug("Closed " + removedSessionIds.size() + " sessions " + removedSessionIds);
removedSessionIds.clear();
}
}
}, getDisconnectDelay());
......
......@@ -37,6 +37,7 @@ import org.springframework.web.socket.sockjs.transport.session.AbstractHttpSockJ
*/
public abstract class AbstractHttpReceivingTransportHandler extends AbstractTransportHandler {
@Override
public final void handleRequest(ServerHttpRequest request, ServerHttpResponse response,
WebSocketHandler wsHandler, SockJsSession wsSession) throws SockJsException {
......@@ -70,16 +71,13 @@ public abstract class AbstractHttpReceivingTransportHandler extends AbstractTran
handleReadError(response, "Failed to read message(s)", sockJsSession.getId());
return;
}
if (messages == null) {
handleReadError(response, "Payload expected.", sockJsSession.getId());
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Received message(s): " + Arrays.asList(messages));
}
response.setStatusCode(getResponseStatus());
response.getHeaders().setContentType(new MediaType("text", "plain", UTF8_CHARSET));
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -63,12 +63,14 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp
if (sockJsSession.isNew()) {
if (logger.isDebugEnabled()) {
logger.debug("Opening " + getTransportType() + " connection.");
logger.debug(request.getMethod() + " " + request.getURI());
}
sockJsSession.handleInitialRequest(request, response, getFrameFormat(request));
}
else if (sockJsSession.isClosed()) {
logger.debug("Connection already closed (but not removed yet).");
if (logger.isDebugEnabled()) {
logger.debug("Connection already closed (but not removed yet) for " + sockJsSession);
}
SockJsFrame frame = SockJsFrame.closeFrameGoAway();
try {
response.getBody().write(frame.getContentBytes());
......@@ -76,7 +78,6 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp
catch (IOException ex) {
throw new SockJsException("Failed to send " + frame, sockJsSession.getId(), ex);
}
return;
}
else if (!sockJsSession.isActive()) {
if (logger.isTraceEnabled()) {
......@@ -86,7 +87,7 @@ public abstract class AbstractHttpSendingTransportHandler extends AbstractTransp
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Another " + getTransportType() + " connection still open: " + sockJsSession);
logger.debug("Another " + getTransportType() + " connection still open for " + sockJsSession);
}
String formattedFrame = getFrameFormat(request).format(SockJsFrame.closeFrameAnotherConnectionOpen());
try {
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -32,6 +32,7 @@ public abstract class AbstractTransportHandler implements TransportHandler {
protected static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
protected final Log logger = LogFactory.getLog(this.getClass());
private SockJsServiceConfig serviceConfig;
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -90,7 +90,7 @@ public class DefaultSockJsService extends TransportHandlingSockJsService {
catch (Exception ex) {
Log logger = LogFactory.getLog(DefaultSockJsService.class);
if (logger.isWarnEnabled()) {
logger.warn("Failed to create default WebSocketTransportHandler", ex);
logger.warn("Failed to create a default WebSocketTransportHandler", ex);
}
}
if (overrides != null) {
......
......@@ -55,6 +55,7 @@ public abstract class AbstractSockJsSession implements SockJsSession {
protected final Log logger = LogFactory.getLog(getClass());
/**
* Log category to use on network IO exceptions after a client has gone away.
*
......@@ -65,7 +66,7 @@ public abstract class AbstractSockJsSession implements SockJsSession {
*
* <p>We make a best effort to identify such network failures, on a per-server
* basis, and log them under a separate log category. A simple one-line message
* is logged at INFO level, while a full stack trace is shown at TRACE level.
* is logged at DEBUG level, while a full stack trace is shown at TRACE level.
*
* @see #disconnectedClientLogger
*/
......@@ -268,8 +269,8 @@ public abstract class AbstractSockJsSession implements SockJsSession {
@Override
public final void close(CloseStatus status) throws IOException {
if (isOpen()) {
if (logger.isInfoEnabled()) {
logger.info("Closing SockJS session " + getId() + " with " + status);
if (logger.isDebugEnabled()) {
logger.debug("Closing SockJS session " + getId() + " with " + status);
}
this.state = State.CLOSED;
try {
......@@ -362,8 +363,8 @@ public abstract class AbstractSockJsSession implements SockJsSession {
if (disconnectedClientLogger.isTraceEnabled()) {
disconnectedClientLogger.trace("Looks like the client has gone away", failure);
}
else if (disconnectedClientLogger.isInfoEnabled()) {
disconnectedClientLogger.info("Looks like the client has gone away: " +
else if (disconnectedClientLogger.isDebugEnabled()) {
disconnectedClientLogger.debug("Looks like the client has gone away: " +
nestedException.getMessage() + " (For full stack trace, set the '" +
DISCONNECTED_CLIENT_LOG_CATEGORY + "' log category to TRACE level)");
}
......@@ -426,7 +427,7 @@ public abstract class AbstractSockJsSession implements SockJsSession {
@Override
public String toString() {
return getClass().getSimpleName() + "[id=" + getId() + ", uri=" + getUri() + "]";
return getClass().getSimpleName() + "[id=" + getId() + "]";
}
......
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -180,7 +180,6 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen
public void handleMessage(TextMessage message, WebSocketSession wsSession) throws Exception {
String payload = message.getPayload();
if (StringUtils.isEmpty(payload)) {
logger.trace("Ignoring empty message");
return;
}
String[] messages;
......@@ -217,7 +216,7 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen
@Override
protected void writeFrameInternal(SockJsFrame frame) throws IOException {
if (logger.isTraceEnabled()) {
logger.trace("Write " + frame);
logger.trace("Writing " + frame);
}
TextMessage message = new TextMessage(frame.getContent());
this.webSocketSession.sendMessage(message);
......@@ -233,13 +232,4 @@ public class WebSocketServerSockJsSession extends AbstractSockJsSession implemen
}
}
@Override
public String toString() {
if (getNativeSession() != null) {
return super.toString();
}
else {
return "WebSocketServerSockJsSession[id=" + getId() + ", uri=null]";
}
}
}
......@@ -58,6 +58,8 @@ public class HttpSendingTransportHandlerTests extends AbstractHttpRequestTests
this.sockJsConfig = new StubSockJsServiceConfig();
this.sockJsConfig.setTaskScheduler(this.taskScheduler);
setRequest("POST", "/");
}
@Test
......@@ -100,6 +102,7 @@ public class HttpSendingTransportHandlerTests extends AbstractHttpRequestTests
assertEquals("\"callback\" parameter required", this.servletResponse.getContentAsString());
resetRequestAndResponse();
setRequest("POST", "/");
this.servletRequest.setQueryString("c=callback");
this.servletRequest.addParameter("c", "callback");
transportHandler.handleRequest(this.request, this.response, this.webSocketHandler, session);
......@@ -136,6 +139,7 @@ public class HttpSendingTransportHandlerTests extends AbstractHttpRequestTests
assertEquals("\"callback\" parameter required", this.servletResponse.getContentAsString());
resetRequestAndResponse();
setRequest("POST", "/");
this.servletRequest.setQueryString("c=callback");
this.servletRequest.addParameter("c", "callback");
transportHandler.handleRequest(this.request, this.response, this.webSocketHandler, session);
......
......@@ -38700,7 +38700,7 @@ infrastructure components automatically gather stats and counters that provide
important insight into the internal state of the application. The configuration
also declares a bean of type `WebSocketMessageBrokerStats` that gathers all
available information in one place and by default logs it at INFO once
every 15 minutes. This bean can be exported to JMX through Spring's
every 30 minutes. This bean can be exported to JMX through Spring's
`MBeanExporter` for viewing at runtime for example through JDK's jconsole.
Below is a summary of the available information.
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册