From 6ffeee3e6f31eab364d9a095b92c58305a959421 Mon Sep 17 00:00:00 2001 From: Juergen Hoeller Date: Fri, 12 Mar 2021 15:17:41 +0100 Subject: [PATCH] Basic integration tests with various listener container settings See gh-26442 --- spring-jms/spring-jms.gradle | 1 + ...sageListenerContainerIntegrationTests.java | 119 ++++++++++++++++++ 2 files changed, 120 insertions(+) create mode 100644 spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerIntegrationTests.java diff --git a/spring-jms/spring-jms.gradle b/spring-jms/spring-jms.gradle index 784aead7da..a447c97a80 100644 --- a/spring-jms/spring-jms.gradle +++ b/spring-jms/spring-jms.gradle @@ -14,5 +14,6 @@ dependencies { optional("com.fasterxml.jackson.core:jackson-databind") testCompile(testFixtures(project(":spring-beans"))) testCompile(testFixtures(project(":spring-tx"))) + testCompile("org.apache.activemq:activemq-broker") testImplementation("javax.jms:javax.jms-api") } diff --git a/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerIntegrationTests.java b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerIntegrationTests.java new file mode 100644 index 0000000000..306c88394f --- /dev/null +++ b/spring-jms/src/test/java/org/springframework/jms/listener/MessageListenerContainerIntegrationTests.java @@ -0,0 +1,119 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.jms.listener; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.JMSException; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.jupiter.api.Test; + +import org.springframework.jms.core.JmsTemplate; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Juergen Hoeller + * @since 5.3.5 + */ +public class MessageListenerContainerIntegrationTests { + + @Test + public void simpleMessageListenerContainer() throws InterruptedException { + SimpleMessageListenerContainer mlc = new SimpleMessageListenerContainer(); + + testMessageListenerContainer(mlc); + } + + @Test + public void defaultMessageListenerContainer() throws InterruptedException { + DefaultMessageListenerContainer mlc = new DefaultMessageListenerContainer(); + + testMessageListenerContainer(mlc); + } + + @Test + public void defaultMessageListenerContainerWithMaxMessagesPerTask() throws InterruptedException { + DefaultMessageListenerContainer mlc = new DefaultMessageListenerContainer(); + mlc.setConcurrentConsumers(1); + mlc.setMaxConcurrentConsumers(2); + mlc.setMaxMessagesPerTask(1); + + testMessageListenerContainer(mlc); + } + + @Test + public void defaultMessageListenerContainerWithIdleReceivesPerTaskLimit() throws InterruptedException { + DefaultMessageListenerContainer mlc = new DefaultMessageListenerContainer(); + mlc.setConcurrentConsumers(1); + mlc.setMaxConcurrentConsumers(2); + mlc.setIdleReceivesPerTaskLimit(1); + + testMessageListenerContainer(mlc); + } + + private void testMessageListenerContainer(AbstractMessageListenerContainer mlc) throws InterruptedException { + ActiveMQConnectionFactory aqcf = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + TestMessageListener tml = new TestMessageListener(); + + mlc.setConnectionFactory(aqcf); + mlc.setMessageListener(tml); + mlc.setDestinationName("test"); + mlc.afterPropertiesSet(); + mlc.start(); + + JmsTemplate jt = new JmsTemplate(aqcf); + jt.setDefaultDestinationName("test"); + + Set messages = new HashSet<>(); + messages.add("text1"); + messages.add("text2"); + for (String message : messages) { + jt.convertAndSend(message); + } + assertThat(tml.result()).isEqualTo(messages); + + mlc.destroy(); + } + + + private static class TestMessageListener implements SessionAwareMessageListener { + + private final CountDownLatch latch = new CountDownLatch(2); + + private final Set messages = new CopyOnWriteArraySet<>(); + + @Override + public void onMessage(TextMessage message, Session session) throws JMSException { + this.messages.add(message.getText()); + this.latch.countDown(); + } + + public Set result() throws InterruptedException { + assertThat(this.latch.await(5, TimeUnit.SECONDS)).isTrue(); + return this.messages; + } + } + +} -- GitLab