diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index b33c892c7b7bdb552ccc5ec28577b56eb3238482..1e53a8782a4c6c07209b1076c1faf8f3b4899b0f 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -23,9 +23,9 @@ import io.openmessaging.manager.ResourceManager; import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; +import io.openmessaging.rocketmq.consumer.PullConsumerImpl; import io.openmessaging.rocketmq.consumer.PushConsumerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl; - import java.util.HashSet; import java.util.Set; @@ -57,65 +57,76 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { } @Override public Consumer createConsumer() { + String consumerId = accessPointProperties.getString("CONSUMER_ID"); + String[] nsStrArr = consumerId.split("_"); + if (nsStrArr.length < 2) { + return new PushConsumerImpl(accessPointProperties); + } + if ("pull".equals(nsStrArr[0])) { + return new PullConsumerImpl(accessPointProperties); + } return new PushConsumerImpl(accessPointProperties); } @Override public ResourceManager resourceManager() { - return new ResourceManager() { + DefaultResourceManager resourceManager = new DefaultResourceManager(); + return resourceManager; + } + + @Override public MessageFactory messageFactory() { + return null; + } - @Override - public void createNamespace(String nsName) { - accessPointProperties.put("CONSUMER_ID", nsName); - } + class DefaultResourceManager implements ResourceManager { - @Override - public void deleteNamespace(String nsName) { - accessPointProperties.put("CONSUMER_ID", null); - } + @Override + public void createNamespace(String nsName) { + accessPointProperties.put("CONSUMER_ID", nsName); + } - @Override - public void switchNamespace(String targetNamespace) { - accessPointProperties.put("CONSUMER_ID", targetNamespace); - } + @Override + public void deleteNamespace(String nsName) { + accessPointProperties.put("CONSUMER_ID", null); + } - @Override - public Set listNamespaces() { - return new HashSet() { - { - add(accessPointProperties.getString("CONSUMER_ID")); - } - }; - } + @Override + public void switchNamespace(String targetNamespace) { + accessPointProperties.put("CONSUMER_ID", targetNamespace); + } - @Override - public void createQueue(String queueName) { + @Override + public Set listNamespaces() { + return new HashSet() { + { + add(accessPointProperties.getString("CONSUMER_ID")); + } + }; + } - } + @Override + public void createQueue(String queueName) { - @Override - public void deleteQueue(String queueName) { + } - } + @Override + public void deleteQueue(String queueName) { - @Override - public Set listQueues(String nsName) { - return null; - } + } - @Override - public void filter(String queueName, String filterString) { + @Override + public Set listQueues(String nsName) { + return null; + } - } + @Override + public void filter(String queueName, String filterString) { - @Override - public void routing(String sourceQueue, String targetQueue) { + } - } - }; - } + @Override + public void routing(String sourceQueue, String targetQueue) { - @Override public MessageFactory messageFactory() { - return null; - } + } + }; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index 045f264e4343911e5556f42795f59f6594466c54..abb6eb80154851346f5811e7f63137ad891bf8bb 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -23,6 +23,19 @@ import io.openmessaging.extension.QueueMetaData; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.config.DefaultQueueMetaData; import io.openmessaging.rocketmq.domain.ConsumeRequest; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.exception.MQClientException; @@ -35,10 +48,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.utils.ThreadUtils; import org.apache.rocketmq.logging.InternalLogger; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.locks.ReadWriteLock; - class LocalMessageCache implements ServiceLifecycle { private final BlockingQueue consumeRequestCache; private final Map consumedRequest; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 7e6eaf4e9dc7d9d0f0545aafd745d0654daa774e..521dfc8b67e01295499bf4406457e2a819c68a84 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -33,7 +33,17 @@ import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.ConsumeRequest; import io.openmessaging.rocketmq.utils.BeanUtils; import io.openmessaging.rocketmq.utils.OMSUtil; -import org.apache.rocketmq.client.consumer.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.consumer.PullTaskCallback; +import org.apache.rocketmq.client.consumer.PullTaskContext; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; @@ -44,11 +54,6 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.LanguageCode; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.Set; - public class PullConsumerImpl implements Consumer { private final DefaultMQPullConsumer rocketmqPullConsumer; private final KeyValue properties; @@ -273,16 +278,16 @@ public class PullConsumerImpl implements Consumer { try { pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, 4 * 1024 * 1024, timeout); } catch (MQClientException e) { - e.printStackTrace(); + log.error("A error occurred when pull message.", e); return null; } catch (RemotingException e) { - e.printStackTrace(); + log.error("A error occurred when pull message.", e); return null; } catch (InterruptedException e) { - e.printStackTrace(); + log.error("A error occurred when pull message.", e); return null; } catch (MQBrokerException e) { - e.printStackTrace(); + log.error("A error occurred when pull message.", e); return null; } if (null == pullResult) { diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 1efa5b0475b78e424d8c43800ad33bd4a09ca6bb..6837e73a6347b5e4a95d4785f58a679e1f83fba9 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -34,6 +34,14 @@ import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils; import io.openmessaging.rocketmq.utils.OMSUtil; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -45,11 +53,6 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.protocol.LanguageCode; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - public class PushConsumerImpl implements Consumer { private final static InternalLogger log = ClientLogger.getLog(); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java index 8dda492642d8ac10b1ba2315a3e7d5b84303f58f..a6e7585cb9bcc8568875bc1b999a8fd4f0324419 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java @@ -18,7 +18,7 @@ package io.openmessaging.rocketmq.domain; import io.openmessaging.message.Header; -public class MessageHeader implements Header{ +public class MessageHeader implements Header { private String destination; diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java index c8d7bb3617f6ca37c88a8bd684a66372fe1916c9..01c6fcdd09c65d829d2748d4ff09bce2a1ea6e6d 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java @@ -28,5 +28,5 @@ public interface NonStandardKeys { String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums"; String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity"; String PRODUCER_ID = "PRODUCER_ID"; - String CONSUMER_ID ="CONSUMER_ID"; + String CONSUMER_ID = "CONSUMER_ID"; } diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java index 7fcd86c97207dfa43f200923fe1e3f4f1cdec016..900e7d9e01eb63201f57b349296a4276c920bbce 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -16,12 +16,15 @@ */ package io.openmessaging.rocketmq.consumer; -import io.openmessaging.*; +import io.openmessaging.KeyValue; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.OMS; import io.openmessaging.consumer.Consumer; import io.openmessaging.manager.ResourceManager; import io.openmessaging.message.Message; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.lang.reflect.Field; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.common.message.MessageExt; import org.junit.Before; @@ -30,11 +33,10 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.lang.reflect.Field; - -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; @RunWith(MockitoJUnitRunner.class) public class PullConsumerImplTest { @@ -49,11 +51,9 @@ public class PullConsumerImplTest { public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); - final KeyValue attributes = messagingAccessPoint.attributes(); final ResourceManager resourceManager = messagingAccessPoint.resourceManager(); - resourceManager.createNamespace("TestGroup"); + resourceManager.createNamespace("pull_TestGroup"); consumer = messagingAccessPoint.createConsumer(); -// consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); consumer.bindQueue(queueName); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); @@ -78,9 +78,7 @@ public class PullConsumerImplTest { consumedMsg.setBody(testBody); consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); consumedMsg.setTopic(queueName); - - when(localMessageCache.poll()).thenReturn(consumedMsg); - + doReturn(consumedMsg).when(localMessageCache).poll(any(KeyValue.class)); Message message = consumer.receive(3 * 1000); assertThat(message.header().getMessageId()).isEqualTo("NewMsgId"); assertThat(message.getData()).isEqualTo(testBody); @@ -88,11 +86,7 @@ public class PullConsumerImplTest { @Test public void testPoll_WithTimeout() { -// There is a default timeout value, @see ClientConfig#omsOperationTimeout. -// Message message = consumer.receive(3 * 1000); -// assertThat(message).isNull(); - -// message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100)); -// assertThat(message).isNull(); + Message message = consumer.receive(3 * 1000); + assertThat(message).isNull(); } } \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java index 4a2a84a728a474da5150088a38bb00fe0ab295cd..cd53ac6bb053aca0c1acf9c021e2f2af02203219 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -48,10 +48,8 @@ public class PushConsumerImplTest { public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); - /* consumer = messagingAccessPoint.createPushConsumer( - OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup"));*/ final ResourceManager resourceManager = messagingAccessPoint.resourceManager(); - resourceManager.createNamespace("TestGroup"); + resourceManager.createNamespace("push_TestGroup"); consumer = messagingAccessPoint.createConsumer(); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");