提交 ef823721 编写于 作者: R Rossen Stoyanchev

Split AbstractMessagingTemplate across send/receive

上级 078c766b
......@@ -23,7 +23,7 @@ import org.springframework.util.Assert;
* @author Mark Fisher
* @since 4.0
*/
public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends AbstractMessagingTemplate<D>
public abstract class AbstractDestinationResolvingMessagingTemplate<D> extends AbstractReceivingMessagingTemplate<D>
implements ResolvableDestinationMessageReceivingOperations<D> {
private volatile DestinationResolver<D> destinationResolver;
......
......@@ -28,7 +28,7 @@ import org.springframework.util.Assert;
* @author Mark Fisher
* @since 4.0
*/
public abstract class AbstractMessagingTemplate<D> implements MessageReceivingOperations<D> {
public abstract class AbstractMessagingTemplate<D> implements MessageSendingOperations<D> {
protected final Log logger = LogFactory.getLog(this.getClass());
......@@ -57,7 +57,7 @@ public abstract class AbstractMessagingTemplate<D> implements MessageReceivingOp
this.send(getRequiredDefaultDestination(), message);
}
private D getRequiredDefaultDestination() {
protected final D getRequiredDefaultDestination() {
Assert.state(this.defaultDestination != null,
"No 'defaultDestination' specified for MessagingTemplate. "
+ "Unable to invoke method without an explicit destination argument.");
......@@ -98,68 +98,4 @@ public abstract class AbstractMessagingTemplate<D> implements MessageReceivingOp
this.send(destination, message);
}
@Override
public <P> Message<P> receive() {
return this.receive(getRequiredDefaultDestination());
}
@Override
public <P> Message<P> receive(D destination) {
return this.doReceive(destination);
}
protected abstract <P> Message<P> doReceive(D destination);
@Override
public Object receiveAndConvert() {
return this.receiveAndConvert(getRequiredDefaultDestination());
}
@Override
public Object receiveAndConvert(D destination) {
Message<Object> message = this.doReceive(destination);
return (message != null) ? this.converter.fromMessage(message) : null;
}
@Override
public Message<?> sendAndReceive(Message<?> requestMessage) {
return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage);
}
@Override
public Message<?> sendAndReceive(D destination, Message<?> requestMessage) {
return this.doSendAndReceive(destination, requestMessage);
}
protected abstract <S, R> Message<R> doSendAndReceive(D destination, Message<S> requestMessage);
@Override
public Object convertSendAndReceive(Object request) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request);
}
@Override
public Object convertSendAndReceive(D destination, Object request) {
return this.convertSendAndReceive(destination, request, null);
}
@Override
public Object convertSendAndReceive(Object request, MessagePostProcessor postProcessor) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, postProcessor);
}
@Override
public Object convertSendAndReceive(D destination, Object request, MessagePostProcessor postProcessor) {
Message<?> requestMessage = this.converter.toMessage(request);
if (postProcessor != null) {
requestMessage = postProcessor.postProcessMessage(requestMessage);
}
Message<?> replyMessage = this.sendAndReceive(destination, requestMessage);
return this.converter.fromMessage(replyMessage);
}
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.core;
import org.springframework.messaging.Message;
/**
* @author Mark Fisher
* @since 4.0
*/
public abstract class AbstractReceivingMessagingTemplate<D>
extends AbstractMessagingTemplate<D> implements MessageReceivingOperations<D> {
@Override
public <P> Message<P> receive() {
return this.receive(getRequiredDefaultDestination());
}
@Override
public <P> Message<P> receive(D destination) {
return this.doReceive(destination);
}
protected abstract <P> Message<P> doReceive(D destination);
@Override
public Object receiveAndConvert() {
return this.receiveAndConvert(getRequiredDefaultDestination());
}
@Override
public Object receiveAndConvert(D destination) {
Message<Object> message = this.doReceive(destination);
return (message != null) ? this.converter.fromMessage(message) : null;
}
@Override
public Message<?> sendAndReceive(Message<?> requestMessage) {
return this.sendAndReceive(getRequiredDefaultDestination(), requestMessage);
}
@Override
public Message<?> sendAndReceive(D destination, Message<?> requestMessage) {
return this.doSendAndReceive(destination, requestMessage);
}
protected abstract <S, R> Message<R> doSendAndReceive(D destination, Message<S> requestMessage);
@Override
public Object convertSendAndReceive(Object request) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request);
}
@Override
public Object convertSendAndReceive(D destination, Object request) {
return this.convertSendAndReceive(destination, request, null);
}
@Override
public Object convertSendAndReceive(Object request, MessagePostProcessor postProcessor) {
return this.convertSendAndReceive(getRequiredDefaultDestination(), request, postProcessor);
}
@Override
public Object convertSendAndReceive(D destination, Object request, MessagePostProcessor postProcessor) {
Message<?> requestMessage = this.converter.toMessage(request);
if (postProcessor != null) {
requestMessage = postProcessor.postProcessMessage(requestMessage);
}
Message<?> replyMessage = this.sendAndReceive(destination, requestMessage);
return this.converter.fromMessage(replyMessage);
}
}
......@@ -37,7 +37,7 @@ import org.springframework.util.Assert;
* @author Mark Fisher
* @since 4.0
*/
public class IntegrationTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel>
public class DefaultMessagingTemplate extends AbstractDestinationResolvingMessagingTemplate<MessageChannel>
implements BeanFactoryAware {
private volatile long sendTimeout = -1;
......
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging;
import java.util.concurrent.Executor;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.springframework.messaging.support.MessageBuilder;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.BDDMockito.*;
/**
* Tests for {@link PublishSubscribeChannel}.
*
* @author Phillip Webb
*/
public class PublishSubscibeChannelTests {
@Rule
public ExpectedException thrown = ExpectedException.none();
private PublishSubscribeChannel channel = new PublishSubscribeChannel();
@Mock
private MessageHandler handler;
private final Object payload = new Object();
private final Message<Object> message = MessageBuilder.withPayload(this.payload).build();
@Captor
private ArgumentCaptor<Runnable> runnableCaptor;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
}
@Test
public void messageMustNotBeNull() throws Exception {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Message must not be null");
this.channel.send(null);
}
@Test
public void payloadMustNotBeNull() throws Exception {
Message<?> message = mock(Message.class);
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Message payload must not be null");
this.channel.send(message);
}
@Test
public void sendWithoutExecutor() {
this.channel.subscribe(this.handler);
this.channel.send(this.message);
verify(this.handler).handleMessage(this.message);
}
@Test
public void sendWithExecutor() throws Exception {
Executor executor = mock(Executor.class);
this.channel = new PublishSubscribeChannel(executor);
this.channel.subscribe(this.handler);
this.channel.send(this.message);
verify(executor).execute(this.runnableCaptor.capture());
verify(this.handler, never()).handleMessage(this.message);
this.runnableCaptor.getValue().run();
verify(this.handler).handleMessage(this.message);
}
@Test
public void subscribeTwice() throws Exception {
assertThat(this.channel.subscribe(this.handler), equalTo(true));
assertThat(this.channel.subscribe(this.handler), equalTo(false));
this.channel.send(this.message);
verify(this.handler, times(1)).handleMessage(this.message);
}
@Test
public void unsubscribeTwice() throws Exception {
this.channel.subscribe(this.handler);
assertThat(this.channel.unsubscribe(this.handler), equalTo(true));
assertThat(this.channel.unsubscribe(this.handler), equalTo(false));
this.channel.send(this.message);
verify(this.handler, never()).handleMessage(this.message);
}
@Test
public void failurePropagates() throws Exception {
RuntimeException ex = new RuntimeException();
willThrow(ex).given(this.handler).handleMessage(this.message);
MessageHandler secondHandler = mock(MessageHandler.class);
this.channel.subscribe(this.handler);
this.channel.subscribe(secondHandler);
try {
this.channel.send(message);
}
catch(RuntimeException actualException) {
assertThat(actualException, equalTo(ex));
}
verifyZeroInteractions(secondHandler);
}
@Test
public void concurrentModification() throws Exception {
this.channel.subscribe(new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
channel.unsubscribe(handler);
}
});
this.channel.subscribe(this.handler);
this.channel.send(this.message);
verify(this.handler).handleMessage(this.message);
}
}
......@@ -3,23 +3,16 @@ package org.springframework.web.messaging.support;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.converter.DefaultMessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.core.MessagePostProcessor;
import org.springframework.messaging.core.MessageSendingOperations;
import org.springframework.messaging.core.AbstractMessagingTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.web.messaging.MessageType;
public class WebMessagingTemplate implements MessageSendingOperations<String> {
public class WebMessagingTemplate extends AbstractMessagingTemplate<String> {
private final MessageChannel outputChannel;
protected volatile MessageConverter converter = new DefaultMessageConverter();
private volatile String defaultDestination;
private volatile long sendTimeout = -1;
......@@ -29,16 +22,6 @@ public class WebMessagingTemplate implements MessageSendingOperations<String> {
}
/**
* Set the {@link MessageConverter} that is to be used to convert
* between Messages and objects for this template.
* <p>The default is {@link SimpleMessageConverter}.
*/
public void setMessageConverter(MessageConverter messageConverter) {
Assert.notNull(messageConverter, "'messageConverter' must not be null");
this.converter = messageConverter;
}
/**
* Specify the timeout value to use for send operations.
*
......@@ -48,72 +31,34 @@ public class WebMessagingTemplate implements MessageSendingOperations<String> {
this.sendTimeout = sendTimeout;
}
public void setDefaultDestination(String defaultDestination) {
this.defaultDestination = defaultDestination;
}
@Override
public <P> void send(Message<P> message) {
this.send(getRequiredDefaultDestination(), message);
}
private String getRequiredDefaultDestination() {
// TODO: maybe look up destination of current message (via ThreadLocal)
Assert.state(this.defaultDestination != null,
"No 'defaultDestination' specified for WebMessagingTemplate. " +
"Unable to invoke method without an explicit destination argument.");
return this.defaultDestination;
this.send(getRequiredDefaultDestination(), message);
}
@Override
public <P> void send(String destinationName, Message<P> message) {
Assert.notNull(destinationName, "destinationName is required");
message = addDestinationToMessage(message, destinationName);
protected void doSend(String destination, Message<?> message) {
Assert.notNull(destination, "destination is required");
message = addDestinationToMessage(message, destination);
long timeout = this.sendTimeout;
boolean sent = (timeout >= 0)
? this.outputChannel.send(message, timeout)
: this.outputChannel.send(message);
if (!sent) {
throw new MessageDeliveryException(message,
"failed to send message to destination '" + destinationName + "' within timeout: " + timeout);
"failed to send message to destination '" + destination + "' within timeout: " + timeout);
}
}
protected <P> Message<P> addDestinationToMessage(Message<P> message, String destinationName) {
Assert.notNull(destinationName, "destinationName is required");
protected <P> Message<P> addDestinationToMessage(Message<P> message, String destination) {
Assert.notNull(destination, "destination is required");
WebMessageHeaderAccesssor headers = WebMessageHeaderAccesssor.create(MessageType.MESSAGE);
headers.copyHeaders(message.getHeaders());
headers.setDestination(destinationName);
headers.setDestination(destination);
message = MessageBuilder.withPayload(message.getPayload()).copyHeaders(headers.toMap()).build();
return message;
}
@Override
public <T> void convertAndSend(T object) {
this.convertAndSend(getRequiredDefaultDestination(), object);
}
@Override
public <T> void convertAndSend(String destinationName, T object) {
this.convertAndSend(destinationName, object, null);
}
@Override
public <T> void convertAndSend(T object, MessagePostProcessor postProcessor) {
this.convertAndSend(getRequiredDefaultDestination(), object, postProcessor);
}
@Override
public <T> void convertAndSend(String destinationName, T object, MessagePostProcessor postProcessor) {
Message<?> message = this.converter.toMessage(object);
if (postProcessor != null) {
message = postProcessor.postProcessMessage(message);
}
this.send(destinationName, message);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册