提交 7a85827c 编写于 作者: P Phillip Webb 提交者: Rossen Stoyanchev

Polish messaging code

Minor formatting and Javadoc changes.
上级 0a68c993
...@@ -22,11 +22,18 @@ package org.springframework.messaging; ...@@ -22,11 +22,18 @@ package org.springframework.messaging;
* @author Mark Fisher * @author Mark Fisher
* @author Arjen Poutsma * @author Arjen Poutsma
* @since 4.0 * @since 4.0
* @see org.springframework.messaging.support.MessageBuilder
*/ */
public interface Message<T> { public interface Message<T> {
/**
* Returns message headers for the message (never {@code null}).
*/
MessageHeaders getHeaders(); MessageHeaders getHeaders();
/**
* Returns the message payload.
*/
T getPayload(); T getPayload();
} }
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
package org.springframework.messaging; package org.springframework.messaging;
/** /**
* Base channel interface defining common behavior for sending messages. * Base channel interface defining common behavior for sending messages.
* *
...@@ -25,31 +24,32 @@ package org.springframework.messaging; ...@@ -25,31 +24,32 @@ package org.springframework.messaging;
*/ */
public interface MessageChannel { public interface MessageChannel {
/**
* Constant for sending a message without a prescribed timeout.
*/
public static final long INDEFINITE_TIMEOUT = -1;
/** /**
* Send a {@link Message} to this channel. May throw a RuntimeException for * Send a {@link Message} to this channel. May throw a RuntimeException for
* non-recoverable errors. Otherwise, if the Message cannot be sent for a * non-recoverable errors. Otherwise, if the Message cannot be sent for a non-fatal
* non-fatal reason this method will return 'false', and if the Message is * reason this method will return 'false', and if the Message is sent successfully, it
* sent successfully, it will return 'true'. * will return 'true'.
*
* <p>Depending on the implementation, this method may block indefinitely.
* To provide a maximum wait time, use {@link #send(Message, long)}.
* *
* <p>Depending on the implementation, this method may block indefinitely. To provide a
* maximum wait time, use {@link #send(Message, long)}.
* @param message the {@link Message} to send * @param message the {@link Message} to send
*
* @return whether or not the Message has been sent successfully * @return whether or not the Message has been sent successfully
*/ */
boolean send(Message<?> message); boolean send(Message<?> message);
/** /**
* Send a message, blocking until either the message is accepted or the * Send a message, blocking until either the message is accepted or the specified
* specified timeout period elapses. * timeout period elapses.
*
* @param message the {@link Message} to send * @param message the {@link Message} to send
* @param timeout the timeout in milliseconds * @param timeout the timeout in milliseconds or #INDEFINITE_TIMEOUT
* * @return {@code true} if the message is sent successfully, {@code false} if the
* @return <code>true</code> if the message is sent successfully, * specified timeout period elapses or the send is interrupted
* <code>false</code> if the specified timeout period elapses or
* the send is interrupted
*/ */
boolean send(Message<?> message, long timeout); boolean send(Message<?> message, long timeout);
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
package org.springframework.messaging; package org.springframework.messaging;
/** /**
* Base interface for any component that handles Messages. * Base interface for any component that handles Messages.
* *
...@@ -27,25 +26,27 @@ package org.springframework.messaging; ...@@ -27,25 +26,27 @@ package org.springframework.messaging;
public interface MessageHandler { public interface MessageHandler {
/** /**
* TODO: exceptions
*
* Handles the message if possible. If the handler cannot deal with the * Handles the message if possible. If the handler cannot deal with the
* message this will result in a <code>MessageRejectedException</code> e.g. * message this will result in a {@code MessageRejectedException} e.g.
* in case of a Selective Consumer. When a consumer tries to handle a * in case of a Selective Consumer. When a consumer tries to handle a
* message, but fails to do so, a <code>MessageHandlingException</code> is * message, but fails to do so, a {@code MessageHandlingException} is
* thrown. In the last case it is recommended to treat the message as tainted * thrown. In the last case it is recommended to treat the message as tainted
* and go into an error scenario. * and go into an error scenario.
* <p> * <p>
* When the handling results in a failure of another message being sent * When the handling results in a failure of another message being sent
* (e.g. a "reply" message), that failure will trigger a * (e.g. a "reply" message), that failure will trigger a
* <code>MessageDeliveryException</code>. * {@code MessageDeliveryException}.
* *
* @param message the message to be handled * @param message the message to be handled
* reply related to the handling of the message
*/
void handleMessage(Message<?> message) throws MessagingException;
/*
* TODO: exceptions
* @throws org.springframework.integration.MessageRejectedException if the handler doesn't accept the message * @throws org.springframework.integration.MessageRejectedException if the handler doesn't accept the message
* @throws org.springframework.integration.MessageHandlingException when something fails during the handling * @throws org.springframework.integration.MessageHandlingException when something fails during the handling
* @throws org.springframework.integration.MessageDeliveryException when this handler failed to deliver the * @throws org.springframework.integration.MessageDeliveryException when this handler failed to deliver the
* reply related to the handling of the message
*/ */
void handleMessage(Message<?> message) throws MessagingException;
} }
...@@ -34,16 +34,18 @@ import org.apache.commons.logging.Log; ...@@ -34,16 +34,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
/** /**
* The headers for a {@link Message}.<br> * The headers for a {@link Message}
* IMPORTANT: This class is immutable. Any mutating operation (e.g., put(..), putAll(..) etc.)
* will throw {@link UnsupportedOperationException}
* *
* <p>To create MessageHeaders instance use fluent MessageBuilder API * <p><b>IMPORTANT</b>: This class is immutable. Any mutating operation
* <pre> * (e.g., put(..), putAll(..) etc.) will throw {@link UnsupportedOperationException}.
*
* <p>To create MessageHeaders instance use fluent
* {@link org.springframework.messaging.support.MessageBuilder MessageBuilder} API
* <pre class="code">
* MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2"); * MessageBuilder.withPayload("foo").setHeader("key1", "value1").setHeader("key2", "value2");
* </pre> * </pre>
* or create an instance of GenericMessage passing payload as {@link Object} and headers as a regular {@link Map} * or create an instance of GenericMessage passing payload as {@link Object} and headers as a regular {@link Map}
* <pre> * <pre class="code">
* Map headers = new HashMap(); * Map headers = new HashMap();
* headers.put("key1", "value1"); * headers.put("key1", "value1");
* headers.put("key2", "value2"); * headers.put("key2", "value2");
...@@ -54,6 +56,7 @@ import org.apache.commons.logging.LogFactory; ...@@ -54,6 +56,7 @@ import org.apache.commons.logging.LogFactory;
* @author Mark Fisher * @author Mark Fisher
* @author Gary Russell * @author Gary Russell
* @since 4.0 * @since 4.0
* @see org.springframework.messaging.support.MessageBuilder
*/ */
public final class MessageHeaders implements Map<String, Object>, Serializable { public final class MessageHeaders implements Map<String, Object>, Serializable {
...@@ -61,6 +64,7 @@ public final class MessageHeaders implements Map<String, Object>, Serializable { ...@@ -61,6 +64,7 @@ public final class MessageHeaders implements Map<String, Object>, Serializable {
private static final Log logger = LogFactory.getLog(MessageHeaders.class); private static final Log logger = LogFactory.getLog(MessageHeaders.class);
private static volatile IdGenerator idGenerator = null; private static volatile IdGenerator idGenerator = null;
/** /**
...@@ -79,7 +83,6 @@ public final class MessageHeaders implements Map<String, Object>, Serializable { ...@@ -79,7 +83,6 @@ public final class MessageHeaders implements Map<String, Object>, Serializable {
public static final String CONTENT_TYPE = "contentType"; public static final String CONTENT_TYPE = "contentType";
public static final List<String> HEADER_NAMES = Arrays.asList(ID, TIMESTAMP); public static final List<String> HEADER_NAMES = Arrays.asList(ID, TIMESTAMP);
......
...@@ -59,6 +59,7 @@ public class MessagingException extends RuntimeException { ...@@ -59,6 +59,7 @@ public class MessagingException extends RuntimeException {
this.failedMessage = message; this.failedMessage = message;
} }
public Message<?> getFailedMessage() { public Message<?> getFailedMessage() {
return this.failedMessage; return this.failedMessage;
} }
......
...@@ -16,9 +16,9 @@ ...@@ -16,9 +16,9 @@
package org.springframework.messaging; package org.springframework.messaging;
/** /**
* Interface for Message Channels from which Messages may be actively received through polling. * Interface for Message Channels from which Messages may be actively received through
* polling.
* *
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
...@@ -27,19 +27,17 @@ public interface PollableChannel extends MessageChannel { ...@@ -27,19 +27,17 @@ public interface PollableChannel extends MessageChannel {
/** /**
* Receive a message from this channel, blocking indefinitely if necessary. * Receive a message from this channel, blocking indefinitely if necessary.
* * @return the next available {@link Message} or {@code null} if interrupted
* @return the next available {@link Message} or <code>null</code> if interrupted
*/ */
Message<?> receive(); Message<?> receive();
/** /**
* Receive a message from this channel, blocking until either a message is * Receive a message from this channel, blocking until either a message is available
* available or the specified timeout period elapses. * or the specified timeout period elapses.
* * @param timeout the timeout in milliseconds or
* @param timeout the timeout in milliseconds * {@link MessageChannel#INDEFINITE_TIMEOUT}.
* * @return the next available {@link Message} or {@code null} if the specified timeout
* @return the next available {@link Message} or <code>null</code> if the * period elapses or the message reception is interrupted
* specified timeout period elapses or the message reception is interrupted
*/ */
Message<?> receive(long timeout); Message<?> receive(long timeout);
......
...@@ -29,11 +29,15 @@ public interface SubscribableChannel extends MessageChannel { ...@@ -29,11 +29,15 @@ public interface SubscribableChannel extends MessageChannel {
/** /**
* Register a {@link MessageHandler} as a subscriber to this channel. * Register a {@link MessageHandler} as a subscriber to this channel.
* @return {@code true} if the channel was not already subscribed to the specified
* handler
*/ */
boolean subscribe(MessageHandler handler); boolean subscribe(MessageHandler handler);
/** /**
* Remove a {@link MessageHandler} from the subscribers of this channel. * Remove a {@link MessageHandler} from the subscribers of this channel.
* @return {@code true} if the channel was previously subscribed to the specified
* handler
*/ */
boolean unsubscribe(MessageHandler handler); boolean unsubscribe(MessageHandler handler);
......
...@@ -22,7 +22,6 @@ import java.lang.annotation.Retention; ...@@ -22,7 +22,6 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy; import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target; import java.lang.annotation.Target;
/** /**
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
* @since 4.0 * @since 4.0
......
...@@ -25,11 +25,13 @@ import java.util.Map; ...@@ -25,11 +25,13 @@ import java.util.Map;
* @author Mark Fisher * @author Mark Fisher
* @author Oleg Zhurakousky * @author Oleg Zhurakousky
* @since 4.0 * @since 4.0
* @see MessageBuilder
*/ */
public class ErrorMessage extends GenericMessage<Throwable> { public class ErrorMessage extends GenericMessage<Throwable> {
private static final long serialVersionUID = -5470210965279837728L; private static final long serialVersionUID = -5470210965279837728L;
public ErrorMessage(Throwable payload) { public ErrorMessage(Throwable payload) {
super(payload); super(payload);
} }
......
...@@ -31,11 +31,13 @@ import org.springframework.util.ObjectUtils; ...@@ -31,11 +31,13 @@ import org.springframework.util.ObjectUtils;
* *
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
* @see MessageBuilder
*/ */
public class GenericMessage<T> implements Message<T>, Serializable { public class GenericMessage<T> implements Message<T>, Serializable {
private static final long serialVersionUID = -9004496725833093406L; private static final long serialVersionUID = -9004496725833093406L;
private final T payload; private final T payload;
private final MessageHeaders headers; private final MessageHeaders headers;
......
...@@ -29,6 +29,8 @@ import org.springframework.util.Assert; ...@@ -29,6 +29,8 @@ import org.springframework.util.Assert;
* @author Arjen Poutsma * @author Arjen Poutsma
* @author Mark Fisher * @author Mark Fisher
* @since 4.0 * @since 4.0
* @see GenericMessage
* @see ErrorMessage
*/ */
public final class MessageBuilder<T> { public final class MessageBuilder<T> {
...@@ -49,6 +51,7 @@ public final class MessageBuilder<T> { ...@@ -49,6 +51,7 @@ public final class MessageBuilder<T> {
this.headerAccessor = new MessageHeaderAccessor(originalMessage); this.headerAccessor = new MessageHeaderAccessor(originalMessage);
} }
/** /**
* Create a builder for a new {@link Message} instance pre-populated with all of the * Create a builder for a new {@link Message} instance pre-populated with all of the
* headers copied from the provided message. The payload of the provided Message will * headers copied from the provided message. The payload of the provided Message will
...@@ -73,8 +76,8 @@ public final class MessageBuilder<T> { ...@@ -73,8 +76,8 @@ public final class MessageBuilder<T> {
} }
/** /**
* Set the value for the given header name. If the provided value is <code>null</code> * Set the value for the given header name. If the provided value is {@code null},
* , the header will be removed. * the header will be removed.
*/ */
public MessageBuilder<T> setHeader(String headerName, Object headerValue) { public MessageBuilder<T> setHeader(String headerName, Object headerValue) {
this.headerAccessor.setHeader(headerName, headerValue); this.headerAccessor.setHeader(headerName, headerValue);
......
...@@ -32,14 +32,13 @@ import org.springframework.util.ObjectUtils; ...@@ -32,14 +32,13 @@ import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils; import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
/** /**
* A base class for read/write access to {@link MessageHeaders}. Supports creation of new * A base class for read/write access to {@link MessageHeaders}. Supports creation of new
* headers or modification of existing message headers. * headers or modification of existing message headers.
* <p> *
* Sub-classes can provide additional typed getters and setters for convenient access to * <p>Sub-classes can provide additional typed getters and setters for convenient access
* specific headers. Getters and setters should delegate to {@link #getHeader(String)} or * to specific headers. Getters and setters should delegate to {@link #getHeader(String)}
* {@link #setHeader(String, Object)} respectively. At the end {@link #toMap()} can be * or {@link #setHeader(String, Object)} respectively. At the end {@link #toMap()} can be
* used to obtain the resulting headers. * used to obtain the resulting headers.
* *
* @author Rossen Stoyanchev * @author Rossen Stoyanchev
......
...@@ -26,7 +26,6 @@ import org.springframework.util.LinkedMultiValueMap; ...@@ -26,7 +26,6 @@ import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
/** /**
* An extension of {@link MessageHeaderAccessor} that also provides read/write access to * An extension of {@link MessageHeaderAccessor} that also provides read/write access to
* message headers from an external message source. Native message headers are kept * message headers from an external message source. Native message headers are kept
...@@ -37,9 +36,9 @@ import org.springframework.util.ObjectUtils; ...@@ -37,9 +36,9 @@ import org.springframework.util.ObjectUtils;
*/ */
public class NativeMessageHeaderAccessor extends MessageHeaderAccessor { public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
public static final String NATIVE_HEADERS = "nativeHeaders"; public static final String NATIVE_HEADERS = "nativeHeaders";
// wrapped native headers // wrapped native headers
private final Map<String, List<String>> originalNativeHeaders; private final Map<String, List<String>> originalNativeHeaders;
...@@ -63,6 +62,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor { ...@@ -63,6 +62,7 @@ public class NativeMessageHeaderAccessor extends MessageHeaderAccessor {
this.originalNativeHeaders = initNativeHeaders(message); this.originalNativeHeaders = initNativeHeaders(message);
} }
private static Map<String, List<String>> initNativeHeaders(Message<?> message) { private static Map<String, List<String>> initNativeHeaders(Message<?> message) {
if (message != null) { if (message != null) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册