提交 078c766b 编写于 作者: R Rossen Stoyanchev

Add new MessagingOperations ifc and class hieararchy

上级 e1080a07
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
/**
* Exception that indicates an error occurred during message delivery.
*
* @author Mark Fisher
* @since 4.0
*/
@SuppressWarnings("serial")
public class MessageDeliveryException extends MessagingException {
public MessageDeliveryException(String description) {
super(description);
}
public MessageDeliveryException(Message<?> undeliveredMessage) {
super(undeliveredMessage);
}
public MessageDeliveryException(Message<?> undeliveredMessage, String description) {
super(undeliveredMessage, description);
}
public MessageDeliveryException(Message<?> undeliveredMessage, String description, Throwable cause) {
super(undeliveredMessage, description, cause);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executor;
import org.springframework.util.Assert;
/**
* A {@link SubscribableChannel} that sends messages to each of its subscribers. For a
* more feature complete implementation consider
* {@code org.springframework.integration.channel.PublishSubscribeChannel} from the
* Spring Integration project.
*
* @author Phillip Webb
*/
public class PublishSubscribeChannel implements SubscribableChannel {
private Executor executor;
private Set<MessageHandler> handlers = new CopyOnWriteArraySet<MessageHandler>();
/**
* Create a new {@link PublishSubscribeChannel} instance where messages will be sent
* in the callers thread.
*/
public PublishSubscribeChannel() {
this(null);
}
/**
* Create a new {@link PublishSubscribeChannel} instance where messages will be sent
* via the specified executor.
* @param executor the executor used to send the message or {@code null} to execute in
* the callers thread.
*/
public PublishSubscribeChannel(Executor executor) {
this.executor = executor;
}
@Override
public boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}
@Override
public boolean send(Message<?> message, long timeout) {
Assert.notNull(message, "Message must not be null");
Assert.notNull(message.getPayload(), "Message payload must not be null");
for (final MessageHandler handler : this.handlers) {
dispatchToHandler(message, handler);
}
return true;
}
private void dispatchToHandler(final Message<?> message, final MessageHandler handler) {
if (this.executor == null) {
handler.handleMessage(message);
}
else {
this.executor.execute(new Runnable() {
@Override
public void run() {
handler.handleMessage(message);
}
});
}
}
@Override
public boolean subscribe(MessageHandler handler) {
return this.handlers.add(handler);
}
@Override
public boolean unsubscribe(MessageHandler handler) {
return this.handlers.remove(handler);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.channel;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.core.DestinationResolver;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
* @since 4.0
*/
public class BeanFactoryChannelResolver implements DestinationResolver<MessageChannel> {
private final BeanFactory beanFactory;
public BeanFactoryChannelResolver(BeanFactory beanFactory) {
Assert.notNull(beanFactory, "beanFactory must not be null");
this.beanFactory = beanFactory;
}
@Override
public MessageChannel resolveDestination(String name) {
return this.beanFactory.getBean(name, MessageChannel.class);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.channel;
import org.springframework.messaging.MessagingException;
/**
* Thrown by a ChannelResolver when it cannot resolve a channel name.
*
* @author Mark Fisher
* @since 4.0
*/
@SuppressWarnings("serial")
public class ChannelResolutionException extends MessagingException {
/**
* Create a new ChannelResolutionException.
* @param description the description
*/
public ChannelResolutionException(String description) {
super(description);
}
/**
* Create a new ChannelResolutionException.
* @param description the description
* @param cause the root cause (if any)
*/
public ChannelResolutionException(String description, Throwable cause) {
super(description, cause);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.channel;
import org.springframework.messaging.MessageChannel;
/**
* Strategy for resolving a name to a {@link MessageChannel}.
*
* @author Mark Fisher
* @since 4.0
*/
public interface ChannelResolver {
/**
* Return the MessageChannel for the given name.
*/
MessageChannel resolveChannelName(String channelName);
}
......@@ -26,10 +26,7 @@ import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
/**
* A {@link SubscribableChannel} that sends messages to each of its subscribers. For a
* more feature complete implementation consider
* {@code org.springframework.integration.channel.PublishSubscribeChannel} from the
* Spring Integration project.
* A {@link SubscribableChannel} that sends messages to each of its subscribers.
*
* @author Phillip Webb
* @since 4.0
......
/**
* Provides classes representing various channel types.
*/
package org.springframework.messaging.channel;
\ No newline at end of file
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.converter;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
/**
* @author Mark Fisher
* @since 4.0
*/
public class DefaultMessageConverter implements MessageConverter {
@Override
public <T> Message<?> toMessage(T object) {
System.out.println("converting " + object + " to message");
return MessageBuilder.withPayload(object).build();
}
@Override
@SuppressWarnings("unchecked")
public <T> T fromMessage(Message<?> message) {
System.out.println("converting " + message + " to object");
return (T) message.getPayload();
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.converter;
import org.springframework.messaging.Message;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface MessageConverter {
<T> Message<?> toMessage(T object);
<T> T fromMessage(Message<?> message);
}
/**
* Provides classes supporting message conversion.
*/
package org.springframework.messaging.converter;
\ No newline at end of file
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
* @since 4.0
*/
public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends AbstractMessagingTemplate<D>
implements ResolvableDestinationMessageReceivingOperations<D> {
private volatile DestinationResolver<D> destinationResolver;
public void setDestinationResolver(DestinationResolver<D> destinationResolver) {
this.destinationResolver = destinationResolver;
}
@Override
public <P> void send(String destinationName, Message<P> message) {
D destination = resolveDestination(destinationName);
this.doSend(destination, message);
}
protected final D resolveDestination(String destinationName) {
Assert.notNull(destinationResolver, "destinationResolver is required when passing a name only");
return this.destinationResolver.resolveDestination(destinationName);
}
@Override
public <T> void convertAndSend(String destinationName, T message) {
this.convertAndSend(destinationName, message, null);
}
@Override
public <T> void convertAndSend(String destinationName, T message, MessagePostProcessor postProcessor) {
D destination = resolveDestination(destinationName);
super.convertAndSend(destination, message, postProcessor);
}
@Override
public <P> Message<P> receive(String destinationName) {
D destination = resolveDestination(destinationName);
return super.receive(destination);
}
@Override
public Object receiveAndConvert(String destinationName) {
D destination = resolveDestination(destinationName);
return super.receiveAndConvert(destination);
}
@Override
public Message<?> sendAndReceive(String destinationName, Message<?> requestMessage) {
D destination = resolveDestination(destinationName);
return super.sendAndReceive(destination, requestMessage);
}
@Override
public Object convertSendAndReceive(String destinationName, Object request) {
D destination = resolveDestination(destinationName);
return super.convertSendAndReceive(destination, request);
}
@Override
public Object convertSendAndReceive(String destinationName, Object request, MessagePostProcessor postProcessor) {
D destination = resolveDestination(destinationName);
return super.convertSendAndReceive(destination, request, postProcessor);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.converter.DefaultMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
* @since 4.0
*/
public abstract class AbstractMessagingTemplate<D> implements MessageReceivingOperations<D> {
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile D defaultDestination;
protected volatile MessageConverter converter = new DefaultMessageConverter();
public void setDefaultDestination(D defaultDestination) {
this.defaultDestination = defaultDestination;
}
/**
* Set the {@link MessageConverter} that is to be used to convert
* between Messages and objects for this template.
* <p>The default is {@link DefaultMessageConverter}.
*/
public void setMessageConverter(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' must not be null");
this.converter = messageConverter;
}
@Override
public <P> void send(Message<P> message) {
this.send(getRequiredDefaultDestination(), message);
}
private D getRequiredDefaultDestination() {
Assert.state(this.defaultDestination != null,
"No 'defaultDestination' specified for MessagingTemplate. "
+ "Unable to invoke method without an explicit destination argument.");
return this.defaultDestination;
}
@Override
public <P> void send(D destination, Message<P> message) {
this.doSend(destination, message);
}
protected abstract void doSend(D destination, Message<?> message) ;
@Override
public <T> void convertAndSend(T message) {
this.convertAndSend(getRequiredDefaultDestination(), message);
}
@Override
public <T> void convertAndSend(D destination, T object) {
this.convertAndSend(destination, object, null);
}
@Override
public <T> void convertAndSend(T object, MessagePostProcessor postProcessor) {
this.convertAndSend(getRequiredDefaultDestination(), object, postProcessor);
}
@Override
public <T> void convertAndSend(D destination, T object, MessagePostProcessor postProcessor)
throws MessagingException {
Message<?> message = this.converter.toMessage(object);
if (postProcessor != null) {
message = postProcessor.postProcessMessage(message);
}
this.send(destination, message);
}
@Override
public <P> Message<P> receive() {
return this.receive(getRequiredDefaultDestination());
}
@Override
public <P> Message<P> receive(D destination) {
return this.doReceive(destination);
}
protected abstract <P> Message<P> doReceive(D destination);
@Override
public Object receiveAndConvert() {
return this.receiveAndConvert(getRequiredDefaultDestination());
}
@Override
public Object receiveAndConvert(D destination) {
Message<Object> message = this.doReceive(destination);
return (message != null) ? this.converter.fromMessage(message) : null;
}
@Override
public Message<?> sendAndReceive(Message<?> requestMessage) {
return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage);
}
@Override
public Message<?> sendAndReceive(D destination, Message<?> requestMessage) {
return this.doSendAndReceive(destination, requestMessage);
}
protected abstract <S, R> Message<R> doSendAndReceive(D destination, Message<S> requestMessage);
@Override
public Object convertSendAndReceive(Object request) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request);
}
@Override
public Object convertSendAndReceive(D destination, Object request) {
return this.convertSendAndReceive(destination, request, null);
}
@Override
public Object convertSendAndReceive(Object request, MessagePostProcessor postProcessor) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, postProcessor);
}
@Override
public Object convertSendAndReceive(D destination, Object request, MessagePostProcessor postProcessor) {
Message<?> requestMessage = this.converter.toMessage(request);
if (postProcessor != null) {
requestMessage = postProcessor.postProcessMessage(requestMessage);
}
Message<?> replyMessage = this.sendAndReceive(destination, requestMessage);
return this.converter.fromMessage(replyMessage);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface DestinationResolver<D> {
D resolveDestination(String name);
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.channel.BeanFactoryChannelResolver;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
/**
* @author Mark Fisher
* @since 4.0
*/
public class IntegrationTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel>
implements BeanFactoryAware {
private volatile long sendTimeout = -1;
private volatile long receiveTimeout = -1;
private volatile boolean throwExceptionOnLateReply = false;
/**
* Specify the timeout value to use for send operations.
*
* @param sendTimeout the send timeout in milliseconds
*/
public void setSendTimeout(long sendTimeout) {
this.sendTimeout = sendTimeout;
}
/**
* Specify the timeout value to use for receive operations.
*
* @param receiveTimeout the receive timeout in milliseconds
*/
public void setReceiveTimeout(long receiveTimeout) {
this.receiveTimeout = receiveTimeout;
}
/**
* Specify whether or not an attempt to send on the reply channel throws an exception
* if no receiving thread will actually receive the reply. This can occur
* if the receiving thread has already timed out, or will never call receive()
* because it caught an exception, or has already received a reply.
* (default false - just a WARN log is emitted in these cases).
* @param throwExceptionOnLateReply TRUE or FALSE.
*/
public void setThrowExceptionOnLateReply(boolean throwExceptionOnLateReply) {
this.throwExceptionOnLateReply = throwExceptionOnLateReply;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
super.setDestinationResolver(new BeanFactoryChannelResolver(beanFactory));
}
@Override
protected final void doSend(MessageChannel destination, Message<?> message) {
Assert.notNull(destination, "channel must not be null");
long timeout = this.sendTimeout;
boolean sent = (timeout >= 0)
? destination.send(message, timeout)
: destination.send(message);
if (!sent) {
throw new MessageDeliveryException(message,
"failed to send message to channel '" + destination + "' within timeout: " + timeout);
}
}
@SuppressWarnings("unchecked")
@Override
protected final <P> Message<P> doReceive(MessageChannel destination) {
Assert.state(destination instanceof PollableChannel,
"The 'destination' must be a PollableChannel for receive operations.");
Assert.notNull(destination, "channel must not be null");
long timeout = this.receiveTimeout;
Message<?> message = (timeout >= 0)
? ((PollableChannel) destination).receive(timeout)
: ((PollableChannel) destination).receive();
if (message == null && this.logger.isTraceEnabled()) {
this.logger.trace("failed to receive message from channel '" + destination + "' within timeout: " + timeout);
}
return (Message<P>) message;
}
@Override
protected final <S, R> Message<R> doSendAndReceive(MessageChannel destination, Message<S> requestMessage) {
Object originalReplyChannelHeader = requestMessage.getHeaders().getReplyChannel();
Object originalErrorChannelHeader = requestMessage.getHeaders().getErrorChannel();
TemporaryReplyChannel replyChannel = new TemporaryReplyChannel(this.receiveTimeout, this.throwExceptionOnLateReply);
requestMessage = MessageBuilder.fromMessage(requestMessage)
.setReplyChannel(replyChannel)
.setErrorChannel(replyChannel)
.build();
try {
this.doSend(destination, requestMessage);
}
catch (RuntimeException e) {
replyChannel.setClientWontReceive(true);
throw e;
}
Message<R> reply = this.doReceive(replyChannel);
if (reply != null) {
reply = MessageBuilder.fromMessage(reply)
.setHeader(MessageHeaders.REPLY_CHANNEL, originalReplyChannelHeader)
.setHeader(MessageHeaders.ERROR_CHANNEL, originalErrorChannelHeader)
.build();
}
return reply;
}
private static class TemporaryReplyChannel implements PollableChannel {
private static final Log logger = LogFactory.getLog(TemporaryReplyChannel.class);
private volatile Message<?> message;
private final long receiveTimeout;
private final CountDownLatch latch = new CountDownLatch(1);
private final boolean throwExceptionOnLateReply;
private volatile boolean clientTimedOut;
private volatile boolean clientWontReceive;
private volatile boolean clientHasReceived;
public TemporaryReplyChannel(long receiveTimeout, boolean throwExceptionOnLateReply) {
this.receiveTimeout = receiveTimeout;
this.throwExceptionOnLateReply = throwExceptionOnLateReply;
}
public void setClientWontReceive(boolean clientWontReceive) {
this.clientWontReceive = clientWontReceive;
}
@Override
public Message<?> receive() {
return this.receive(-1);
}
@Override
public Message<?> receive(long timeout) {
try {
if (this.receiveTimeout < 0) {
this.latch.await();
this.clientHasReceived = true;
}
else {
if (this.latch.await(this.receiveTimeout, TimeUnit.MILLISECONDS)) {
this.clientHasReceived = true;
}
else {
this.clientTimedOut = true;
}
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return this.message;
}
@Override
public boolean send(Message<?> message) {
return this.send(message, -1);
}
@Override
public boolean send(Message<?> message, long timeout) {
this.message = message;
this.latch.countDown();
if (this.clientTimedOut || this.clientHasReceived || this.clientWontReceive) {
String exceptionMessage = "";
if (this.clientTimedOut) {
exceptionMessage = "Reply message being sent, but the receiving thread has already timed out";
}
else if (this.clientHasReceived) {
exceptionMessage = "Reply message being sent, but the receiving thread has already received a reply";
}
else if (this.clientWontReceive) {
exceptionMessage = "Reply message being sent, but the receiving thread has already caught an exception and won't receive";
}
if (logger.isWarnEnabled()) {
logger.warn(exceptionMessage + ":" + message);
}
if (this.throwExceptionOnLateReply) {
throw new MessageDeliveryException(message, exceptionMessage);
}
}
return true;
}
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
/**
* To be used with MessagingTemplate's send method that converts an object to a message.
* It allows for further modification of the message after it has been processed
* by the converter.
*
* <p>This is often implemented as an anonymous class within a method implementation.
*
* @author Mark Fisher
* @since 4.0
*/
public interface MessagePostProcessor {
/**
* Apply a MessagePostProcessor to the message. The returned message is
* typically a modified version of the original.
* @param message the message returned from the MessageConverter
* @return the modified version of the Message
*/
Message<?> postProcessMessage(Message<?> message);
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface MessageReceivingOperations<D> extends MessageSendingOperations<D> {
<P> Message<P> receive() throws MessagingException;
<P> Message<P> receive(D destination) throws MessagingException;
Object receiveAndConvert() throws MessagingException;
Object receiveAndConvert(D destination) throws MessagingException;
Message<?> sendAndReceive(Message<?> requestMessage);
Message<?> sendAndReceive(D destination, Message<?> requestMessage);
Object convertSendAndReceive(Object request);
Object convertSendAndReceive(D destination, Object request);
Object convertSendAndReceive(Object request, MessagePostProcessor requestPostProcessor);
Object convertSendAndReceive(D destination, Object request, MessagePostProcessor requestPostProcessor);
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface MessageSendingOperations<D> {
<P> void send(Message<P> message) throws MessagingException;
<P> void send(D destination, Message<P> message) throws MessagingException;
<T> void convertAndSend(T message) throws MessagingException;
<T> void convertAndSend(D destination, T message) throws MessagingException;
<T> void convertAndSend(T message, MessagePostProcessor postProcessor) throws MessagingException;
<T> void convertAndSend(D destination, T message, MessagePostProcessor postProcessor) throws MessagingException;
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface ResolvableDestinationMessageReceivingOperations<D>
extends MessageReceivingOperations<D>, ResolvableDestinationMessageSendingOperations<D> {
<P> Message<P> receive(String destinationName) throws MessagingException;
Object receiveAndConvert(String destinationName) throws MessagingException;
Message<?> sendAndReceive(String destinationName, Message<?> requestMessage);
Object convertSendAndReceive(String destinationName, Object request);
Object convertSendAndReceive(String destinationName, Object request, MessagePostProcessor requestPostProcessor);
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
/**
* @author Mark Fisher
* @since 4.0
*/
public interface ResolvableDestinationMessageSendingOperations<D> extends MessageSendingOperations<D> {
<P> void send(String destinationName, Message<P> message) throws MessagingException;
<T> void convertAndSend(String destinationName, T message) throws MessagingException;
<T> void convertAndSend(String destinationName, T message, MessagePostProcessor postProcessor)
throws MessagingException;
}
/**
* Provides core messaging classes.
*/
package org.springframework.messaging.core;
\ No newline at end of file
package org.springframework.web.messaging.support;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.converter.DefaultMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.core.MessageSendingOperations;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.messaging.MessageType;
public class WebMessagingTemplate implements MessageSendingOperations<String> {
private final MessageChannel outputChannel;
protected volatile MessageConverter converter = new DefaultMessageConverter();
private volatile String defaultDestination;
private volatile long sendTimeout = -1;
public WebMessagingTemplate(MessageChannel outputChannel) {
Assert.notNull(outputChannel, "outputChannel is required");
this.outputChannel = outputChannel;
}
/**
* Set the {@link MessageConverter} that is to be used to convert
* between Messages and objects for this template.
* <p>The default is {@link SimpleMessageConverter}.
*/
public void setMessageConverter(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' must not be null");
this.converter = messageConverter;
}
/**
* Specify the timeout value to use for send operations.
*
* @param sendTimeout the send timeout in milliseconds
*/
public void setSendTimeout(long sendTimeout) {
this.sendTimeout = sendTimeout;
}
public void setDefaultDestination(String defaultDestination) {
this.defaultDestination = defaultDestination;
}
@Override
public <P> void send(Message<P> message) {
this.send(getRequiredDefaultDestination(), message);
}
private String getRequiredDefaultDestination() {
// TODO: maybe look up destination of current message (via ThreadLocal)
Assert.state(this.defaultDestination != null,
"No 'defaultDestination' specified for WebMessagingTemplate. " +
"Unable to invoke method without an explicit destination argument.");
return this.defaultDestination;
}
@Override
public <P> void send(String destinationName, Message<P> message) {
Assert.notNull(destinationName, "destinationName is required");
message = addDestinationToMessage(message, destinationName);
long timeout = this.sendTimeout;
boolean sent = (timeout >= 0)
? this.outputChannel.send(message, timeout)
: this.outputChannel.send(message);
if (!sent) {
throw new MessageDeliveryException(message,
"failed to send message to destination '" + destinationName + "' within timeout: " + timeout);
}
}
protected <P> Message<P> addDestinationToMessage(Message<P> message, String destinationName) {
Assert.notNull(destinationName, "destinationName is required");
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.create(MessageType.MESSAGE);
headers.copyHeaders(message.getHeaders());
headers.setDestination(destinationName);
message = MessageBuilder.withPayload(message.getPayload()).copyHeaders(headers.toMap()).build();
return message;
}
@Override
public <T> void convertAndSend(T object) {
this.convertAndSend(getRequiredDefaultDestination(), object);
}
@Override
public <T> void convertAndSend(String destinationName, T object) {
this.convertAndSend(destinationName, object, null);
}
@Override
public <T> void convertAndSend(T object, MessagePostProcessor postProcessor) {
this.convertAndSend(getRequiredDefaultDestination(), object, postProcessor);
}
@Override
public <T> void convertAndSend(String destinationName, T object, MessagePostProcessor postProcessor) {
Message<?> message = this.converter.toMessage(object);
if (postProcessor != null) {
message = postProcessor.postProcessMessage(message);
}
this.send(destinationName, message);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册