提交 0cc0a9bc 编写于 作者: J Juergen Hoeller

added "concurrency" property to Default/SimpleMessageListenerContainer and...

added "concurrency" property to Default/SimpleMessageListenerContainer and JmsActivationSpecConfig, supporting placeholders for the jms namespace "concurrency" attribute now (SPR-6232)
上级 998aa149
/*
* Copyright 2002-2009 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -270,28 +270,4 @@ abstract class AbstractListenerContainerParser implements BeanDefinitionParser {
return (Boolean) configDef.getPropertyValues().getPropertyValue("pubSubDomain").getValue();
}
protected int[] parseConcurrency(Element ele, ParserContext parserContext) {
String concurrency = ele.getAttribute(CONCURRENCY_ATTRIBUTE);
if (!StringUtils.hasText(concurrency)) {
return null;
}
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
int[] result = new int[2];
result[0] = Integer.parseInt(concurrency.substring(0, separatorIndex));
result[1] = Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length()));
return result;
}
else {
return new int[] {1, Integer.parseInt(concurrency)};
}
}
catch (NumberFormatException ex) {
parserContext.getReaderContext().error("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.", ele, ex);
return null;
}
}
}
/*
* Copyright 2002-2009 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -16,12 +16,13 @@
package org.springframework.jms.config;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.StringUtils;
import org.w3c.dom.Element;
/**
* Parser for the JMS <code>&lt;jca-listener-container&gt;</code> element.
......@@ -41,9 +42,8 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
containerDef.setSource(parserContext.extractSource(containerEle));
containerDef.setBeanClassName("org.springframework.jms.listener.endpoint.JmsMessageEndpointManager");
String resourceAdapterBeanName = "resourceAdapter";
if (containerEle.hasAttribute(RESOURCE_ADAPTER_ATTRIBUTE)) {
resourceAdapterBeanName = containerEle.getAttribute(RESOURCE_ADAPTER_ATTRIBUTE);
String resourceAdapterBeanName = containerEle.getAttribute(RESOURCE_ADAPTER_ATTRIBUTE);
if (!StringUtils.hasText(resourceAdapterBeanName)) {
parserContext.getReaderContext().error(
"Listener container 'resource-adapter' attribute contains empty value.", containerEle);
......@@ -88,14 +88,9 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
new RuntimeBeanReference(transactionManagerBeanName));
}
int[] concurrency = parseConcurrency(containerEle, parserContext);
if (concurrency != null) {
configDef.getPropertyValues().add("maxConcurrency", concurrency[1]);
}
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE);
if (StringUtils.hasText(concurrency)) {
configDef.getPropertyValues().add("concurrency", concurrency);
}
String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE);
......@@ -103,6 +98,11 @@ class JcaListenerContainerParser extends AbstractListenerContainerParser {
configDef.getPropertyValues().add("prefetchSize", new Integer(prefetch));
}
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
}
containerDef.getPropertyValues().add("activationSpecConfig", configDef);
return containerDef;
......
/*
* Copyright 2002-2009 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -18,12 +18,13 @@ package org.springframework.jms.config;
import javax.jms.Session;
import org.w3c.dom.Element;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.RuntimeBeanReference;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.beans.factory.xml.ParserContext;
import org.springframework.util.StringUtils;
import org.w3c.dom.Element;
/**
* Parser for the JMS <code>&lt;listener-container&gt;</code> element.
......@@ -107,11 +108,6 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
new RuntimeBeanReference(destinationResolverBeanName));
}
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
}
String cache = containerEle.getAttribute(CACHE_ATTRIBUTE);
if (StringUtils.hasText(cache)) {
if (containerType.startsWith("simple")) {
......@@ -148,15 +144,9 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
}
}
int[] concurrency = parseConcurrency(containerEle, parserContext);
if (concurrency != null) {
if (containerType.startsWith("default")) {
containerDef.getPropertyValues().add("concurrentConsumers", concurrency[0]);
containerDef.getPropertyValues().add("maxConcurrentConsumers", concurrency[1]);
}
else {
containerDef.getPropertyValues().add("concurrentConsumers", concurrency[1]);
}
String concurrency = containerEle.getAttribute(CONCURRENCY_ATTRIBUTE);
if (StringUtils.hasText(concurrency)) {
containerDef.getPropertyValues().add("concurrency", concurrency);
}
String prefetch = containerEle.getAttribute(PREFETCH_ATTRIBUTE);
......@@ -166,6 +156,11 @@ class JmsListenerContainerParser extends AbstractListenerContainerParser {
}
}
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
}
return containerDef;
}
......
/*
* Copyright 2002-2009 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -257,6 +257,31 @@ public class DefaultMessageListenerContainer extends AbstractPollingMessageListe
}
/**
* Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
* upper limit String, e.g. "10" (the lower limit will be 1 in this case).
* <p>This listener container will always hold on to the minimum number of consumers
* ({@link #setConcurrentConsumers}) and will slowly scale up to the maximum number
* of consumers {@link #setMaxConcurrentConsumers} in case of increasing load.
*/
public void setConcurrency(String concurrency) {
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
setConcurrentConsumers(Integer.parseInt(concurrency.substring(0, separatorIndex)));
setMaxConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
}
else {
setConcurrentConsumers(1);
setMaxConcurrentConsumers(Integer.parseInt(concurrency));
}
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported.");
}
}
/**
* Specify the number of concurrent consumers to create. Default is 1.
* <p>Specifying a higher value for this setting will increase the standard
......
/*
* Copyright 2002-2009 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -89,6 +89,33 @@ public class SimpleMessageListenerContainer extends AbstractMessageListenerConta
return this.pubSubNoLocal;
}
/**
* Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
* upper limit String, e.g. "10".
* <p>This listener container will always hold on to the maximum number of
* consumers {@link #setConcurrentConsumers} since it is unable to scale.
* <p>This property is primarily supported for configuration compatibility with
* {@link DefaultMessageListenerContainer}. For this local listener container,
* generally use {@link #setConcurrentConsumers} instead.
*/
public void setConcurrency(String concurrency) {
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
setConcurrentConsumers(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
}
else {
setConcurrentConsumers(Integer.parseInt(concurrency));
}
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " +
"Note that SimpleMessageListenerContainer will effectively ignore the minimum value and " +
"always keep a fixed number of consumers according to the maximum value.");
}
}
/**
* Specify the number of concurrent consumers to create. Default is 1.
* <p>Raising the number of concurrent consumers is recommendable in order
......
/*
* Copyright 2002-2007 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -18,6 +18,8 @@ package org.springframework.jms.listener.endpoint;
import javax.jms.Session;
import org.springframework.core.Constants;
/**
* Common configuration object for activating a JMS message endpoint.
* Gets converted into a provider-specific JCA 1.5 ActivationSpec
......@@ -34,6 +36,10 @@ import javax.jms.Session;
*/
public class JmsActivationSpecConfig {
/** Constants instance for javax.jms.Session */
private static final Constants sessionConstants = new Constants(Session.class);
private String destinationName;
private boolean pubSubDomain = false;
......@@ -101,26 +107,96 @@ public class JmsActivationSpecConfig {
return this.messageSelector;
}
/**
* Set the JMS acknowledgement mode by the name of the corresponding constant
* in the JMS {@link Session} interface, e.g. "CLIENT_ACKNOWLEDGE".
* <p>Note that JCA resource adapters generally only support auto and dups-ok
* (see Spring's {@link StandardJmsActivationSpecFactory}). ActiveMQ also
* supports "SESSION_TRANSACTED" in the form of RA-managed transactions
* (automatically translated by Spring's {@link DefaultJmsActivationSpecFactory}.
* @param constantName the name of the {@link Session} acknowledge mode constant
* @see javax.jms.Session#AUTO_ACKNOWLEDGE
* @see javax.jms.Session#CLIENT_ACKNOWLEDGE
* @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE
* @see javax.jms.Session#SESSION_TRANSACTED
* @see StandardJmsActivationSpecFactory
* @see DefaultJmsActivationSpecFactory
*/
public void setAcknowledgeModeName(String constantName) {
setAcknowledgeMode(sessionConstants.asNumber(constantName).intValue());
}
/**
* Set the JMS acknowledgement mode to use.
* @see javax.jms.Session#AUTO_ACKNOWLEDGE
* @see javax.jms.Session#CLIENT_ACKNOWLEDGE
* @see javax.jms.Session#DUPS_OK_ACKNOWLEDGE
* @see javax.jms.Session#SESSION_TRANSACTED
*/
public void setAcknowledgeMode(int acknowledgeMode) {
this.acknowledgeMode = acknowledgeMode;
}
/**
* Return the JMS acknowledgement mode to use.
*/
public int getAcknowledgeMode() {
return this.acknowledgeMode;
}
/**
* Specify concurrency limits via a "lower-upper" String, e.g. "5-10", or a simple
* upper limit String, e.g. "10".
* <p>JCA listener containers will always scale from zero to the given upper limit.
* A specified lower limit will effectively be ignored.
* <p>This property is primarily supported for configuration compatibility with
* {@link org.springframework.jms.listener.DefaultMessageListenerContainer}.
* For this activation config, generally use {@link #setMaxConcurrency} instead.
*/
public void setConcurrency(String concurrency) {
try {
int separatorIndex = concurrency.indexOf('-');
if (separatorIndex != -1) {
setMaxConcurrency(Integer.parseInt(concurrency.substring(separatorIndex + 1, concurrency.length())));
}
else {
setMaxConcurrency(Integer.parseInt(concurrency));
}
}
catch (NumberFormatException ex) {
throw new IllegalArgumentException("Invalid concurrency value [" + concurrency + "]: only " +
"single maximum integer (e.g. \"5\") and minimum-maximum combo (e.g. \"3-5\") supported. " +
"Note that JmsActivationSpecConfig will effectively ignore the minimum value and " +
"scale from zero up to the number of consumers according to the maximum value.");
}
}
/**
* Specify the maximum number of consumers/sessions to use, effectively
* controlling the number of concurrent invocations on the target listener.
*/
public void setMaxConcurrency(int maxConcurrency) {
this.maxConcurrency = maxConcurrency;
}
/**
* Return the maximum number of consumers/sessions to use.
*/
public int getMaxConcurrency() {
return this.maxConcurrency;
}
/**
* Specify the maximum number of messages to load into a session
* (a kind of batch size).
*/
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;
}
/**
* Return the maximum number of messages to load into a session.
*/
public int getPrefetchSize() {
return this.prefetchSize;
}
......
/*
* Copyright 2002-2009 the original author or authors.
* Copyright 2002-2010 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -86,9 +86,13 @@ public class JmsNamespaceHandlerTests extends TestCase {
for (DefaultMessageListenerContainer container : containers.values()) {
if (container.getConnectionFactory().equals(defaultConnectionFactory)) {
defaultConnectionFactoryCount++;
assertEquals(2, container.getConcurrentConsumers());
assertEquals(3, container.getMaxConcurrentConsumers());
}
else if (container.getConnectionFactory().equals(explicitConnectionFactory)) {
explicitConnectionFactoryCount++;
assertEquals(1, container.getConcurrentConsumers());
assertEquals(2, container.getMaxConcurrentConsumers());
}
}
......
......@@ -7,16 +7,24 @@
<jms:listener-container connection-factory="testConnectionFactory" task-executor="testTaskExecutor"
destination-resolver="testDestinationResolver" message-converter="testMessageConverter"
transaction-manager="testTransactionManager" error-handler="testErrorHandler" phase="99">
transaction-manager="testTransactionManager" error-handler="testErrorHandler" concurrency="1-2" phase="99">
<jms:listener id="listener1" destination="testDestination" ref="testBean1" method="setName"/>
<jms:listener id="listener2" destination="testDestination" ref="testBean2" method="setName" response-destination="responseDestination"/>
</jms:listener-container>
<!-- TODO: remove the task-executor reference once issue with blocking on stop is resolved -->
<jms:listener-container task-executor="testTaskExecutor">
<jms:listener-container task-executor="testTaskExecutor" concurrency="${concurrency}">
<jms:listener destination="testDestination" ref="testBean3"/>
</jms:listener-container>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="properties">
<props>
<prop key="concurrency">2-3</prop>
</props>
</property>
</bean>
<jms:jca-listener-container resource-adapter="testResourceAdapter" activation-spec-factory="testActivationSpecFactory"
message-converter="testMessageConverter" phase="77">
<jms:listener id="listener3" destination="testDestination" ref="testBean1" method="setName"/>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册