提交 947f3d4b 编写于 作者: R Rossen Stoyanchev

Update Javadoc, add tests and polish spring-messaging

上级 3e0b0f10
...@@ -22,17 +22,18 @@ package org.springframework.messaging; ...@@ -22,17 +22,18 @@ package org.springframework.messaging;
* @author Mark Fisher * @author Mark Fisher
* @author Arjen Poutsma * @author Arjen Poutsma
* @since 4.0 * @since 4.0
*
* @see org.springframework.messaging.support.MessageBuilder * @see org.springframework.messaging.support.MessageBuilder
*/ */
public interface Message<T> { public interface Message<T> {
/** /**
* Returns message headers for the message (never {@code null}). * Return message headers for the message, never {@code null}.
*/ */
MessageHeaders getHeaders(); MessageHeaders getHeaders();
/** /**
* Returns the message payload. * Return the message payload.
*/ */
T getPayload(); T getPayload();
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.springframework.messaging; package org.springframework.messaging;
/** /**
* Base channel interface defining common behavior for sending messages. * Defines methods for sending messages.
* *
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
...@@ -31,24 +31,27 @@ public interface MessageChannel { ...@@ -31,24 +31,27 @@ public interface MessageChannel {
/** /**
* Send a {@link Message} to this channel. May throw a RuntimeException for * Send a {@link Message} to this channel. If the message is sent successfully,
* non-recoverable errors. Otherwise, if the Message cannot be sent for a non-fatal * the method returns {@code true}. If the message cannot be sent due to a
* reason this method will return 'false', and if the Message is sent successfully, it * non-fatal reason, the method returns {@code false}. The method may also
* will return 'true'. * throw a RuntimeException in case of non-recoverable errors.
* <p>Depending on the implementation, this method may block indefinitely. To provide a * <p>
* maximum wait time, use {@link #send(Message, long)}. * This method may block indefinitely, depending on the implementation.
* @param message the {@link Message} to send * To provide a maximum wait time, use {@link #send(Message, long)}.
* @return whether or not the Message has been sent successfully *
* @param message the message to send
* @return whether or not the message was sent
*/ */
boolean send(Message<?> message); boolean send(Message<?> message);
/** /**
* Send a message, blocking until either the message is accepted or the specified * Send a message, blocking until either the message is accepted or the
* timeout period elapses. * specified timeout period elapses.
* @param message the {@link Message} to send *
* @param timeout the timeout in milliseconds or #INDEFINITE_TIMEOUT * @param message the message to send
* @return {@code true} if the message is sent successfully, {@code false} if the * @param timeout the timeout in milliseconds or {@link #INDEFINITE_TIMEOUT}
* specified timeout period elapses or the send is interrupted * @return {@code true} if the message is sent, {@code false} if not including
* a timeout of an interrupt of the send
*/ */
boolean send(Message<?> message, long timeout); boolean send(Message<?> message, long timeout);
......
...@@ -25,6 +25,7 @@ package org.springframework.messaging; ...@@ -25,6 +25,7 @@ package org.springframework.messaging;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class MessageDeliveryException extends MessagingException { public class MessageDeliveryException extends MessagingException {
public MessageDeliveryException(String description) { public MessageDeliveryException(String description) {
super(description); super(description);
} }
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
package org.springframework.messaging; package org.springframework.messaging;
/** /**
* Base interface for any component that handles Messages. * Contract for handling a {@link Message}.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Iwein Fuld * @author Iwein Fuld
...@@ -26,17 +26,9 @@ package org.springframework.messaging; ...@@ -26,17 +26,9 @@ package org.springframework.messaging;
public interface MessageHandler { public interface MessageHandler {
/** /**
* Handles the message if possible. If the handler cannot deal with the * Handle the given message.
* message this will result in a {@code MessageRejectedException} e.g. *
* in case of a Selective Consumer. When a consumer tries to handle a
* message, but fails to do so, a {@code MessageHandlingException} is
* thrown. In the last case it is recommended to treat the message as tainted
* and go into an error scenario.
* <p>When the handling results in a failure of another message being sent
* (e.g. a "reply" message), that failure will trigger a
* {@code MessageDeliveryException}.
* @param message the message to be handled * @param message the message to be handled
* reply related to the handling of the message
*/ */
void handleMessage(Message<?> message) throws MessagingException; void handleMessage(Message<?> message) throws MessagingException;
......
...@@ -14,15 +14,15 @@ ...@@ -14,15 +14,15 @@
* limitations under the License. * limitations under the License.
*/ */
package org.springframework.messaging.handler.annotation.support; package org.springframework.messaging;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
/** /**
* Thrown when the handling of a message results in an unrecoverable exception. * Exception that indicates an error occurred during message handling.
* *
* @author Rossen Stoyanchev * @author Mark Fisher
* @since 4.0 * @since 4.0
*/ */
public class MessageHandlingException extends MessagingException { public class MessageHandlingException extends MessagingException {
...@@ -30,12 +30,12 @@ public class MessageHandlingException extends MessagingException { ...@@ -30,12 +30,12 @@ public class MessageHandlingException extends MessagingException {
private static final long serialVersionUID = 690969923668400297L; private static final long serialVersionUID = 690969923668400297L;
public MessageHandlingException(Message<?> message, String description, Throwable cause) {
super(message, description, cause);
}
public MessageHandlingException(Message<?> message, String description) { public MessageHandlingException(Message<?> message, String description) {
super(message, description); super(message, description);
} }
public MessageHandlingException(Message<?> message, String description, Throwable cause) {
super(message, description, cause);
}
} }
...@@ -38,29 +38,34 @@ import org.springframework.util.IdGenerator; ...@@ -38,29 +38,34 @@ import org.springframework.util.IdGenerator;
/** /**
* The headers for a {@link Message} * The headers for a {@link Message}
* * <p>
* <p><b>IMPORTANT</b>: This class is immutable. Any mutating operation * <b>IMPORTANT</b>: This class is immutable. Any mutating operation such as
* (e.g., put(..), putAll(..) etc.) will throw {@link UnsupportedOperationException}. * {@code put(..)}, {@code putAll(..)} and others will throw
* * {@link UnsupportedOperationException}.
* <p>To create MessageHeaders instance use fluent * <p>
* {@link org.springframework.messaging.support.MessageBuilder MessageBuilder} API * One way to create message headers is to use the
* {@link org.springframework.messaging.support.MessageBuilder MessageBuilder}:
* <pre class="code"> * <pre class="code">
* MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2"); * MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2");
* </pre> * </pre>
* or create an instance of GenericMessage passing payload as {@link Object} and headers as a regular {@link Map} * A second option is to create {@link org.springframework.messaging.support.GenericMessage}
* passing a payload as {@link Object} and headers as a {@link Map java.util.Map}:
* <pre class="code"> * <pre class="code">
* Map headers = new HashMap(); * Map headers = new HashMap();
* headers.put("key1", "value1"); * headers.put("key1", "value1");
* headers.put("key2", "value2"); * headers.put("key2", "value2");
* new GenericMessage("foo", headers); * new GenericMessage("foo", headers);
* </pre> * </pre>
* A third option is to use {@link org.springframework.messaging.support.MessageHeaderAccessor}
* or one of its sub-classes to create specific categories of headers.
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Mark Fisher * @author Mark Fisher
* @author Gary Russell * @author Gary Russell
* @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*
* @see org.springframework.messaging.support.MessageBuilder * @see org.springframework.messaging.support.MessageBuilder
* @see org.springframework.messaging.support.MessageHeaderAccessor
*/ */
public final class MessageHeaders implements Map<String, Object>, Serializable { public final class MessageHeaders implements Map<String, Object>, Serializable {
...@@ -124,8 +129,8 @@ public final class MessageHeaders implements Map<String, Object>, Serializable { ...@@ -124,8 +129,8 @@ public final class MessageHeaders implements Map<String, Object>, Serializable {
return null; return null;
} }
if (!type.isAssignableFrom(value.getClass())) { if (!type.isAssignableFrom(value.getClass())) {
throw new IllegalArgumentException("Incorrect type specified for header '" + key + "'. Expected [" + type throw new IllegalArgumentException("Incorrect type specified for header '" +
+ "] but actual type is [" + value.getClass() + "]"); key + "'. Expected [" + type + "] but actual type is [" + value.getClass() + "]");
} }
return (T) value; return (T) value;
} }
......
...@@ -17,8 +17,7 @@ ...@@ -17,8 +17,7 @@
package org.springframework.messaging; package org.springframework.messaging;
/** /**
* Interface for Message Channels from which Messages may be actively received through * A {@link MessageChannel} from which messages may be actively received through polling.
* polling.
* *
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
...@@ -27,6 +26,7 @@ public interface PollableChannel extends MessageChannel { ...@@ -27,6 +26,7 @@ public interface PollableChannel extends MessageChannel {
/** /**
* Receive a message from this channel, blocking indefinitely if necessary. * Receive a message from this channel, blocking indefinitely if necessary.
*
* @return the next available {@link Message} or {@code null} if interrupted * @return the next available {@link Message} or {@code null} if interrupted
*/ */
Message<?> receive(); Message<?> receive();
...@@ -34,10 +34,10 @@ public interface PollableChannel extends MessageChannel { ...@@ -34,10 +34,10 @@ public interface PollableChannel extends MessageChannel {
/** /**
* Receive a message from this channel, blocking until either a message is available * Receive a message from this channel, blocking until either a message is available
* or the specified timeout period elapses. * or the specified timeout period elapses.
* @param timeout the timeout in milliseconds or *
* {@link MessageChannel#INDEFINITE_TIMEOUT}. * @param timeout the timeout in milliseconds or {@link MessageChannel#INDEFINITE_TIMEOUT}.
* @return the next available {@link Message} or {@code null} if the specified timeout * @return the next available {@link Message} or {@code null} if the specified timeout
* period elapses or the message reception is interrupted * period elapses or the message reception is interrupted
*/ */
Message<?> receive(long timeout); Message<?> receive(long timeout);
......
...@@ -17,26 +17,28 @@ ...@@ -17,26 +17,28 @@
package org.springframework.messaging; package org.springframework.messaging;
/** /**
* Interface for any MessageChannel implementation that accepts subscribers. * A {@link MessageChannel} that maintains a registry of subscribers and invokes
* The subscribers must implement the {@link MessageHandler} interface and * them to handle messages sent through this channel.
* will be invoked when a Message is available.
* *
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
*/ */
public interface SubscribableChannel extends MessageChannel { public interface SubscribableChannel extends MessageChannel {
/** /**
* Register a {@link MessageHandler} as a subscriber to this channel. * Register a message handler.
* @return {@code true} if the channel was not already subscribed to the specified *
* handler * @return {@code true} if the handler was subscribed or {@code false} if it
* was already subscribed.
*/ */
boolean subscribe(MessageHandler handler); boolean subscribe(MessageHandler handler);
/** /**
* Remove a {@link MessageHandler} from the subscribers of this channel. * Un-register a message handler.
* @return {@code true} if the channel was previously subscribed to the specified *
* handler * @return {@code true} if the handler was un-registered, or {@code false}
* if was not registered.
*/ */
boolean unsubscribe(MessageHandler handler); boolean unsubscribe(MessageHandler handler);
......
...@@ -21,14 +21,20 @@ import org.springframework.messaging.Message; ...@@ -21,14 +21,20 @@ import org.springframework.messaging.Message;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* Base class for a messaging template that can resolve String-based destinations. * An extension of {@link AbstractMessagingTemplate} that adds operations for sending
* messages to a resolvable destination name as defined by the following interfaces:
* <ul>
* <li>{@link DestinationResolvingMessageSendingOperations}</li>
* <li>{@link DestinationResolvingMessageReceivingOperations}</li>
* <li>{@link DestinationResolvingMessageRequestReplyOperations}</li>
* </ul>
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*/ */
public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends AbstractMessagingTemplate<D>
AbstractMessagingTemplate<D> implements implements
DestinationResolvingMessageSendingOperations<D>, DestinationResolvingMessageSendingOperations<D>,
DestinationResolvingMessageReceivingOperations<D>, DestinationResolvingMessageReceivingOperations<D>,
DestinationResolvingMessageRequestReplyOperations<D> { DestinationResolvingMessageRequestReplyOperations<D> {
...@@ -36,19 +42,36 @@ public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends ...@@ -36,19 +42,36 @@ public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends
private volatile DestinationResolver<D> destinationResolver; private volatile DestinationResolver<D> destinationResolver;
/**
* Configure the {@link DestinationResolver} to use to resolve String destination
* names into actual destinations of type {@code <D>}.
* <p>
* This field does not have a default setting. If not configured, methods that
* require resolving a destination name will raise an {@link IllegalArgumentException}.
*
* @param destinationResolver the destination resolver to use
*/
public void setDestinationResolver(DestinationResolver<D> destinationResolver) { public void setDestinationResolver(DestinationResolver<D> destinationResolver) {
Assert.notNull(destinationResolver, "'destinationResolver' is required");
this.destinationResolver = destinationResolver; this.destinationResolver = destinationResolver;
} }
/**
* Return the configured destination resolver.
*/
public DestinationResolver<D> getDestinationResolver() {
return this.destinationResolver;
}
@Override @Override
public <P> void send(String destinationName, Message<P> message) { public void send(String destinationName, Message<?> message) {
D destination = resolveDestination(destinationName); D destination = resolveDestination(destinationName);
this.doSend(destination, message); this.doSend(destination, message);
} }
protected final D resolveDestination(String destinationName) { protected final D resolveDestination(String destinationName) {
Assert.notNull(destinationResolver, "destinationResolver is required when passing a name only"); Assert.state(this.destinationResolver != null, "destinationResolver is required to resolve destination names");
return this.destinationResolver.resolveDestination(destinationName); return this.destinationResolver.resolveDestination(destinationName);
} }
...@@ -79,7 +102,7 @@ public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends ...@@ -79,7 +102,7 @@ public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends
} }
@Override @Override
public <P> Message<P> receive(String destinationName) { public Message<?> receive(String destinationName) {
D destination = resolveDestination(destinationName); D destination = resolveDestination(destinationName);
return super.receive(destination); return super.receive(destination);
} }
......
...@@ -27,7 +27,7 @@ import org.springframework.messaging.support.converter.SimpleMessageConverter; ...@@ -27,7 +27,7 @@ import org.springframework.messaging.support.converter.SimpleMessageConverter;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* Base class for templates that support sending messages. * Abstract base class for implementations of {@link MessageSendingOperations}.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
...@@ -42,18 +42,30 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin ...@@ -42,18 +42,30 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
private volatile MessageConverter converter = new SimpleMessageConverter(); private volatile MessageConverter converter = new SimpleMessageConverter();
/**
* Configure the default destination to use in send methods that don't have
* a destination argument. If a default destination is not configured, send methods
* without a destination argument will raise an exception if invoked.
*
* @param defaultDestination the default destination
*/
public void setDefaultDestination(D defaultDestination) { public void setDefaultDestination(D defaultDestination) {
this.defaultDestination = defaultDestination; this.defaultDestination = defaultDestination;
} }
/**
* Return the configured default destination.
*/
public D getDefaultDestination() { public D getDefaultDestination() {
return this.defaultDestination; return this.defaultDestination;
} }
/** /**
* Set the {@link MessageConverter} that is to be used to convert * Set the {@link MessageConverter} to use in {@code convertAndSend} methods.
* between Messages and objects for this template. * <p>
* <p>The default is {@link SimpleMessageConverter}. * By default {@link SimpleMessageConverter} is used.
*
* @param messageConverter the message converter to use
*/ */
public void setMessageConverter(MessageConverter messageConverter) { public void setMessageConverter(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' must not be null"); Assert.notNull(messageConverter, "'messageConverter' must not be null");
...@@ -61,7 +73,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin ...@@ -61,7 +73,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
} }
/** /**
* @return the configured {@link MessageConverter} * Return the configured {@link MessageConverter}.
*/ */
public MessageConverter getMessageConverter() { public MessageConverter getMessageConverter() {
return this.converter; return this.converter;
...@@ -74,9 +86,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin ...@@ -74,9 +86,7 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
} }
protected final D getRequiredDefaultDestination() { protected final D getRequiredDefaultDestination() {
Assert.state(this.defaultDestination != null, Assert.state(this.defaultDestination != null, "No 'defaultDestination' configured.");
"No 'defaultDestination' specified for MessagingTemplate. "
+ "Unable to invoke method without an explicit destination argument.");
return this.defaultDestination; return this.defaultDestination;
} }
...@@ -89,8 +99,8 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin ...@@ -89,8 +99,8 @@ public abstract class AbstractMessageSendingTemplate<D> implements MessageSendin
@Override @Override
public void convertAndSend(Object message) throws MessagingException { public void convertAndSend(Object payload) throws MessagingException {
this.convertAndSend(getRequiredDefaultDestination(), message); this.convertAndSend(getRequiredDefaultDestination(), payload);
} }
@Override @Override
......
...@@ -21,7 +21,9 @@ import org.springframework.messaging.Message; ...@@ -21,7 +21,9 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.MessageHeaders;
/** /**
* Base class for a messaging template that send and receive messages. * An extension of {@link AbstractMessageSendingTemplate} that adds support for
* receive as well as request-reply style operations as defined by
* {@link MessageReceivingOperations} and {@link MessageRequestReplyOperations}.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
...@@ -32,16 +34,16 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin ...@@ -32,16 +34,16 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
@Override @Override
public <P> Message<P> receive() { public Message<?> receive() {
return this.receive(getRequiredDefaultDestination()); return this.receive(getRequiredDefaultDestination());
} }
@Override @Override
public <P> Message<P> receive(D destination) { public Message<?> receive(D destination) {
return this.doReceive(destination); return this.doReceive(destination);
} }
protected abstract <P> Message<P> doReceive(D destination); protected abstract Message<?> doReceive(D destination);
@Override @Override
...@@ -71,7 +73,7 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin ...@@ -71,7 +73,7 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
return this.doSendAndReceive(destination, requestMessage); return this.doSendAndReceive(destination, requestMessage);
} }
protected abstract <S, R> Message<R> doSendAndReceive(D destination, Message<S> requestMessage); protected abstract Message<?> doSendAndReceive(D destination, Message<?> requestMessage);
@Override @Override
...@@ -117,7 +119,7 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin ...@@ -117,7 +119,7 @@ public abstract class AbstractMessagingTemplate<D> extends AbstractMessageSendin
requestMessage = postProcessor.postProcessMessage(requestMessage); requestMessage = postProcessor.postProcessMessage(requestMessage);
} }
Message<?> replyMessage = this.sendAndReceive(destination, requestMessage); Message<?> replyMessage = this.sendAndReceive(destination, requestMessage);
return (T) getMessageConverter().fromMessage(replyMessage, targetClass); return (replyMessage != null) ? (T) getMessageConverter().fromMessage(replyMessage, targetClass) : null;
} }
} }
...@@ -22,37 +22,38 @@ import org.springframework.messaging.MessageChannel; ...@@ -22,37 +22,38 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* {@link DestinationResolver} that resolves against named beans contained in a * An implementation of {@link DestinationResolver} that interprets a destination
* {@link BeanFactory}. * name as the bean name of a {@link MessageChannel} and looks up the bean in
* the configured {@link BeanFactory}.
* *
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
*/ */
public class BeanFactoryMessageChannelDestinationResolver implements DestinationResolver<MessageChannel>, BeanFactoryAware { public class BeanFactoryMessageChannelDestinationResolver
implements DestinationResolver<MessageChannel>, BeanFactoryAware {
private volatile BeanFactory beanFactory; private volatile BeanFactory beanFactory;
/** /**
* Create a new instance of the {@link * A default constructor that can be used when the resolver itself is configured
* BeanFactoryMessageChannelDestinationResolver} class. * as a Spring bean and will have the {@code BeanFactory} injected as a result
* <p>The BeanFactory to access must be set via <code>setBeanFactory</code>. * of ing having implemented {@link BeanFactoryAware}.
* This will happen automatically if this resolver is defined within an
* ApplicationContext thereby receiving the callback upon initialization.
* @see #setBeanFactory
*/ */
public BeanFactoryMessageChannelDestinationResolver() { public BeanFactoryMessageChannelDestinationResolver() {
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
} }
/** /**
* Create a new instance of the {@link * A constructor that accepts a {@link BeanFactory} useful if instantiating this
* BeanFactoryMessageChannelDestinationResolver} class. * resolver manually rather than having it defined as a Spring-managed bean.
* <p>Use of this constructor is redundant if this object is being created *
* by a Spring IoC container as the supplied {@link BeanFactory} will be * @param beanFactory the bean factory to perform lookups against
* replaced by the {@link BeanFactory} that creates it (c.f. the
* {@link BeanFactoryAware} contract). So only use this constructor if you
* are instantiating this object explicitly rather than defining a bean.
* @param beanFactory the bean factory to be used to lookup {@link MessageChannel}s.
*/ */
public BeanFactoryMessageChannelDestinationResolver(BeanFactory beanFactory) { public BeanFactoryMessageChannelDestinationResolver(BeanFactory beanFactory) {
Assert.notNull(beanFactory, "beanFactory must not be null"); Assert.notNull(beanFactory, "beanFactory must not be null");
...@@ -62,19 +63,14 @@ public class BeanFactoryMessageChannelDestinationResolver implements Destination ...@@ -62,19 +63,14 @@ public class BeanFactoryMessageChannelDestinationResolver implements Destination
@Override @Override
public MessageChannel resolveDestination(String name) { public MessageChannel resolveDestination(String name) {
Assert.state(this.beanFactory != null, "BeanFactory must not be null"); Assert.state(this.beanFactory != null, "No BeanFactory configured");
try { try {
return this.beanFactory.getBean(name, MessageChannel.class); return this.beanFactory.getBean(name, MessageChannel.class);
} }
catch (BeansException e) { catch (BeansException e) {
throw new DestinationResolutionException( throw new DestinationResolutionException(
"failed to look up MessageChannel bean with name '" + name + "'", e); "Failed to find MessageChannel bean with name '" + name + "'", e);
} }
} }
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = beanFactory;
}
} }
...@@ -28,8 +28,10 @@ import org.springframework.messaging.MessagingException; ...@@ -28,8 +28,10 @@ import org.springframework.messaging.MessagingException;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class DestinationResolutionException extends MessagingException { public class DestinationResolutionException extends MessagingException {
/** /**
* Create a new ChannelResolutionException. * Create an instance with the given description only.
*
* @param description the description * @param description the description
*/ */
public DestinationResolutionException(String description) { public DestinationResolutionException(String description) {
...@@ -37,9 +39,10 @@ public class DestinationResolutionException extends MessagingException { ...@@ -37,9 +39,10 @@ public class DestinationResolutionException extends MessagingException {
} }
/** /**
* Create a new ChannelResolutionException. * Create an instance with the given description and original cause.
*
* @param description the description * @param description the description
* @param cause the root cause (if any) * @param cause the root cause
*/ */
public DestinationResolutionException(String description, Throwable cause) { public DestinationResolutionException(String description, Throwable cause) {
super(description, cause); super(description, cause);
......
...@@ -17,18 +17,20 @@ ...@@ -17,18 +17,20 @@
package org.springframework.messaging.core; package org.springframework.messaging.core;
/** /**
* Strategy interface that resolves a given name to a destination. * Strategy for resolving a String destination name into an actual destination
* of type {@code <D>}.
* *
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
*/ */
public interface DestinationResolver<D> { public interface DestinationResolver<D> {
/** /**
* Resolve the given {@code name} to a destination. * Resolve the given destination name.
* @param name the name to resolve *
* @return the destination (must not be {@code null}) * @param name the destination name to resolve
* @throws DestinationResolutionException if the name cannot be resolved * @return the destination, never {@code null}
*/ */
D resolveDestination(String name) throws DestinationResolutionException; D resolveDestination(String name) throws DestinationResolutionException;
......
...@@ -19,16 +19,31 @@ import org.springframework.messaging.Message; ...@@ -19,16 +19,31 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
/** /**
* A {@link MessageReceivingOperations} that can resolve a String-based destinations. * Extends {@link MessageReceivingOperations} and adds operations for receiving messages
* from a destination specified as a (resolvable) String name.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*
* @see DestinationResolver
*/ */
public interface DestinationResolvingMessageReceivingOperations<D> extends MessageReceivingOperations<D> { public interface DestinationResolvingMessageReceivingOperations<D> extends MessageReceivingOperations<D> {
<P> Message<P> receive(String destinationName) throws MessagingException; /**
* Resolve the given destination name and receive a message from it.
*
* @param destinationName the destination name to resolve
*/
Message<?> receive(String destinationName) throws MessagingException;
/**
* Resolve the given destination name, receive a message from it, convert the
* payload to the specified target type.
*
* @param destinationName the destination name to resolve
* @param targetClass the target class for the converted payload
*/
<T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException; <T> T receiveAndConvert(String destinationName, Class<T> targetClass) throws MessagingException;
} }
...@@ -21,25 +21,95 @@ import org.springframework.messaging.Message; ...@@ -21,25 +21,95 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
/** /**
* A {@link MessageRequestReplyOperations} that can resolve a String-based destinations. * Extends {@link MessageRequestReplyOperations} and adds operations for sending and
* receiving messages to and from a destination specified as a (resolvable) String name.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*
* @see DestinationResolver
*/ */
public interface DestinationResolvingMessageRequestReplyOperations<D> extends MessageRequestReplyOperations<D> { public interface DestinationResolvingMessageRequestReplyOperations<D> extends MessageRequestReplyOperations<D> {
/**
* Resolve the given destination name to a destination and send the given message,
* receive a reply and return it.
*
* @param destinationName the name of the target destination
* @param requestMessage the mesage to send
* @return the received message, possibly {@code null} if the message could not
* be received, for example due to a timeout
*/
Message<?> sendAndReceive(String destinationName, Message<?> requestMessage) throws MessagingException; Message<?> sendAndReceive(String destinationName, Message<?> requestMessage) throws MessagingException;
/**
* Resolve the given destination name, convert the payload request Object
* to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message and send it to the resolved destination, receive a reply
* and convert its body to the specified target class.
*
* @param destinationName the name of the target destination
* @param request the payload for the request message to send
* @param targetClass the target class to convert the payload of the reply to
* @return the converted payload of the reply message, possibly {@code null} if
* the message could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass) <T> T convertSendAndReceive(String destinationName, Object request, Class<T> targetClass)
throws MessagingException; throws MessagingException;
/**
* Resolve the given destination name, convert the payload request Object
* to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message with the given headers and send it to the resolved destination,
* receive a reply and convert its body to the specified target class.
*
* @param destinationName the name of the target destination
* @param request the payload for the request message to send
* @param headers the headers for the request message to send
* @param targetClass the target class to convert the payload of the reply to
* @return the converted payload of the reply message, possibly {@code null} if
* the message could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers, <T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers,
Class<T> targetClass) throws MessagingException; Class<T> targetClass) throws MessagingException;
/**
* Resolve the given destination name, convert the payload request Object
* to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message, apply the given post process, and send the resulting
* message to the resolved destination, then receive a reply and convert its
* body to the specified target class.
*
* @param destinationName the name of the target destination
* @param request the payload for the request message to send
* @param targetClass the target class to convert the payload of the reply to
* @param requestPostProcessor post process for the request message
* @return the converted payload of the reply message, possibly {@code null} if
* the message could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(String destinationName, Object request, <T> T convertSendAndReceive(String destinationName, Object request,
Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException; Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException;
/**
* Resolve the given destination name, convert the payload request Object
* to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message with the given headers, apply the given post process,
* and send the resulting message to the resolved destination, then receive
* a reply and convert its body to the specified target class.
*
* @param destinationName the name of the target destination
* @param request the payload for the request message to send
* @param headers the headers for the request message to send
* @param targetClass the target class to convert the payload of the reply to
* @param requestPostProcessor post process for the request message
* @return the converted payload of the reply message, possibly {@code null} if
* the message could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers, <T> T convertSendAndReceive(String destinationName, Object request, Map<String, Object> headers,
Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException; Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException;
......
...@@ -21,23 +21,76 @@ import org.springframework.messaging.Message; ...@@ -21,23 +21,76 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
/** /**
* A {@link MessageSendingOperations} that can resolve a String-based destinations. * Extends {@link MessageSendingOperations} and adds operations for sending messages
* to a destination specified as a (resolvable) String name.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*
* @see DestinationResolver
*/ */
public interface DestinationResolvingMessageSendingOperations<D> extends MessageSendingOperations<D> { public interface DestinationResolvingMessageSendingOperations<D> extends MessageSendingOperations<D> {
<P> void send(String destinationName, Message<P> message) throws MessagingException; /**
* Resolve the given destination name to a destination and send a message to it.
*
* @param destinationName the destination name to resolve
* @param message the message to send
*/
void send(String destinationName, Message<?> message) throws MessagingException;
/**
* Resolve the given destination name to a destination, convert the payload Object
* to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message and send it to the resolved destination.
* @param destinationName the destination name to resolve
* @param payload the Object to use as payload
*/
<T> void convertAndSend(String destinationName, T payload) throws MessagingException; <T> void convertAndSend(String destinationName, T payload) throws MessagingException;
<T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers) throws MessagingException; /**
* Resolve the given destination name to a destination, convert the payload
* Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message with the given headers and send it to the resolved
* destination.
<T> void convertAndSend(String destinationName, T payload, * @param destinationName the destination name to resolve
MessagePostProcessor postProcessor) throws MessagingException; * @param payload the Object to use as payload
* @param headers headers for the message to send
*/
<T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers)
throws MessagingException;
/**
* Resolve the given destination name to a destination, convert the payload
* Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message, apply the given post processor, and send the resulting
* message to the resolved destination.
* @param destinationName the destination name to resolve
* @param payload the Object to use as payload
* @param postProcessor the post processor to apply to the message
*/
<T> void convertAndSend(String destinationName, T payload, MessagePostProcessor postProcessor)
throws MessagingException;
/**
* Resolve the given destination name to a destination, convert the payload
* Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message with the given headers, apply the given post processor,
* and send the resulting message to the resolved destination.
* @param destinationName the destination name to resolve
* @param payload the Object to use as payload
* @param headers headers for the message to send
* @param postProcessor the post processor to apply to the message
*/
<T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers, <T> void convertAndSend(String destinationName, T payload, Map<String, Object> headers,
MessagePostProcessor postProcessor) throws MessagingException; MessagePostProcessor postProcessor) throws MessagingException;
......
...@@ -32,10 +32,11 @@ import org.springframework.messaging.support.MessageBuilder; ...@@ -32,10 +32,11 @@ import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* A messaging template for sending to and/or receiving messages from a * A messaging template that resolves destinations names to {@link MessageChannel}'s
* {@link MessageChannel}. * to send and receive messages from.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*/ */
public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel> public class GenericMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel>
...@@ -49,7 +50,7 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag ...@@ -49,7 +50,7 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
/** /**
* Specify the timeout value to use for send operations. * Configure the timeout value to use for send operations.
* *
* @param sendTimeout the send timeout in milliseconds * @param sendTimeout the send timeout in milliseconds
*/ */
...@@ -58,7 +59,14 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag ...@@ -58,7 +59,14 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
} }
/** /**
* Specify the timeout value to use for receive operations. * Return the configured send operation timeout value.
*/
public long getSendTimeout() {
return this.sendTimeout;
}
/**
* Configure the timeout value to use for receive operations.
* *
* @param receiveTimeout the receive timeout in milliseconds * @param receiveTimeout the receive timeout in milliseconds
*/ */
...@@ -67,12 +75,23 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag ...@@ -67,12 +75,23 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
} }
/** /**
* Specify whether or not an attempt to send on the reply channel throws an exception * Return the configured receive operation timeout value.
* if no receiving thread will actually receive the reply. This can occur */
* if the receiving thread has already timed out, or will never call receive() public long getReceiveTimeout() {
* because it caught an exception, or has already received a reply. return this.receiveTimeout;
* (default false - just a WARN log is emitted in these cases). }
* @param throwExceptionOnLateReply TRUE or FALSE.
/**
* Whether the thread sending a reply should have an exception raised if the
* receiving thread isn't going to receive the reply either because it timed out,
* or because it already received a reply, or because it got an exception while
* sending the request message.
* <p>
* The default value is {@code false} in which case only a WARN message is logged.
* If set to {@code true} a {@link MessageDeliveryException} is raised in addition
* to the log message.
*
* @param throwExceptionOnLateReply whether to throw an exception or not
*/ */
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) { public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
this.throwExceptionOnLateReply = throwExceptionOnLateReply; this.throwExceptionOnLateReply = throwExceptionOnLateReply;
...@@ -85,88 +104,91 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag ...@@ -85,88 +104,91 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override @Override
protected final void doSend(MessageChannel destination, Message<?> message) { protected final void doSend(MessageChannel channel, Message<?> message) {
Assert.notNull(destination, "channel must not be null");
Assert.notNull(channel, "channel must not be null");
long timeout = this.sendTimeout; long timeout = this.sendTimeout;
boolean sent = (timeout >= 0) boolean sent = (timeout >= 0) ? channel.send(message, timeout) : channel.send(message);
? destination.send(message, timeout)
: destination.send(message);
if (!sent) { if (!sent) {
throw new MessageDeliveryException(message, throw new MessageDeliveryException(message,
"failed to send message to channel '" + destination + "' within timeout: " + timeout); "failed to send message to channel '" + channel + "' within timeout: " + timeout);
} }
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
protected final <P> Message<P> doReceive(MessageChannel destination) { protected final Message<?> doReceive(MessageChannel channel) {
Assert.state(destination instanceof PollableChannel,
"The 'destination' must be a PollableChannel for receive operations."); Assert.notNull(channel, "'channel' is required");
Assert.state(channel instanceof PollableChannel, "A PollableChannel is required to receive messages.");
Assert.notNull(destination, "channel must not be null");
long timeout = this.receiveTimeout; long timeout = this.receiveTimeout;
Message<?> message = (timeout >= 0) Message<?> message = (timeout >= 0) ?
? ((PollableChannel) destination).receive(timeout) ((PollableChannel) channel).receive(timeout) : ((PollableChannel) channel).receive();
: ((PollableChannel) destination).receive();
if (message == null && this.logger.isTraceEnabled()) { if ((message == null) && this.logger.isTraceEnabled()) {
this.logger.trace("failed to receive message from channel '" + destination + "' within timeout: " + timeout); this.logger.trace("Failed to receive message from channel '" + channel + "' within timeout: " + timeout);
} }
return (Message<P>) message;
return message;
} }
@Override @Override
protected final <S, R> Message<R> doSendAndReceive(MessageChannel destination, Message<S> requestMessage) { protected final Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) {
Assert.notNull(channel, "'channel' is required");
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel(); Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel(); Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
TemporaryReplyChannel replyChannel = new TemporaryReplyChannel(this.receiveTimeout, this.throwExceptionOnLateReply);
TemporaryReplyChannel tempReplyChannel = new TemporaryReplyChannel();
requestMessage = MessageBuilder.fromMessage(requestMessage) requestMessage = MessageBuilder.fromMessage(requestMessage)
.setReplyChannel(replyChannel) .setReplyChannel(tempReplyChannel)
.setErrorChannel(replyChannel) .setErrorChannel(tempReplyChannel).build();
.build();
try { try {
this.doSend(destination, requestMessage); this.doSend(channel, requestMessage);
} }
catch (RuntimeException e) { catch (RuntimeException e) {
replyChannel.setClientWontReceive(true); tempReplyChannel.setSendFailed(true);
throw e; throw e;
} }
Message<R> reply = this.doReceive(replyChannel);
if (reply != null) { Message<?> replyMessage = this.doReceive(tempReplyChannel);
reply = MessageBuilder.fromMessage(reply) if (replyMessage != null) {
replyMessage = MessageBuilder.fromMessage(replyMessage)
.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader) .setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader) .setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
.build(); .build();
} }
return reply;
}
private static class TemporaryReplyChannel implements PollableChannel {
private static final Log logger = LogFactory.getLog(TemporaryReplyChannel.class); return replyMessage;
}
private volatile Message<?> message;
private final long receiveTimeout; /**
* A temporary channel for receiving a single reply message.
*/
private class TemporaryReplyChannel implements PollableChannel {
private final CountDownLatch latch = new CountDownLatch(1); private final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
private final boolean throwExceptionOnLateReply; private volatile Message<?> replyMessage;
private volatile boolean clientTimedOut; private final CountDownLatch replyLatch = new CountDownLatch(1);
private volatile boolean clientWontReceive; private volatile boolean hasReceived;
private volatile boolean clientHasReceived; private volatile boolean hasTimedOut;
private volatile boolean hasSendFailed;
public TemporaryReplyChannel(long receiveTimeout, boolean throwExceptionOnLateReply) {
this.receiveTimeout = receiveTimeout;
this.throwExceptionOnLateReply = throwExceptionOnLateReply;
}
public void setClientWontReceive(boolean clientWontReceive) { public void setSendFailed(boolean hasSendError) {
this.clientWontReceive = clientWontReceive; this.hasSendFailed = hasSendError;
} }
...@@ -178,23 +200,23 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag ...@@ -178,23 +200,23 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override @Override
public Message<?> receive(long timeout) { public Message<?> receive(long timeout) {
try { try {
if (this.receiveTimeout < 0) { if (GenericMessagingTemplate.this.receiveTimeout < 0) {
this.latch.await(); this.replyLatch.await();
this.clientHasReceived = true; this.hasReceived = true;
} }
else { else {
if (this.latch.await(this.receiveTimeout, TimeUnit.MILLISECONDS)) { if (this.replyLatch.await(GenericMessagingTemplate.this.receiveTimeout, TimeUnit.MILLISECONDS)) {
this.clientHasReceived = true; this.hasReceived = true;
} }
else { else {
this.clientTimedOut = true; this.hasTimedOut = true;
} }
} }
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
return this.message; return this.replyMessage;
} }
@Override @Override
...@@ -204,27 +226,31 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag ...@@ -204,27 +226,31 @@ public class GenericMessagingTemplate extends AbstractDestinationResolvingMessag
@Override @Override
public boolean send(Message<?> message, long timeout) { public boolean send(Message<?> message, long timeout) {
this.message = message;
this.latch.countDown();
if (this.clientTimedOut || this.clientHasReceived || this.clientWontReceive) {
String exceptionMessage = "";
if (this.clientTimedOut) {
exceptionMessage = "Reply message being sent, but the receiving thread has already timed out";
}
else if (this.clientHasReceived) {
exceptionMessage = "Reply message being sent, but the receiving thread has already received a reply";
}
else if (this.clientWontReceive) {
exceptionMessage = "Reply message being sent, but the receiving thread has already caught an exception and won't receive";
}
this.replyMessage = message;
this.replyLatch.countDown();
String errorDescription = null;
if (this.hasTimedOut) {
errorDescription = "Reply message received but the receiving thread has exited due to a timeout";
}
else if (this.hasReceived) {
errorDescription = "Reply message received but the receiving thread has already received a reply";
}
else if (this.hasSendFailed) {
errorDescription = "Reply message received but the receiving thread has exited due to " +
"an exception while sending the request message";
}
if (errorDescription != null) {
if (logger.isWarnEnabled()) { if (logger.isWarnEnabled()) {
logger.warn(exceptionMessage + ":" + message); logger.warn(errorDescription + ":" + message);
} }
if (this.throwExceptionOnLateReply) { if (GenericMessagingTemplate.this.throwExceptionOnLateReply) {
throw new MessageDeliveryException(message, exceptionMessage); throw new MessageDeliveryException(message, errorDescription);
} }
} }
return true; return true;
} }
} }
......
...@@ -19,22 +19,23 @@ package org.springframework.messaging.core; ...@@ -19,22 +19,23 @@ package org.springframework.messaging.core;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
/** /**
* To be used with MessagingTemplate's send method that converts an object to a message. * A contract for processing a {@link Message} after it has been created, either
* It allows for further modification of the message after it has been processed * returning a modified (effectively new) message or returning the same.
* by the converter.
*
* <p>This is often implemented as an anonymous class within a method implementation.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*
* @see MessageSendingOperations
* @see MessageRequestReplyOperations
*/ */
public interface MessagePostProcessor { public interface MessagePostProcessor {
/** /**
* Apply a MessagePostProcessor to the message. The returned message is * Process the given message.
* typically a modified version of the original. *
* @param message the message returned from the MessageConverter * @param message the message to process
* @return the modified version of the Message * @return a new or the same message, never {@code null}
*/ */
Message<?> postProcessMessage(Message<?> message); Message<?> postProcessMessage(Message<?> message);
......
...@@ -19,22 +19,54 @@ import org.springframework.messaging.Message; ...@@ -19,22 +19,54 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
/** /**
* A set of operations receiving messages from a destination. * Operations for receiving messages from a destination.
* *
* @param <D> the type of destination from which messages can be received * @param <D> the type of destination to receive messages from
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*
* @see GenericMessagingTemplate
*/ */
public interface MessageReceivingOperations<D> { public interface MessageReceivingOperations<D> {
<P> Message<P> receive() throws MessagingException; /**
* Receive a message from a default destination.
*
* @return the received message, possibly {@code null} if the message could not
* be received, for example due to a timeout
*/
Message<?> receive() throws MessagingException;
<P> Message<P> receive(D destination) throws MessagingException; /**
* Receive a message from the given destination.
*
* @param destination the target destination
* @return the received message, possibly {@code null} if the message could not
* be received, for example due to a timeout
*/
Message<?> receive(D destination) throws MessagingException;
/**
* Receive a message from a default destination and convert its payload to the
* specified target class.
*
* @param targetClass the target class to convert the payload to
* @return the converted payload of the reply message, possibly {@code null} if
* the message could not be received, for example due to a timeout
*/
<T> T receiveAndConvert(Class<T> targetClass) throws MessagingException; <T> T receiveAndConvert(Class<T> targetClass) throws MessagingException;
/**
* Receive a message from the given destination and convert its payload to the
* specified target class.
*
* @param destination the target destination
* @param targetClass the target class to convert the payload to
* @return the converted payload of the reply message, possibly {@code null} if
* the message could not be received, for example due to a timeout
*/
<T> T receiveAndConvert(D destination, Class<T> targetClass) throws MessagingException; <T> T receiveAndConvert(D destination, Class<T> targetClass) throws MessagingException;
} }
......
...@@ -21,33 +21,127 @@ import org.springframework.messaging.Message; ...@@ -21,33 +21,127 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
/** /**
* A set of operations for exchanging messages to and from a destination. * Operations for sending messages to and receiving the reply from a destination.
* *
* @param <D> the type of destination to send and receive messages from * @param <D> the type of destination
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*
* @see GenericMessagingTemplate
*/ */
public interface MessageRequestReplyOperations<D> { public interface MessageRequestReplyOperations<D> {
/**
* Send a request message and receive the reply from a default destination.
*
* @param requestMessage the message to send
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
*/
Message<?> sendAndReceive(Message<?> requestMessage) throws MessagingException; Message<?> sendAndReceive(Message<?> requestMessage) throws MessagingException;
/**
* Send a request message and receive the reply from the given destination.
*
* @param destination the target destination
* @param requestMessage the message to send
* @return the reply, possibly {@code null} if the message could not be received,
* for example due to a timeout
*/
Message<?> sendAndReceive(D destination, Message<?> requestMessage) throws MessagingException; Message<?> sendAndReceive(D destination, Message<?> requestMessage) throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter}, send
* it as a {@link Message} to a default destination, receive the reply and convert
* its body of the specified target class.
*
* @param request payload for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(Object request, Class<T> targetClass) throws MessagingException; <T> T convertSendAndReceive(Object request, Class<T> targetClass) throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter}, send
* it as a {@link Message} to the given destination, receive the reply and convert
* its body of the specified target class.
*
* @param destination the target destination
* @param request payload for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass) throws MessagingException; <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass) throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter}, send
* it as a {@link Message} with the given headers, to the specified destination,
* receive the reply and convert its body of the specified target class.
*
* @param destination the target destination
* @param request payload for the request message to send
* @param headers headers for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers, Class<T> targetClass) <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers, Class<T> targetClass)
throws MessagingException; throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* apply the given post processor and send the resulting {@link Message} to a
* default destination, receive the reply and convert its body of the given
* target class.
*
* @param request payload for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @param requestPostProcessor post process to apply to the request message
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(Object request, Class<T> targetClass, MessagePostProcessor requestPostProcessor) <T> T convertSendAndReceive(Object request, Class<T> targetClass, MessagePostProcessor requestPostProcessor)
throws MessagingException; throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* apply the given post processor and send the resulting {@link Message} to the
* given destination, receive the reply and convert its body of the given
* target class.
*
* @param destination the target destination
* @param request payload for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @param requestPostProcessor post process to apply to the request message
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass, <T> T convertSendAndReceive(D destination, Object request, Class<T> targetClass,
MessagePostProcessor requestPostProcessor) throws MessagingException; MessagePostProcessor requestPostProcessor) throws MessagingException;
/**
* Convert the given request Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message with the given headers, apply the given post processor
* and send the resulting {@link Message} to the specified destination, receive
* the reply and convert its body of the given target class.
*
* @param destination the target destination
* @param request payload for the request message to send
* @param targetClass the target type to convert the payload of the reply to
* @param requestPostProcessor post process to apply to the request message
* @return the payload of the reply message, possibly {@code null} if the message
* could not be received, for example due to a timeout
*/
<T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers, <T> T convertSendAndReceive(D destination, Object request, Map<String, Object> headers,
Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException; Class<T> targetClass, MessagePostProcessor requestPostProcessor) throws MessagingException;
......
...@@ -21,9 +21,9 @@ import org.springframework.messaging.Message; ...@@ -21,9 +21,9 @@ import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException; import org.springframework.messaging.MessagingException;
/** /**
* A set of operations sending messages to a destination. * Operations for sending messages to a destination.
* *
* @param <D> the type of destination to which messages can be sent * @param <D> the type of destination to send messages to
* *
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
...@@ -31,22 +31,87 @@ import org.springframework.messaging.MessagingException; ...@@ -31,22 +31,87 @@ import org.springframework.messaging.MessagingException;
*/ */
public interface MessageSendingOperations<D> { public interface MessageSendingOperations<D> {
/**
* Send a message to a default destination.
*
* @param message the message to send
*/
void send(Message<?> message) throws MessagingException; void send(Message<?> message) throws MessagingException;
/**
* Send a message to the given destination.
*
* @param destination the target destination
* @param message the message to send
*/
void send(D destination, Message<?> message) throws MessagingException; void send(D destination, Message<?> message) throws MessagingException;
/**
* Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message and send it to a default destination.
*
* @param payload the Object to use as payload
*/
void convertAndSend(Object payload) throws MessagingException; void convertAndSend(Object payload) throws MessagingException;
/**
* Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message and send it to the given destination.
*
* @param destination the target destination
* @param payload the Object to use as payload
*/
void convertAndSend(D destination, Object payload) throws MessagingException; void convertAndSend(D destination, Object payload) throws MessagingException;
/**
* Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message with the given headers and send it to
* a default destination.
*
* @param destination the target destination
* @param payload the Object to use as payload
* @param headers headers for the message to send
*/
void convertAndSend(D destination, Object payload, Map<String, Object> headers) throws MessagingException; void convertAndSend(D destination, Object payload, Map<String, Object> headers) throws MessagingException;
/**
* Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message, apply the given post processor, and send
* the resulting message to a default destination.
*
* @param payload the Object to use as payload
* @param postProcessor the post processor to apply to the message
*/
void convertAndSend(Object payload, MessagePostProcessor postProcessor) throws MessagingException; void convertAndSend(Object payload, MessagePostProcessor postProcessor) throws MessagingException;
void convertAndSend(D destination, Object payload, /**
MessagePostProcessor postProcessor) throws MessagingException; * Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message, apply the given post processor, and send
* the resulting message to the given destination.
*
* @param destination the target destination
* @param payload the Object to use as payload
* @param postProcessor the post processor to apply to the message
*/
void convertAndSend(D destination, Object payload, MessagePostProcessor postProcessor) throws MessagingException;
void convertAndSend(D destination, Object payload, Map<String, Object> headers, /**
MessagePostProcessor postProcessor) throws MessagingException; * Convert the given Object to serialized form, possibly using a
* {@link org.springframework.messaging.support.converter.MessageConverter},
* wrap it as a message with the given headers, apply the given post processor,
* and send the resulting message to the given destination.
*
* @param destination the target destination
* @param payload the Object to use as payload
* @param headers headers for the message to send
* @param postProcessor the post processor to apply to the message
*/
void convertAndSend(D destination, Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor)
throws MessagingException;
} }
/** /**
* Provides core messaging classes. * Defines interfaces and implementation classes for messaging templates.
*/ */
package org.springframework.messaging.core; package org.springframework.messaging.core;
\ No newline at end of file
...@@ -20,6 +20,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory; ...@@ -20,6 +20,7 @@ import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.ConversionService;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Header;
/** /**
......
...@@ -21,6 +21,7 @@ import java.util.Map; ...@@ -21,6 +21,7 @@ import java.util.Map;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.core.convert.ConversionService; import org.springframework.core.convert.ConversionService;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.handler.annotation.PathVariable; import org.springframework.messaging.handler.annotation.PathVariable;
import org.springframework.messaging.handler.annotation.ValueConstants; import org.springframework.messaging.handler.annotation.ValueConstants;
......
...@@ -19,12 +19,12 @@ package org.springframework.messaging.support; ...@@ -19,12 +19,12 @@ package org.springframework.messaging.support;
import java.util.Map; import java.util.Map;
/** /**
* A message implementation that accepts a {@link Throwable} payload. * A {@link GenericMessage} with a {@link Throwable} payload.
* Once created this object is immutable.
* *
* @author Mark Fisher * @author Mark Fisher
* @author Oleg Zhurakousky * @author Oleg Zhurakousky
* @since 4.0 * @since 4.0
*
* @see MessageBuilder * @see MessageBuilder
*/ */
public class ErrorMessage extends GenericMessage<Throwable> { public class ErrorMessage extends GenericMessage<Throwable> {
...@@ -32,10 +32,21 @@ public class ErrorMessage extends GenericMessage<Throwable> { ...@@ -32,10 +32,21 @@ public class ErrorMessage extends GenericMessage<Throwable> {
private static final long serialVersionUID = -5470210965279837728L; private static final long serialVersionUID = -5470210965279837728L;
/**
* Create a new message with the given payload.
*
* @param payload the message payload, never {@code null}
*/
public ErrorMessage(Throwable payload) { public ErrorMessage(Throwable payload) {
super(payload); super(payload);
} }
/**
* Create a new message with the given payload and headers.
*
* @param payload the message payload, never {@code null}
* @param headers message headers
*/
public ErrorMessage(Throwable payload, Map<String, Object> headers) { public ErrorMessage(Throwable payload, Map<String, Object> headers) {
super(payload, headers); super(payload, headers);
} }
......
...@@ -26,11 +26,12 @@ import org.springframework.util.Assert; ...@@ -26,11 +26,12 @@ import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
/** /**
* Base Message class defining common properties such as id, payload, and headers. * An implementation of {@link Message} with a generic payload.
* Once created this object is immutable. * Once created, a GenericMessage is immutable.
* *
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
*
* @see MessageBuilder * @see MessageBuilder
*/ */
public class GenericMessage<T> implements Message<T>, Serializable { public class GenericMessage<T> implements Message<T>, Serializable {
...@@ -45,18 +46,18 @@ public class GenericMessage<T> implements Message<T>, Serializable { ...@@ -45,18 +46,18 @@ public class GenericMessage<T> implements Message<T>, Serializable {
/** /**
* Create a new message with the given payload. * Create a new message with the given payload.
* @param payload the message payload *
* @param payload the message payload, never {@code null}
*/ */
public GenericMessage(T payload) { public GenericMessage(T payload) {
this(payload, null); this(payload, null);
} }
/** /**
* Create a new message with the given payload. The provided map will be used to * Create a new message with the given payload and headers.
* populate the message headers *
* @param payload the message payload * @param payload the message payload, never {@code null}
* @param headers message headers * @param headers message headers
* @see MessageHeaders
*/ */
public GenericMessage(T payload, Map<String, Object> headers) { public GenericMessage(T payload, Map<String, Object> headers) {
Assert.notNull(payload, "payload must not be null"); Assert.notNull(payload, "payload must not be null");
...@@ -88,7 +89,7 @@ public class GenericMessage<T> implements Message<T>, Serializable { ...@@ -88,7 +89,7 @@ public class GenericMessage<T> implements Message<T>, Serializable {
sb.append("[Payload ").append(this.payload.getClass().getSimpleName()); sb.append("[Payload ").append(this.payload.getClass().getSimpleName());
sb.append(" content=").append(this.payload).append("]"); sb.append(" content=").append(this.payload).append("]");
} }
sb.append("[Headers=" + this.headers + "]"); sb.append("[Headers=").append(this.headers).append("]");
return sb.toString(); return sb.toString();
} }
...@@ -102,11 +103,8 @@ public class GenericMessage<T> implements Message<T>, Serializable { ...@@ -102,11 +103,8 @@ public class GenericMessage<T> implements Message<T>, Serializable {
} }
if (obj != null && obj instanceof GenericMessage<?>) { if (obj != null && obj instanceof GenericMessage<?>) {
GenericMessage<?> other = (GenericMessage<?>) obj; GenericMessage<?> other = (GenericMessage<?>) obj;
if (!this.headers.getId().equals(other.headers.getId())) { return (this.headers.getId().equals(other.headers.getId()) &&
return false; this.headers.equals(other.headers) && this.payload.equals(other.payload));
}
return this.headers.equals(other.headers)
&& this.payload.equals(other.payload);
} }
return false; return false;
} }
......
...@@ -23,12 +23,14 @@ import org.springframework.messaging.MessageChannel; ...@@ -23,12 +23,14 @@ import org.springframework.messaging.MessageChannel;
import org.springframework.util.Assert; import org.springframework.util.Assert;
/** /**
* A builder for creating {@link GenericMessage} or {@link ErrorMessage} if the payload is * A builder for creating a {@link GenericMessage} (or {@link ErrorMessage} if
* {@link Throwable}. * the payload is of type {@link Throwable}).
* *
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Mark Fisher * @author Mark Fisher
* @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
*
* @see GenericMessage * @see GenericMessage
* @see ErrorMessage * @see ErrorMessage
*/ */
...@@ -56,6 +58,7 @@ public final class MessageBuilder<T> { ...@@ -56,6 +58,7 @@ public final class MessageBuilder<T> {
* Create a builder for a new {@link Message} instance pre-populated with all of the * Create a builder for a new {@link Message} instance pre-populated with all of the
* headers copied from the provided message. The payload of the provided Message will * headers copied from the provided message. The payload of the provided Message will
* also be used as the payload for the new message. * also be used as the payload for the new message.
*
* @param message the Message from which the payload and all headers will be copied * @param message the Message from which the payload and all headers will be copied
*/ */
public static <T> MessageBuilder<T> fromMessage(Message<T> message) { public static <T> MessageBuilder<T> fromMessage(Message<T> message) {
......
...@@ -27,9 +27,19 @@ import org.springframework.util.MultiValueMap; ...@@ -27,9 +27,19 @@ import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
/** /**
* An extension of {@link MessageHeaderAccessor} that also provides read/write access to * An extension of {@link MessageHeaderAccessor} that also stores and provides read/write
* message headers from an external message source. Native message headers are kept * access to message headers from an external source -- e.g. a Spring {@link Message}
* in a {@link MultiValueMap} under the key {@link #NATIVE_HEADERS}. * created to represent a STOMP message received from a STOMP client or message broker.
* Native message headers are kept in a {@link MultiValueMap} under the key
* {@link #NATIVE_HEADERS}.
* <p>
* This class is not intended for direct use but is rather expected to be consumed
* through sub-classes such as
* {@link org.springframework.messaging.simp.stomp.StompHeaderAccessor StompHeaderAccessor}.
* Such sub-classes may provide factory methods to translate message headers from
* an external messaging source (e.g. STOMP) to Spring {@link Message} headers and
* reversely to translate Spring {@link Message} headers to a message to send to an
* external source.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
...@@ -72,14 +82,6 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor { ...@@ -72,14 +82,6 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
return null; return null;
} }
/**
* Create {@link NativeMessageHeaderAccessor} from the headers of an existing message.
*/
public static NativeMessageHeaderAccessor wrap(Message<?> message) {
return new NativeMessageHeaderAccessor(message);
}
@Override @Override
public Map<String, Object> toMap() { public Map<String, Object> toMap() {
Map<String, Object> result = super.toMap(); Map<String, Object> result = super.toMap();
......
/** /**
* Provides classes representing various channel types. * Provides {@link org.springframework.messaging.MessageChannel} implementations
* classes as well as channel interceptor support.
*/ */
package org.springframework.messaging.support.channel; package org.springframework.messaging.support.channel;
\ No newline at end of file
/** /**
* Provides classes supporting message conversion. * Provides support for message conversion.
*/ */
package org.springframework.messaging.support.converter; package org.springframework.messaging.support.converter;
\ No newline at end of file
/**
* Provides implementations of {@link org.springframework.messaging.Message} along with
* a MessageBuilder and MessageHeaderAccessor for building and working with messages
* and message headers.
*/
package org.springframework.messaging.support;
\ No newline at end of file
<html> <html>
<body> <body>
<p> <p>
Spring's support for messaging architectures and messaging protocols. Spring Framework's support for messaging architectures and protocols.
</p> </p>
</body> </body>
</html> </html>
\ No newline at end of file
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
/**
* Unit tests for {@link AbstractDestinationResolvingMessagingTemplate}.
*
* @author Rossen Stoyanchev
*/
public class DestinationResolvingMessagingTemplateTests {
private TestDestinationResolvingMessagingTemplate template;
private ExecutorSubscribableChannel myChannel;
private Map<String, Object> headers;
private TestMessagePostProcessor postProcessor;
@Before
public void setup() {
TestMessageChannelDestinationResolver resolver = new TestMessageChannelDestinationResolver();
this.myChannel = new ExecutorSubscribableChannel();
resolver.registerMessageChannel("myChannel", this.myChannel);
this.template = new TestDestinationResolvingMessagingTemplate();
this.template.setDestinationResolver(resolver);
this.headers = Collections.<String, Object>singletonMap("key", "value");
this.postProcessor = new TestMessagePostProcessor();
}
@Test
public void send() {
Message<?> message = new GenericMessage<Object>("payload");
this.template.send("myChannel", message);
assertSame(this.myChannel, this.template.messageChannel);
assertSame(message, this.template.message);
}
@Test(expected = IllegalStateException.class)
public void sendNoDestinationResolver() {
TestDestinationResolvingMessagingTemplate template = new TestDestinationResolvingMessagingTemplate();
template.send("myChannel", new GenericMessage<Object>("payload"));
}
@Test
public void convertAndSendPayload() {
this.template.convertAndSend("myChannel", "payload");
assertSame(this.myChannel, this.template.messageChannel);
assertNotNull(this.template.message);
assertSame("payload", this.template.message.getPayload());
}
@Test
public void convertAndSendPayloadAndHeaders() {
this.template.convertAndSend("myChannel", "payload", this.headers);
assertSame(this.myChannel, this.template.messageChannel);
assertNotNull(this.template.message);
assertEquals("value", this.template.message.getHeaders().get("key"));
assertEquals("payload", this.template.message.getPayload());
}
@Test
public void convertAndSendPayloadWithPostProcessor() {
this.template.convertAndSend("myChannel", "payload", this.postProcessor);
assertSame(this.myChannel, this.template.messageChannel);
assertNotNull(this.template.message);
assertEquals("payload", this.template.message.getPayload());
assertNotNull(this.postProcessor.getMessage());
assertSame(this.postProcessor.getMessage(), this.template.message);
}
@Test
public void convertAndSendPayloadAndHeadersWithPostProcessor() {
this.template.convertAndSend("myChannel", "payload", this.headers, this.postProcessor);
assertSame(this.myChannel, this.template.messageChannel);
assertNotNull(this.template.message);
assertEquals("value", this.template.message.getHeaders().get("key"));
assertEquals("payload", this.template.message.getPayload());
assertNotNull(this.postProcessor.getMessage());
assertSame(this.postProcessor.getMessage(), this.template.message);
}
@Test
public void receive() {
Message<?> expected = new GenericMessage<Object>("payload");
this.template.setReceiveMessage(expected);
Message<?> actual = this.template.receive("myChannel");
assertSame(expected, actual);
assertSame(this.myChannel, this.template.messageChannel);
}
@Test
public void receiveAndConvert() {
Message<?> expected = new GenericMessage<Object>("payload");
this.template.setReceiveMessage(expected);
String payload = this.template.receiveAndConvert("myChannel", String.class);
assertEquals("payload", payload);
assertSame(this.myChannel, this.template.messageChannel);
}
@Test
public void sendAndReceive() {
Message<?> requestMessage = new GenericMessage<Object>("request");
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
Message<?> actual = this.template.sendAndReceive("myChannel", requestMessage);
assertEquals(requestMessage, this.template.message);
assertSame(responseMessage, actual);
assertSame(this.myChannel, this.template.messageChannel);
}
@Test
public void convertSendAndReceive() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
String actual = this.template.convertSendAndReceive("myChannel", "request", String.class);
assertEquals("request", this.template.message.getPayload());
assertSame("response", actual);
assertSame(this.myChannel, this.template.messageChannel);
}
@Test
public void convertSendAndReceiveWithHeaders() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
String actual = this.template.convertSendAndReceive("myChannel", "request", this.headers, String.class);
assertEquals("value", this.template.message.getHeaders().get("key"));
assertEquals("request", this.template.message.getPayload());
assertSame("response", actual);
assertSame(this.myChannel, this.template.messageChannel);
}
@Test
public void convertSendAndReceiveWithPostProcessor() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
String actual = this.template.convertSendAndReceive("myChannel", "request", String.class, this.postProcessor);
assertEquals("request", this.template.message.getPayload());
assertSame("request", this.postProcessor.getMessage().getPayload());
assertSame("response", actual);
assertSame(this.myChannel, this.template.messageChannel);
}
@Test
public void convertSendAndReceiveWithHeadersAndPostProcessor() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
String actual = this.template.convertSendAndReceive("myChannel", "request", this.headers,
String.class, this.postProcessor);
assertEquals("value", this.template.message.getHeaders().get("key"));
assertEquals("request", this.template.message.getPayload());
assertSame("request", this.postProcessor.getMessage().getPayload());
assertSame("response", actual);
assertSame(this.myChannel, this.template.messageChannel);
}
private static class TestDestinationResolvingMessagingTemplate
extends AbstractDestinationResolvingMessagingTemplate<MessageChannel> {
private MessageChannel messageChannel;
private Message<?> message;
private Message<?> receiveMessage;
private void setReceiveMessage(Message<?> receiveMessage) {
this.receiveMessage = receiveMessage;
}
@Override
protected void doSend(MessageChannel channel, Message<?> message) {
this.messageChannel = channel;
this.message = message;
}
@Override
protected Message<?> doReceive(MessageChannel channel) {
this.messageChannel = channel;
return this.receiveMessage;
}
@Override
protected Message<?> doSendAndReceive(MessageChannel channel, Message<?> requestMessage) {
this.message = requestMessage;
this.messageChannel = channel;
return this.receiveMessage;
}
}
}
class TestMessageChannelDestinationResolver implements DestinationResolver<MessageChannel> {
private final Map<String, MessageChannel> channels = new HashMap<>();
public void registerMessageChannel(String name, MessageChannel channel) {
this.channels.put(name, channel);
}
@Override
public MessageChannel resolveDestination(String name) throws DestinationResolutionException {
return this.channels.get(name);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.*;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.channel.ExecutorSubscribableChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static junit.framework.Assert.assertNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/**
* Unit tests for {@link GenericMessagingTemplate}.
*
* @author Rossen Stoyanchev
*/
public class GenericMessagingTemplateTests {
private GenericMessagingTemplate template;
private ThreadPoolTaskExecutor executor;
@Before
public void setup() {
this.template = new GenericMessagingTemplate();
this.executor = new ThreadPoolTaskExecutor();
this.executor.afterPropertiesSet();
}
@Test
public void sendAndReceive() {
SubscribableChannel channel = new ExecutorSubscribableChannel(this.executor);
channel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel();
replyChannel.send(new GenericMessage<String>("response"));
}
});
String actual = this.template.convertSendAndReceive(channel, "request", String.class);
assertEquals("response", actual);
}
@Test
public void sendAndReceiveTimeout() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
this.template.setReceiveTimeout(1);
this.template.setThrowExceptionOnLateReply(true);
SubscribableChannel channel = new ExecutorSubscribableChannel(this.executor);
channel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
Thread.sleep(500);
MessageChannel replyChannel = (MessageChannel) message.getHeaders().getReplyChannel();
replyChannel.send(new GenericMessage<String>("response"));
fail("Expected exception");
}
catch (InterruptedException e) {
fail("Unexpected exception " + e.getMessage());
}
catch (MessageDeliveryException ex) {
assertEquals("Reply message received but the receiving thread has already received a reply",
ex.getMessage());
}
finally {
latch.countDown();
}
}
});
assertNull(this.template.convertSendAndReceive(channel, "request", String.class));
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Unit tests for receiving operations in {@link AbstractMessagingTemplate}.
*
* @author Rossen Stoyanchev
*
* @see MessageRequestReplyTemplateTests
*/
public class MessageReceivingTemplateTests {
private TestMessagingTemplate template;
@Before
public void setup() {
this.template = new TestMessagingTemplate();
}
@Test
public void receive() {
Message<?> expected = new GenericMessage<Object>("payload");
this.template.setDefaultDestination("home");
this.template.setReceiveMessage(expected);
Message<?> actual = this.template.receive();
assertEquals("home", this.template.destination);
assertSame(expected, actual);
}
@Test(expected = IllegalStateException.class)
public void receiveMissingDefaultDestination() {
this.template.receive();
}
@Test
public void receiveFromDestination() {
Message<?> expected = new GenericMessage<Object>("payload");
this.template.setReceiveMessage(expected);
Message<?> actual = this.template.receive("somewhere");
assertEquals("somewhere", this.template.destination);
assertSame(expected, actual);
}
@Test
public void receiveAndConvert() {
Message<?> expected = new GenericMessage<Object>("payload");
this.template.setDefaultDestination("home");
this.template.setReceiveMessage(expected);
String payload = this.template.receiveAndConvert(String.class);
assertEquals("home", this.template.destination);
assertSame("payload", payload);
}
@Test
public void receiveAndConvertFromDestination() {
Message<?> expected = new GenericMessage<Object>("payload");
this.template.setReceiveMessage(expected);
String payload = this.template.receiveAndConvert("somewhere", String.class);
assertEquals("somewhere", this.template.destination);
assertSame("payload", payload);
}
private static class TestMessagingTemplate extends AbstractMessagingTemplate<String> {
private String destination;
private Message<?> receiveMessage;
private void setReceiveMessage(Message<?> receiveMessage) {
this.receiveMessage = receiveMessage;
}
@Override
protected void doSend(String destination, Message<?> message) {
}
@Override
protected Message<?> doReceive(String destination) {
this.destination = destination;
return this.receiveMessage;
}
@Override
protected Message<?> doSendAndReceive(String destination, Message<?> requestMessage) {
this.destination = destination;
return null;
}
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import java.util.Collections;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Unit tests for request and reply operations in {@link AbstractMessagingTemplate}.
*
* @author Rossen Stoyanchev
*
* @see MessageReceivingTemplateTests
*/
public class MessageRequestReplyTemplateTests {
private TestMessagingTemplate template;
private TestMessagePostProcessor postProcessor;
private Map<String, Object> headers;
@Before
public void setup() {
this.template = new TestMessagingTemplate();
this.postProcessor = new TestMessagePostProcessor();
this.headers = Collections.<String, Object>singletonMap("key", "value");
}
@Test
public void sendAndReceive() {
Message<?> requestMessage = new GenericMessage<Object>("request");
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setDefaultDestination("home");
this.template.setReceiveMessage(responseMessage);
Message<?> actual = this.template.sendAndReceive(requestMessage);
assertEquals("home", this.template.destination);
assertSame(requestMessage, this.template.requestMessage);
assertSame(responseMessage, actual);
}
@Test(expected = IllegalStateException.class)
public void sendAndReceiveMissingDestination() {
this.template.sendAndReceive(new GenericMessage<Object>("request"));
}
@Test
public void sendAndReceiveToDestination() {
Message<?> requestMessage = new GenericMessage<Object>("request");
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
Message<?> actual = this.template.sendAndReceive("somewhere", requestMessage);
assertEquals("somewhere", this.template.destination);
assertSame(requestMessage, this.template.requestMessage);
assertSame(responseMessage, actual);
}
@Test
public void convertAndSend() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setDefaultDestination("home");
this.template.setReceiveMessage(responseMessage);
String response = this.template.convertSendAndReceive("request", String.class);
assertEquals("home", this.template.destination);
assertSame("request", this.template.requestMessage.getPayload());
assertSame("response", response);
}
@Test
public void convertAndSendToDestination() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
String response = this.template.convertSendAndReceive("somewhere", "request", String.class);
assertEquals("somewhere", this.template.destination);
assertSame("request", this.template.requestMessage.getPayload());
assertSame("response", response);
}
@Test
public void convertAndSendToDestinationWithHeaders() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
String response = this.template.convertSendAndReceive("somewhere", "request", this.headers, String.class);
assertEquals("somewhere", this.template.destination);
assertEquals("value", this.template.requestMessage.getHeaders().get("key"));
assertSame("request", this.template.requestMessage.getPayload());
assertSame("response", response);
}
@Test
public void convertAndSendWithPostProcessor() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setDefaultDestination("home");
this.template.setReceiveMessage(responseMessage);
String response = this.template.convertSendAndReceive("request", String.class, this.postProcessor);
assertEquals("home", this.template.destination);
assertSame("request", this.template.requestMessage.getPayload());
assertSame("response", response);
assertSame(this.postProcessor.getMessage(), this.template.requestMessage);
}
@Test
public void convertAndSendToDestinationWithPostProcessor() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
String response = this.template.convertSendAndReceive("somewhere", "request", String.class, this.postProcessor);
assertEquals("somewhere", this.template.destination);
assertSame("request", this.template.requestMessage.getPayload());
assertSame("response", response);
assertSame(this.postProcessor.getMessage(), this.template.requestMessage);
}
@Test
public void convertAndSendToDestinationWithHeadersAndPostProcessor() {
Message<?> responseMessage = new GenericMessage<Object>("response");
this.template.setReceiveMessage(responseMessage);
String response = this.template.convertSendAndReceive("somewhere", "request", this.headers,
String.class, this.postProcessor);
assertEquals("somewhere", this.template.destination);
assertEquals("value", this.template.requestMessage.getHeaders().get("key"));
assertSame("request", this.template.requestMessage.getPayload());
assertSame("response", response);
assertSame(this.postProcessor.getMessage(), this.template.requestMessage);
}
private static class TestMessagingTemplate extends AbstractMessagingTemplate<String> {
private String destination;
private Message<?> requestMessage;
private Message<?> receiveMessage;
private void setReceiveMessage(Message<?> receiveMessage) {
this.receiveMessage = receiveMessage;
}
@Override
protected void doSend(String destination, Message<?> message) {
}
@Override
protected Message<?> doReceive(String destination) {
this.destination = destination;
return this.receiveMessage;
}
@Override
protected Message<?> doSendAndReceive(String destination, Message<?> requestMessage) {
this.destination = destination;
this.requestMessage = requestMessage;
return this.receiveMessage;
}
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.junit.Before;
import org.junit.Test;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import java.util.Collections;
import java.util.Map;
import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
/**
* Unit tests for {@link AbstractMessageSendingTemplate}.
*
* @author Rossen Stoyanchev
*/
public class MessageSendingTemplateTests {
private TestMessageSendingTemplate template;
private TestMessagePostProcessor postProcessor;
private Map<String, Object> headers;
@Before
public void setup() {
this.template = new TestMessageSendingTemplate();
this.postProcessor = new TestMessagePostProcessor();
this.headers = Collections.<String, Object>singletonMap("key", "value");
}
@Test
public void send() {
Message<?> message = new GenericMessage<Object>("payload");
this.template.setDefaultDestination("home");
this.template.send(message);
assertEquals("home", this.template.destination);
assertSame(message, this.template.message);
}
@Test
public void sendToDestination() {
Message<?> message = new GenericMessage<Object>("payload");
this.template.send("somewhere", message);
assertEquals("somewhere", this.template.destination);
assertSame(message, this.template.message);
}
@Test(expected = IllegalStateException.class)
public void sendMissingDestination() {
Message<?> message = new GenericMessage<Object>("payload");
this.template.send(message);
}
@Test
public void convertAndSend() {
this.template.convertAndSend("somewhere", "payload", headers, this.postProcessor);
assertEquals("somewhere", this.template.destination);
assertNotNull(this.template.message);
assertEquals("value", this.template.message.getHeaders().get("key"));
assertEquals("payload", this.template.message.getPayload());
assertNotNull(this.postProcessor.getMessage());
assertSame(this.template.message, this.postProcessor.getMessage());
}
@Test
public void convertAndSendPayload() {
this.template.setDefaultDestination("home");
this.template.convertAndSend("payload");
assertEquals("home", this.template.destination);
assertNotNull(this.template.message);
assertEquals("expected 'id' and 'timestamp' headers only", 2, this.template.message.getHeaders().size());
assertEquals("payload", this.template.message.getPayload());
}
@Test
public void convertAndSendPayloadToDestination() {
this.template.convertAndSend("somewhere", "payload");
assertEquals("somewhere", this.template.destination);
assertNotNull(this.template.message);
assertEquals("expected 'id' and 'timestamp' headers only", 2, this.template.message.getHeaders().size());
assertEquals("payload", this.template.message.getPayload());
}
@Test
public void convertAndSendPayloadAndHeadersToDestination() {
this.template.convertAndSend("somewhere", "payload", headers);
assertEquals("somewhere", this.template.destination);
assertNotNull(this.template.message);
assertEquals("value", this.template.message.getHeaders().get("key"));
assertEquals("payload", this.template.message.getPayload());
}
@Test
public void convertAndSendPayloadWithPostProcessor() {
this.template.setDefaultDestination("home");
this.template.convertAndSend((Object) "payload", this.postProcessor);
assertEquals("home", this.template.destination);
assertNotNull(this.template.message);
assertEquals("expected 'id' and 'timestamp' headers only", 2, this.template.message.getHeaders().size());
assertEquals("payload", this.template.message.getPayload());
assertNotNull(this.postProcessor.getMessage());
assertSame(this.template.message, this.postProcessor.getMessage());
}
@Test
public void convertAndSendPayloadWithPostProcessorToDestination() {
this.template.convertAndSend("somewhere", "payload", this.postProcessor);
assertEquals("somewhere", this.template.destination);
assertNotNull(this.template.message);
assertEquals("expected 'id' and 'timestamp' headers only", 2, this.template.message.getHeaders().size());
assertEquals("payload", this.template.message.getPayload());
assertNotNull(this.postProcessor.getMessage());
assertSame(this.template.message, this.postProcessor.getMessage());
}
private static class TestMessageSendingTemplate extends AbstractMessageSendingTemplate<String> {
private String destination;
private Message<?> message;
@Override
protected void doSend(String destination, Message<?> message) {
this.destination = destination;
this.message = message;
}
}
}
class TestMessagePostProcessor implements MessagePostProcessor {
private Message<?> message;
Message<?> getMessage() {
return this.message;
}
@Override
public Message<?> postProcessMessage(Message<?> message) {
this.message = message;
return message;
}
}
...@@ -26,6 +26,7 @@ import org.springframework.core.GenericTypeResolver; ...@@ -26,6 +26,7 @@ import org.springframework.core.GenericTypeResolver;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
......
...@@ -27,6 +27,7 @@ import org.springframework.core.GenericTypeResolver; ...@@ -27,6 +27,7 @@ import org.springframework.core.GenericTypeResolver;
import org.springframework.core.MethodParameter; import org.springframework.core.MethodParameter;
import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.handler.annotation.PathVariable; import org.springframework.messaging.handler.annotation.PathVariable;
import org.springframework.messaging.support.MessageBuilder; import org.springframework.messaging.support.MessageBuilder;
......
...@@ -44,7 +44,7 @@ public class MessageHeaderAccessorTests { ...@@ -44,7 +44,7 @@ public class MessageHeaderAccessorTests {
Map<String, Object> original = new HashMap<>(); Map<String, Object> original = new HashMap<>();
original.put("foo", "bar"); original.put("foo", "bar");
original.put("bar", "baz"); original.put("bar", "baz");
GenericMessage<String> message = new GenericMessage<>("p", original); GenericMessage<String> message = new GenericMessage<>("payload", original);
MessageHeaderAccessor headers = new MessageHeaderAccessor(message); MessageHeaderAccessor headers = new MessageHeaderAccessor(message);
Map<String, Object> actual = headers.toMap(); Map<String, Object> actual = headers.toMap();
...@@ -61,7 +61,7 @@ public class MessageHeaderAccessorTests { ...@@ -61,7 +61,7 @@ public class MessageHeaderAccessorTests {
Map<String, Object> original = new HashMap<>(); Map<String, Object> original = new HashMap<>();
original.put("foo", "bar"); original.put("foo", "bar");
original.put("bar", "baz"); original.put("bar", "baz");
GenericMessage<String> message = new GenericMessage<>("p", original); GenericMessage<String> message = new GenericMessage<>("payload", original);
MessageHeaderAccessor headers = new MessageHeaderAccessor(message); MessageHeaderAccessor headers = new MessageHeaderAccessor(message);
headers.setHeader("foo", "BAR"); headers.setHeader("foo", "BAR");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册