提交 5c9f09c2 编写于 作者: S Stephane Nicoll

Register lazy @JmsListener components

Support the creation and registration of message listener containers in
a lazy manner, that is after the container initialization has completed.

Such support brought an interesting brainstorming of the thread safety
if JmsListenerEndpointRegistrar and JmsListenerEndpointRegistry so those
have also been revisited as part of this commit.

Issue: SPR-12774
上级 fdd1f836
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -22,6 +22,7 @@ import java.util.List;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.util.Assert;
......@@ -50,6 +51,10 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
private final List<JmsListenerEndpointDescriptor> endpointDescriptors =
new ArrayList<JmsListenerEndpointDescriptor>();
private boolean startImmediately;
private Object mutex = endpointDescriptors;
/**
* Set the {@link JmsListenerEndpointRegistry} instance to use.
......@@ -113,6 +118,10 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableBeanFactory) {
ConfigurableBeanFactory cbf = (ConfigurableBeanFactory) beanFactory;
this.mutex = cbf.getSingletonMutex();
}
}
......@@ -122,8 +131,12 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
}
protected void registerAllEndpoints() {
for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));
synchronized (this.mutex) {
for (JmsListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
this.endpointRegistry.registerListenerContainer(
descriptor.endpoint, resolveContainerFactory(descriptor));
}
startImmediately = true; // trigger immediate startup
}
}
......@@ -157,7 +170,16 @@ public class JmsListenerEndpointRegistrar implements BeanFactoryAware, Initializ
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
this.endpointDescriptors.add(new JmsListenerEndpointDescriptor(endpoint, factory));
JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);
synchronized (this.mutex) {
if (startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
/**
......
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -18,8 +18,8 @@ package org.springframework.jms.config;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
......@@ -57,7 +57,7 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
protected final Log logger = LogFactory.getLog(getClass());
private final Map<String, MessageListenerContainer> listenerContainers =
new LinkedHashMap<String, MessageListenerContainer>();
new ConcurrentHashMap<String, MessageListenerContainer>();
private int phase = Integer.MAX_VALUE;
......@@ -86,21 +86,43 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
* Create a message listener container for the given {@link JmsListenerEndpoint}.
* <p>This create the necessary infrastructure to honor that endpoint
* with regards to its configuration.
* <p>The {@code startImmediately} flag determines if the container should be
* started immediately.
* @param endpoint the endpoint to add
* @param factory the listener factory to use
* @param startImmediately start the container immediately if necessary
* @see #getListenerContainers()
* @see #getListenerContainer(String)
*/
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.notNull(id, "Endpoint id must not be null");
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (startImmediately) {
startIfNecessary(container);
}
}
}
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
/**
* Create a message listener container for the given {@link JmsListenerEndpoint}.
* <p>This create the necessary infrastructure to honor that endpoint
* with regards to its configuration.
* @param endpoint the endpoint to add
* @param factory the listener factory to use
* @see #registerListenerContainer(JmsListenerEndpoint, JmsListenerContainerFactory, boolean)
*/
public void registerListenerContainer(JmsListenerEndpoint endpoint, JmsListenerContainerFactory<?> factory) {
registerListenerContainer(endpoint, factory, false);
}
/**
......@@ -163,9 +185,7 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
@Override
public void start() {
for (MessageListenerContainer listenerContainer : getListenerContainers()) {
if (listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
startIfNecessary(listenerContainer);
}
}
......@@ -195,6 +215,17 @@ public class JmsListenerEndpointRegistry implements DisposableBean, SmartLifecyc
return false;
}
/**
* Start the specified {@link MessageListenerContainer} if it should be started
* on startup.
* @see MessageListenerContainer#isAutoStartup()
*/
private static void startIfNecessary(MessageListenerContainer listenerContainer) {
if (listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
private static class AggregatingCallback implements Runnable {
......
/*
* Copyright 2002-2014 the original author or authors.
* Copyright 2002-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -29,17 +29,22 @@ import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import org.springframework.jms.config.JmsListenerContainerTestFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistrar;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.jms.config.MessageListenerTestContainer;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
import org.springframework.stereotype.Component;
import static org.junit.Assert.*;
/**
* @author Stephane Nicoll
......@@ -115,6 +120,22 @@ public class EnableJmsTests extends AbstractJmsAnnotationDrivenTests {
EnableJmsSampleConfig.class, CustomBean.class);
}
@Test
public void lazyComponent() {
ConfigurableApplicationContext context = new AnnotationConfigApplicationContext(
EnableJmsDefaultContainerFactoryConfig.class, LazyBean.class);
JmsListenerContainerTestFactory defaultFactory =
context.getBean("jmsListenerContainerFactory", JmsListenerContainerTestFactory.class);
assertEquals(0, defaultFactory.getListenerContainers().size());
context.getBean(LazyBean.class); // trigger lazy resolution
assertEquals(1, defaultFactory.getListenerContainers().size());
MessageListenerTestContainer container = defaultFactory.getListenerContainers().get(0);
assertTrue("Should have been started " + container, container.isStarted());
context.close(); // Close and stop the listeners
assertTrue("Should have been stopped " + container, container.isStopped());
}
@EnableJms
@Configuration
static class EnableJmsSampleConfig {
......@@ -240,4 +261,13 @@ public class EnableJmsTests extends AbstractJmsAnnotationDrivenTests {
}
}
@Component
@Lazy
static class LazyBean {
@JmsListener(destination = "myQueue")
public void handle(String msg) {
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册