提交 bded025d 编写于 作者: S Stephane Nicoll

@SendTo support for jms listener endpoints

This commit replaces the "responseDestination" attribute on the
JmsListener annotation by a support of the standard SendTo annotation.

Issue: SPR-11707
上级 4b0aba63
......@@ -61,7 +61,9 @@ import org.springframework.messaging.handler.annotation.MessageMapping;
*
* <p>Annotated method may have a non {@code void} return type. When they do, the result of the
* method invocation is sent as a JMS reply to the destination defined by either the
* {@code JMSReplyTO} header of the incoming message or the value of {@link #responseDestination()}.
* {@code JMSReplyTO} header of the incoming message. When this value is not set, a default
* destination can be provided by adding @{@link org.springframework.messaging.handler.annotation.SendTo
* SendTo} to the method declaration.
*
* @author Stephane Nicoll
* @since 4.1
......@@ -111,14 +113,4 @@ public @interface JmsListener {
*/
String selector() default "";
/**
* The name of the default response destination to send response messages to.
* <p>This will be applied in case of a request message that does not carry
* a "JMSReplyTo" field. The type of this destination will be determined
* by the listener-container's "destination-type" attribute.
* <p>Note: This only applies to a listener method with a return value,
* for which each result object will be converted into a response message.
*/
String responseDestination() default "";
}
......@@ -182,9 +182,6 @@ public class JmsListenerAnnotationBeanPostProcessor implements BeanPostProcessor
if (StringUtils.hasText(jmsListener.subscription())) {
endpoint.setSubscription(jmsListener.subscription());
}
if (StringUtils.hasText(jmsListener.responseDestination())) {
endpoint.setResponseDestination(jmsListener.responseDestination());
}
JmsListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = jmsListener.containerFactory();
......
......@@ -17,12 +17,16 @@
package org.springframework.jms.config;
import java.lang.reflect.Method;
import java.util.Arrays;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.jms.listener.MessageListenerContainer;
import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
/**
* A {@link JmsListenerEndpoint} providing the method to invoke to process
......@@ -37,8 +41,6 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
private Method method;
private String responseDestination;
private JmsHandlerMethodFactory jmsHandlerMethodFactory;
/**
......@@ -64,20 +66,6 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
return method;
}
/**
* Set the name of the default response destination to send response messages to.
*/
public void setResponseDestination(String responseDestination) {
this.responseDestination = responseDestination;
}
/**
* Return the name of the default response destination to send response messages to.
*/
public String getResponseDestination() {
return responseDestination;
}
/**
* Set the {@link DefaultJmsHandlerMethodFactory} to use to build the
* {@link InvocableHandlerMethod} responsible to manage the invocation
......@@ -91,12 +79,12 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
Assert.state(jmsHandlerMethodFactory != null,
"Could not create message listener, message listener factory not set.");
MessagingMessageListenerAdapter messageListener = new MessagingMessageListenerAdapter();
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
InvocableHandlerMethod invocableHandlerMethod =
jmsHandlerMethodFactory.createInvocableHandlerMethod(getBean(), getMethod());
messageListener.setHandlerMethod(invocableHandlerMethod);
String responseDestination = getResponseDestination();
if (responseDestination != null) {
String responseDestination = getDefaultResponseDestination();
if (StringUtils.hasText(responseDestination)) {
if (isQueue()) {
messageListener.setDefaultResponseQueueName(responseDestination);
}
......@@ -111,6 +99,26 @@ public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint {
return messageListener;
}
/**
* Create an empty {@link MessagingMessageListenerAdapter} instance.
*/
protected MessagingMessageListenerAdapter createMessageListenerInstance() {
return new MessagingMessageListenerAdapter();
}
private String getDefaultResponseDestination() {
SendTo ann = AnnotationUtils.getAnnotation(getMethod(), SendTo.class);
if (ann != null) {
Object[] destinations = ann.value();
if (destinations.length != 1) {
throw new IllegalStateException("Invalid @" + SendTo.class.getSimpleName() + " annotation on '"
+ getMethod() + "' one destination must be set (got " + Arrays.toString(destinations) + ")");
}
return (String) destinations[0];
}
return null;
}
@Override
protected StringBuilder getEndpointDescription() {
return super.getEndpointDescription()
......
......@@ -51,47 +51,82 @@ public class JmsMessageHeaderAccessor extends NativeMessageHeaderAccessor {
}
@Override
public Object getReplyChannel() {
return getReplyTo();
}
/**
* Return the {@link JmsHeaders#CORRELATION_ID correlationId}.
* @see JmsHeaders#CORRELATION_ID
*/
public String getCorrelationId() {
return (String) getHeader(JmsHeaders.CORRELATION_ID);
}
/**
* Return the {@link JmsHeaders#DESTINATION destination}.
* @see JmsHeaders#DESTINATION
*/
public Destination getDestination() {
return (Destination) getHeader(JmsHeaders.DESTINATION);
}
/**
* Return the {@link JmsHeaders#DELIVERY_MODE delivery mode}.
* @see JmsHeaders#DELIVERY_MODE
*/
public Integer getDeliveryMode() {
return (Integer) getHeader(JmsHeaders.DELIVERY_MODE);
}
/**
* Return the message {@link JmsHeaders#EXPIRATION expiration}.
* @see JmsHeaders#EXPIRATION
*/
public Long getExpiration() {
return (Long) getHeader(JmsHeaders.EXPIRATION);
}
/**
* Return the {@link JmsHeaders#MESSAGE_ID message id}.
* @see JmsHeaders#MESSAGE_ID
*/
public String getMessageId() {
return (String) getHeader(JmsHeaders.MESSAGE_ID);
}
/**
* Return the {@link JmsHeaders#PRIORITY}.
* @see JmsHeaders#PRIORITY
*/
public Integer getPriority() {
return (Integer) getHeader(JmsHeaders.PRIORITY);
}
/**
* Return the {@link JmsHeaders#REPLY_TO reply to}.
* @see JmsHeaders#REPLY_TO
*/
public Destination getReplyTo() {
return (Destination) getHeader(JmsHeaders.REPLY_TO);
}
/**
* Return the {@link JmsHeaders#REDELIVERED redelivered} flag.
* @see JmsHeaders#REDELIVERED
*/
public Boolean getRedelivered() {
return (Boolean) getHeader(JmsHeaders.REDELIVERED);
}
/**
* Return the {@link JmsHeaders#TYPE type}.
* @see JmsHeaders#TYPE
*/
public String getType() {
return (String) getHeader(JmsHeaders.TYPE);
}
/**
* Return the {@link JmsHeaders#TIMESTAMP timestamp}.
* @see JmsHeaders#TIMESTAMP
*/
public Long getTimestamp() {
return (Long) getHeader(JmsHeaders.TIMESTAMP);
}
......
......@@ -114,7 +114,7 @@ public abstract class AbstractJmsAnnotationDrivenTests {
static class FullBean {
@JmsListener(id = "listener1", containerFactory = "simpleFactory", destination = "queueIn",
responseDestination = "queueOut", selector = "mySelector", subscription = "mySubscription")
selector = "mySelector", subscription = "mySubscription")
public String fullHandle(String msg) {
return "reply";
}
......
......@@ -58,7 +58,6 @@ public class JmsListenerAnnotationBeanPostProcessorTests {
MethodJmsListenerEndpoint methodEndpoint = (MethodJmsListenerEndpoint) endpoint;
assertNotNull(methodEndpoint.getBean());
assertNotNull(methodEndpoint.getMethod());
assertNull(methodEndpoint.getResponseDestination());
assertTrue(methodEndpoint.isQueue());
assertTrue("Should have been started " + container, container.isStarted());
......
......@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.ObjectMessage;
import javax.jms.QueueSender;
......@@ -48,12 +49,14 @@ import org.springframework.jms.listener.adapter.ListenerExecutionFailedException
import org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter;
import org.springframework.jms.support.JmsMessageHeaderAccessor;
import org.springframework.jms.support.converter.JmsHeaders;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.handler.annotation.support.MethodArgumentTypeMismatchException;
import org.springframework.util.ReflectionUtils;
import org.springframework.validation.Errors;
......@@ -99,7 +102,6 @@ public class MethodJmsListenerEndpointTests {
endpoint.setBean(this);
endpoint.setMethod(getTestMethod());
endpoint.setJmsHandlerMethodFactory(factory);
endpoint.setResponseDestination("myResponseQueue");
assertNotNull(endpoint.createMessageListener(container));
}
......@@ -217,6 +219,56 @@ public class MethodJmsListenerEndpointTests {
verify(queueSender).close();
}
@Test
public void processAndReplyWithSendTo() throws JMSException {
MessagingMessageListenerAdapter listener = createDefaultInstance(String.class);
String body = "echo text";
String correlationId = "link-1234";
Destination replyDestination = new Destination() {};
DestinationResolver destinationResolver = mock(DestinationResolver.class);
TextMessage reply = mock(TextMessage.class);
QueueSender queueSender = mock(QueueSender.class);
Session session = mock(Session.class);
given(destinationResolver.resolveDestinationName(session, "replyDestination", false))
.willReturn(replyDestination);
given(session.createTextMessage(body)).willReturn(reply);
given(session.createProducer(replyDestination)).willReturn(queueSender);
listener.setDestinationResolver(destinationResolver);
StubTextMessage inputMessage = createSimpleJmsTextMessage(body);
inputMessage.setJMSCorrelationID(correlationId);
listener.onMessage(inputMessage, session);
assertDefaultListenerMethodInvocation();
verify(destinationResolver).resolveDestinationName(session, "replyDestination", false);
verify(reply).setJMSCorrelationID(correlationId);
verify(queueSender).send(reply);
verify(queueSender).close();
}
@Test
public void emptySendTo() throws JMSException {
MessagingMessageListenerAdapter listener = createDefaultInstance(String.class);
TextMessage reply = mock(TextMessage.class);
Session session = mock(Session.class);
given(session.createTextMessage("content")).willReturn(reply);
thrown.expect(ListenerExecutionFailedException.class);
thrown.expectCause(Matchers.isA(InvalidDestinationException.class));
listener.onMessage(createSimpleJmsTextMessage("content"), session);
}
@Test
public void invalidSendTo() {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("firstDestination");
thrown.expectMessage("secondDestination");
createDefaultInstance(String.class);
}
@Test
public void validatePayloadValid() throws JMSException {
String methodName = "validatePayload";
......@@ -394,6 +446,24 @@ public class MethodJmsListenerEndpointTests {
return content;
}
@SendTo("replyDestination")
public String processAndReplyWithSendTo(String content) {
invocations.put("processAndReplyWithSendTo", true);
return content;
}
@SendTo("")
public String emptySendTo(String content) {
invocations.put("emptySendTo", true);
return content;
}
@SendTo({"firstDestination", "secondDestination"})
public String invalidSendTo(String content) {
invocations.put("invalidSendTo", true);
return content;
}
public void validatePayload(@Validated String payload) {
invocations.put("validatePayload", true);
}
......
......@@ -65,9 +65,12 @@ public class JmsMessageHeaderAccessorTests {
assertEquals("abcd-1234", headerAccessor.getMessageId());
assertEquals(Integer.valueOf(9), headerAccessor.getPriority());
assertEquals(replyTo, headerAccessor.getReplyTo());
assertEquals(replyTo, headerAccessor.getReplyChannel());
assertEquals(true, headerAccessor.getRedelivered());
assertEquals("type", headerAccessor.getType());
assertEquals(4567L, headerAccessor.getTimestamp(), 0.0);
// Making sure replyChannel is not mixed with replyTo
assertNull(headerAccessor.getReplyChannel());
}
}
/*
* Copyright 2002-2013 the original author or authors.
* Copyright 2002-2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......@@ -26,7 +26,11 @@ import org.springframework.messaging.Message;
/**
* Annotation that indicates a method's return value should be converted to
* a {@link Message} and sent to the specified destination.
* a {@link Message} if necessary and sent to the specified destination.
*
* <p>In a typical request/reply scenario, the incoming {@link Message} may
* convey the destination to use for the reply. In that case, that destination
* should take precedence.
*
* @author Rossen Stoyanchev
* @since 4.0
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册