From 246e34eb01cdf46db20dccea9f04dc4fae279715 Mon Sep 17 00:00:00 2001 From: zhoubo <877036922@qq.com> Date: Thu, 20 Jun 2019 10:06:04 +0800 Subject: [PATCH] [ISSUE #1199] Implement the 1.0.0 openmessaging new consumer API for rocketmq oms module (#1240) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Adapt to the new consumer api * New consumer api implements code optimization * Adjust consumer example * 1、Optimize consumer code implementation 2、Fix bug * 1、Optimize consumer code implementation 2、Fix bug * Add new api unit test * Rename OMSUtil to OMSClientUtil * Unit test adds more verification --- .../openmessaging/SimplePullConsumer.java | 39 ++-- .../openmessaging/SimplePushConsumer.java | 25 +-- .../rocketmq/MessagingAccessPointImpl.java | 93 +++------ .../rocketmq/config/DefaultQueueMetaData.java | 65 ------- .../rocketmq/consumer/LocalMessageCache.java | 62 +++--- .../rocketmq/consumer/PullConsumerImpl.java | 164 +++++----------- .../rocketmq/consumer/PushConsumerImpl.java | 179 ++++++------------ .../rocketmq/domain/BytesMessageImpl.java | 24 ++- .../domain/DefaultMessageFactory.java | 29 +++ .../domain/DefaultMessageReceipt.java | 66 +++++++ .../rocketmq/domain/DefaultQueueMetaData.java | 72 +++++++ .../rocketmq/domain/MessageExtension.java | 34 ++++ .../domain/MessageExtensionHeader.java | 146 ++++++++++++++ .../rocketmq/domain/MessageHeader.java | 13 ++ .../rocketmq/domain/NonStandardKeys.java | 4 +- .../producer/AbstractOMSProducer.java | 2 +- .../rocketmq/producer/ProducerImpl.java | 38 ++-- .../{OMSUtil.java => OMSClientUtil.java} | 28 ++- .../consumer/LocalMessageCacheTest.java | 90 +++++++++ .../consumer/PullConsumerImplTest.java | 155 +++++++++++++-- .../consumer/PushConsumerImplTest.java | 101 ++++++++-- 21 files changed, 949 insertions(+), 480 deletions(-) delete mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java rename openmessaging/src/main/java/io/openmessaging/rocketmq/utils/{OMSUtil.java => OMSClientUtil.java} (77%) diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java index 86aba410..7d82dc82 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -16,40 +16,44 @@ */ package org.apache.rocketmq.example.openmessaging; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; -import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.message.Message; import io.openmessaging.producer.Producer; import io.openmessaging.producer.SendResult; +import io.openmessaging.rocketmq.domain.DefaultMessageReceipt; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.util.HashSet; +import java.util.Set; public class SimplePullConsumer { public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); - messagingAccessPoint.startup(); - final Producer producer = messagingAccessPoint.createProducer(); final PullConsumer consumer = messagingAccessPoint.createPullConsumer( - OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER")); + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_ID, "OMS_CONSUMER")); - messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); - final String queueName = "TopicTest"; - - producer.startup(); - Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes()); + final String queueName = "OMS_HELLO_TOPIC"; + producer.start(); + Message msg = producer.createMessage(queueName, "Hello Open Messaging".getBytes()); SendResult sendResult = producer.send(msg); System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId()); - producer.shutdown(); + producer.stop(); - consumer.attachQueue(queueName); + Set queueNames = new HashSet(8) { + { + add(queueName); + } + }; + consumer.bindQueue(queueNames); - consumer.startup(); + consumer.start(); System.out.printf("Consumer startup OK%n"); // Keep running until we find the one that has just been sent @@ -57,9 +61,11 @@ public class SimplePullConsumer { while (!stop) { Message message = consumer.receive(); if (message != null) { - String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID); + String msgId = message.header().getMessageId(); System.out.printf("Received one message: %s%n", msgId); - consumer.ack(msgId); + DefaultMessageReceipt defaultMessageReceipt = new DefaultMessageReceipt(); + defaultMessageReceipt.setMessageId(msgId); + consumer.ack(defaultMessageReceipt); if (!stop) { stop = msgId.equalsIgnoreCase(sendResult.messageId()); @@ -70,7 +76,6 @@ public class SimplePullConsumer { } } - consumer.shutdown(); - messagingAccessPoint.shutdown(); + consumer.stop(); } } diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java index 220c1323..7ac905af 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -16,12 +16,14 @@ */ package org.apache.rocketmq.example.openmessaging; -import io.openmessaging.Message; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; -import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.PushConsumer; +import io.openmessaging.message.Message; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.util.HashSet; +import java.util.Set; public class SimplePushConsumer { public static void main(String[] args) { @@ -29,28 +31,29 @@ public class SimplePushConsumer { .getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); final PushConsumer consumer = messagingAccessPoint. - createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER")); - - messagingAccessPoint.startup(); - System.out.printf("MessagingAccessPoint startup OK%n"); + createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_ID, "OMS_CONSUMER")); Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { @Override public void run() { - consumer.shutdown(); - messagingAccessPoint.shutdown(); + consumer.stop(); } })); - consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { + Set queueNames = new HashSet(8) { + { + add("OMS_HELLO_TOPIC"); + } + }; + consumer.bindQueue(queueNames, new MessageListener() { @Override public void onReceived(Message message, Context context) { - System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)); + System.out.printf("Received one message: %s%n", message.header().getMessageId()); context.ack(); } }); - consumer.startup(); + consumer.start(); System.out.printf("Consumer startup OK%n"); } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index f045a41e..715adb46 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -18,24 +18,26 @@ package io.openmessaging.rocketmq; import io.openmessaging.KeyValue; import io.openmessaging.MessagingAccessPoint; -import io.openmessaging.consumer.Consumer; +import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.manager.ResourceManager; import io.openmessaging.message.MessageFactory; import io.openmessaging.producer.Producer; import io.openmessaging.producer.TransactionStateCheckListener; import io.openmessaging.rocketmq.consumer.PullConsumerImpl; import io.openmessaging.rocketmq.consumer.PushConsumerImpl; -import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.domain.DefaultMessageFactory; import io.openmessaging.rocketmq.producer.ProducerImpl; -import java.util.HashSet; -import java.util.Set; public class MessagingAccessPointImpl implements MessagingAccessPoint { private final KeyValue accessPointProperties; + private final MessageFactory messageFactory; + public MessagingAccessPointImpl(final KeyValue accessPointProperties) { this.accessPointProperties = accessPointProperties; + this.messageFactory = new DefaultMessageFactory(); } @Override @@ -57,77 +59,34 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { return null; } - @Override public Consumer createConsumer() { - String consumerId = accessPointProperties.getString(NonStandardKeys.CONSUMER_ID); - String[] nsStrArr = consumerId.split("_"); - if (nsStrArr.length < 2) { - return new PushConsumerImpl(accessPointProperties); - } - if (NonStandardKeys.PULL_CONSUMER.equals(nsStrArr[0])) { - return new PullConsumerImpl(accessPointProperties); - } + @Override public PushConsumer createPushConsumer() { return new PushConsumerImpl(accessPointProperties); } - @Override - public ResourceManager resourceManager() { - DefaultResourceManager resourceManager = new DefaultResourceManager(); - return resourceManager; - } - - @Override public MessageFactory messageFactory() { - return null; + @Override public PullConsumer createPullConsumer() { + return new PullConsumerImpl(accessPointProperties); } - class DefaultResourceManager implements ResourceManager { - - @Override - public void createNamespace(String nsName) { - accessPointProperties.put(NonStandardKeys.CONSUMER_ID, nsName); - } - - @Override - public void deleteNamespace(String nsName) { - accessPointProperties.put(NonStandardKeys.CONSUMER_ID, null); - } - - @Override - public void switchNamespace(String targetNamespace) { - accessPointProperties.put(NonStandardKeys.CONSUMER_ID, targetNamespace); + @Override public PushConsumer createPushConsumer(KeyValue attributes) { + for (String key : attributes.keySet()) { + accessPointProperties.put(key, attributes.getString(key)); } + return new PushConsumerImpl(accessPointProperties); + } - @Override - public Set listNamespaces() { - return new HashSet() { - { - add(accessPointProperties.getString(NonStandardKeys.CONSUMER_ID)); - } - }; - } - - @Override - public void createQueue(String queueName) { - - } - - @Override - public void deleteQueue(String queueName) { - - } - - @Override - public Set listQueues(String nsName) { - return null; - } - - @Override - public void filter(String queueName, String filterString) { - + @Override public PullConsumer createPullConsumer(KeyValue attributes) { + for (String key : attributes.keySet()) { + accessPointProperties.put(key, attributes.getString(key)); } + return new PullConsumerImpl(accessPointProperties); + } - @Override - public void routing(String sourceQueue, String targetQueue) { + @Override + public ResourceManager resourceManager() { + return null; + } - } - }; + @Override public MessageFactory messageFactory() { + return messageFactory; + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java deleted file mode 100644 index b2695bfa..00000000 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/config/DefaultQueueMetaData.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.openmessaging.rocketmq.config; - -import io.openmessaging.extension.QueueMetaData; - -import java.util.List; - -public class DefaultQueueMetaData implements QueueMetaData { - - private String queueName; - - private List partitions; - - public DefaultQueueMetaData(String queueName, List partitions) { - this.queueName = queueName; - this.partitions = partitions; - } - - @Override - public String queueName() { - return queueName; - } - - @Override - public List partitions() { - return partitions; - } - - public static class DefaultPartition implements Partition { - - public DefaultPartition(int partitionId, String partitonHost) { - this.partitionId = partitionId; - this.partitonHost = partitonHost; - } - - private int partitionId; - - private String partitonHost; - - @Override - public int partitionId() { - return partitionId; - } - - @Override - public String partitonHost() { - return partitonHost; - } - } -} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java index c0f498b2..1838983e 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -21,9 +21,9 @@ import io.openmessaging.ServiceLifeState; import io.openmessaging.ServiceLifecycle; import io.openmessaging.extension.QueueMetaData; import io.openmessaging.rocketmq.config.ClientConfig; -import io.openmessaging.rocketmq.config.DefaultQueueMetaData; import io.openmessaging.rocketmq.domain.ConsumeRequest; import io.openmessaging.rocketmq.domain.NonStandardKeys; +import io.openmessaging.rocketmq.utils.OMSClientUtil; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -36,6 +36,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReadWriteLock; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; @@ -67,7 +68,7 @@ class LocalMessageCache implements ServiceLifecycle { this.rocketmqPullConsumer = rocketmqPullConsumer; this.clientConfig = clientConfig; this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( - "OMS_CleanExpireMsgScheduledThread_")); + "OMS_CleanExpireMsgScheduledThread_")); this.currentState = ServiceLifeState.INITIALIZED; } @@ -79,7 +80,7 @@ class LocalMessageCache implements ServiceLifecycle { if (!pullOffsetTable.containsKey(remoteQueue)) { try { pullOffsetTable.putIfAbsent(remoteQueue, - rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false)); + rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false)); } catch (MQClientException e) { log.error("A error occurred in fetch consume offset process.", e); } @@ -125,19 +126,28 @@ class LocalMessageCache implements ServiceLifecycle { return null; } - List batchPoll(final KeyValue properties) { - List consumeRequests = new ArrayList<>(16); - int n = consumeRequestCache.drainTo(consumeRequests); - if (n > 0) { - List messageExts = new ArrayList<>(n); - for (ConsumeRequest consumeRequest : consumeRequests) { - MessageExt messageExt = consumeRequest.getMessageExt(); - consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); - MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis())); - consumedRequest.put(messageExt.getMsgId(), consumeRequest); - messageExts.add(messageExt); + List batchPoll(KeyValue properties) { + List consumeRequests = new ArrayList<>(clientConfig.getRmqPullMessageBatchNums()); + long timeout = properties.getLong(NonStandardKeys.TIMEOUT); + while (timeout >= 0) { + int n = consumeRequestCache.drainTo(consumeRequests, clientConfig.getRmqPullMessageBatchNums()); + if (n > 0) { + List messageExts = new ArrayList<>(n); + for (ConsumeRequest consumeRequest : consumeRequests) { + MessageExt messageExt = consumeRequest.getMessageExt(); + consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); + MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis())); + consumedRequest.put(messageExt.getMsgId(), consumeRequest); + messageExts.add(messageExt); + } + return messageExts; + } + if (timeout > 0) { + LockSupport.parkNanos(timeout * 1000 * 1000); + timeout = 0; + } else { + timeout = -1; } - return messageExts; } return null; } @@ -166,7 +176,7 @@ class LocalMessageCache implements ServiceLifecycle { private void cleanExpireMsg() { for (final Map.Entry next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl() - .getRebalanceImpl().getProcessQueueTable().entrySet()) { + .getRebalanceImpl().getProcessQueueTable().entrySet()) { ProcessQueue pq = next.getValue(); MessageQueue mq = next.getKey(); ReadWriteLock lockTreeMap = getLockInProcessQueue(pq); @@ -186,7 +196,7 @@ class LocalMessageCache implements ServiceLifecycle { if (!msgTreeMap.isEmpty()) { msg = msgTreeMap.firstEntry().getValue(); if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) - > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { + > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { //Expired, ack and remove it. } else { break; @@ -204,7 +214,7 @@ class LocalMessageCache implements ServiceLifecycle { try { rocketmqPullConsumer.sendMessageBack(msg, 3); log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", - msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); + msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); ack(mq, pq, msg); } catch (Exception e) { log.error("Send back expired msg exception", e); @@ -237,7 +247,7 @@ class LocalMessageCache implements ServiceLifecycle { public void stop() { this.currentState = ServiceLifeState.STOPPING; ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS); - this.currentState = ServiceLifeState.STARTED; + this.currentState = ServiceLifeState.STOPPED; } @Override @@ -246,7 +256,7 @@ class LocalMessageCache implements ServiceLifecycle { } @Override - public QueueMetaData getQueueMetaData(String queueName) { + public Set getQueueMetaData(String queueName) { Set messageQueues; try { messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName); @@ -254,16 +264,6 @@ class LocalMessageCache implements ServiceLifecycle { log.error("A error occurred when get queue metadata.", e); return null; } - List partitions = new ArrayList<>(16); - if (null != messageQueues && !messageQueues.isEmpty()) { - for (MessageQueue messageQueue : messageQueues) { - QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName()); - partitions.add(partition); - } - } else { - return null; - } - QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions); - return queueMetaData; + return OMSClientUtil.queueMetaDataConvert(messageQueues); } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 03ff901a..f4efa92a 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -18,10 +18,8 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.ServiceLifeState; -import io.openmessaging.consumer.BatchMessageListener; -import io.openmessaging.consumer.Consumer; -import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.MessageReceipt; +import io.openmessaging.consumer.PullConsumer; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.extension.Extension; import io.openmessaging.extension.QueueMetaData; @@ -31,10 +29,13 @@ import io.openmessaging.message.Message; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.ConsumeRequest; +import io.openmessaging.rocketmq.domain.DefaultMessageReceipt; +import io.openmessaging.rocketmq.domain.MessageExtension; import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils; -import io.openmessaging.rocketmq.utils.OMSUtil; +import io.openmessaging.rocketmq.utils.OMSClientUtil; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.Set; @@ -55,10 +56,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.protocol.LanguageCode; -public class PullConsumerImpl implements Consumer { - - private static final int PULL_MAX_NUMS = 32; - private static final int PULL_MIN_NUMS = 1; +public class PullConsumerImpl implements PullConsumer { private final DefaultMQPullConsumer rocketmqPullConsumer; private final KeyValue properties; @@ -67,7 +65,8 @@ public class PullConsumerImpl implements Consumer { private final LocalMessageCache localMessageCache; private final ClientConfig clientConfig; private ServiceLifeState currentState; - private List consumerInterceptors; + private final List consumerInterceptors; + private final Extension extension; private final static InternalLogger log = ClientLogger.getLog(); @@ -96,15 +95,15 @@ public class PullConsumerImpl implements Consumer { int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes(); this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); - String consumerId = OMSUtil.buildInstanceName(); + String consumerId = OMSClientUtil.buildInstanceName(); this.rocketmqPullConsumer.setInstanceName(consumerId); properties.put(NonStandardKeys.CONSUMER_ID, consumerId); this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS); this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); - consumerInterceptors = new ArrayList<>(16); + this.extension = new MessageExtension(this); } private void registerPullTaskCallback(final String targetQueueName) { @@ -139,96 +138,36 @@ public class PullConsumerImpl implements Consumer { }); } - @Override - public void resume() { - currentState = ServiceLifeState.STARTED; - } - - @Override - public void suspend() { - currentState = ServiceLifeState.STOPPED; + @Override public Set getBindQueues() { + return rocketmqPullConsumer.getRegisterTopics(); } @Override - public void suspend(long timeout) { - throw new UnsupportedOperationException(); - } - - @Override - public boolean isSuspended() { - if (ServiceLifeState.STOPPED.equals(currentState)) { - return true; - } - return false; + public void addInterceptor(ConsumerInterceptor interceptor) { + consumerInterceptors.add(interceptor); } @Override - public void bindQueue(String queueName) { - registerPullTaskCallback(queueName); + public void removeInterceptor(ConsumerInterceptor interceptor) { + consumerInterceptors.remove(interceptor); } - @Override - public void bindQueue(List queueNames) { + @Override public void bindQueue(Collection queueNames) { for (String queueName : queueNames) { - bindQueue(queueName); + registerPullTaskCallback(queueName); } } - @Override - public void bindQueue(String queueName, MessageListener listener) { - throw new UnsupportedOperationException(); - } - - @Override - public void bindQueues(List queueNames, MessageListener listener) { - throw new UnsupportedOperationException(); - } - - @Override - public void bindQueue(String queueName, BatchMessageListener listener) { - throw new UnsupportedOperationException(); - } - - @Override - public void bindQueues(List queueNames, BatchMessageListener listener) { - throw new UnsupportedOperationException(); - } - - @Override - public void unbindQueue(String queueName) { - this.rocketmqPullConsumer.getRegisterTopics().remove(queueName); - } - - @Override - public void unbindQueues(List queueNames) { + @Override public void unbindQueue(Collection queueNames) { for (String queueName : queueNames) { this.rocketmqPullConsumer.getRegisterTopics().remove(queueName); } } - @Override - public boolean isBindQueue() { - Set registerTopics = rocketmqPullConsumer.getRegisterTopics(); - if (null == registerTopics || registerTopics.isEmpty()) { - return false; - } - return true; - } - - @Override - public List getBindQueues() { - Set registerTopics = rocketmqPullConsumer.getRegisterTopics(); - return new ArrayList<>(registerTopics); - } - - @Override - public void addInterceptor(ConsumerInterceptor interceptor) { - consumerInterceptors.add(interceptor); - } - - @Override - public void removeInterceptor(ConsumerInterceptor interceptor) { - consumerInterceptors.remove(interceptor); + @Override public Message receive() { + KeyValue properties = new DefaultKeyValue(); + MessageExt rmqMsg = localMessageCache.poll(properties); + return rmqMsg == null ? null : OMSClientUtil.msgConvert(rmqMsg); } @Override @@ -236,23 +175,24 @@ public class PullConsumerImpl implements Consumer { KeyValue properties = new DefaultKeyValue(); properties.put(NonStandardKeys.TIMEOUT, timeout); MessageExt rmqMsg = localMessageCache.poll(properties); - return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); + return rmqMsg == null ? null : OMSClientUtil.msgConvert(rmqMsg); } @Override - public Message receive(String queueName, int partitionId, long receiptId, long timeout) { - MessageQueue mq = null; - mq = getQueue(queueName, partitionId, mq); - PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MIN_NUMS); - if (pullResult == null) + public Message receive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt, long timeout) { + MessageQueue mq; + mq = getQueue(queueMetaData); + PullResult pullResult = getResult(((DefaultMessageReceipt) messageReceipt).getOffset(), timeout, mq, NonStandardKeys.PULL_MIN_NUMS); + if (pullResult == null) { return null; + } PullStatus pullStatus = pullResult.getPullStatus(); List messages = new ArrayList<>(16); if (PullStatus.FOUND.equals(pullStatus)) { List rmqMsgs = pullResult.getMsgFoundList(); if (null != rmqMsgs && !rmqMsgs.isEmpty()) { for (MessageExt messageExt : rmqMsgs) { - BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt); + BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt); messages.add(bytesMessage); } return messages.get(0); @@ -261,10 +201,10 @@ public class PullConsumerImpl implements Consumer { return null; } - private PullResult getResult(long receiptId, long timeout, MessageQueue mq, int nums) { + private PullResult getResult(long offset, long timeout, MessageQueue mq, int maxNums) { PullResult pullResult; try { - pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, nums, timeout); + pullResult = rocketmqPullConsumer.pull(mq, "*", offset, maxNums, timeout); } catch (MQClientException e) { log.error("A error occurred when pull message.", e); return null; @@ -278,17 +218,15 @@ public class PullConsumerImpl implements Consumer { log.error("A error occurred when pull message.", e); return null; } - if (null == pullResult) { - return null; - } return pullResult; } - private MessageQueue getQueue(String queueName, int partitionId, MessageQueue mq) { + private MessageQueue getQueue(QueueMetaData queueMetaData) { + MessageQueue mq = null; try { - Set messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName); + Set messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueMetaData.queueName()); for (MessageQueue messageQueue : messageQueues) { - if (messageQueue.getQueueId() == partitionId) { + if (messageQueue.getQueueId() == queueMetaData.partitionId()) { mq = messageQueue; } } @@ -306,7 +244,7 @@ public class PullConsumerImpl implements Consumer { if (null != rmqMsgs && !rmqMsgs.isEmpty()) { List messages = new ArrayList<>(rmqMsgs.size()); for (MessageExt messageExt : rmqMsgs) { - BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt); + BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt); messages.add(bytesMessage); } return messages; @@ -315,19 +253,21 @@ public class PullConsumerImpl implements Consumer { } @Override - public List batchReceive(String queueName, int partitionId, long receiptId, long timeout) { - MessageQueue mq = null; - mq = getQueue(queueName, partitionId, mq); - PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MAX_NUMS); - if (pullResult == null) + public List batchReceive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt, + long timeout) { + MessageQueue mq; + mq = getQueue(queueMetaData); + PullResult pullResult = getResult(((DefaultMessageReceipt) messageReceipt).getOffset(), timeout, mq, clientConfig.getRmqPullMessageBatchNums()); + if (pullResult == null) { return null; + } PullStatus pullStatus = pullResult.getPullStatus(); List messages = new ArrayList<>(16); if (PullStatus.FOUND.equals(pullStatus)) { List rmqMsgs = pullResult.getMsgFoundList(); if (null != rmqMsgs && !rmqMsgs.isEmpty()) { for (MessageExt messageExt : rmqMsgs) { - BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt); + BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt); messages.add(bytesMessage); } return messages; @@ -338,18 +278,12 @@ public class PullConsumerImpl implements Consumer { @Override public void ack(MessageReceipt receipt) { - + localMessageCache.ack(((DefaultMessageReceipt) receipt).getMessageId()); } @Override public Optional getExtension() { - - return Optional.of(new Extension() { - @Override - public QueueMetaData getQueueMetaData(String queueName) { - return getQueueMetaData(queueName); - } - }); + return Optional.of(extension); } @Override @@ -381,7 +315,7 @@ public class PullConsumerImpl implements Consumer { } @Override - public QueueMetaData getQueueMetaData(String queueName) { + public Set getQueueMetaData(String queueName) { return localMessageCache.getQueueMetaData(queueName); } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index 8d55b577..4576b282 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -20,27 +20,30 @@ import io.openmessaging.KeyValue; import io.openmessaging.OMS; import io.openmessaging.ServiceLifeState; import io.openmessaging.consumer.BatchMessageListener; -import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.MessageReceipt; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.extension.Extension; import io.openmessaging.extension.QueueMetaData; import io.openmessaging.interceptor.ConsumerInterceptor; import io.openmessaging.message.Message; import io.openmessaging.rocketmq.config.ClientConfig; -import io.openmessaging.rocketmq.config.DefaultQueueMetaData; import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.MessageExtension; import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.utils.BeanUtils; -import io.openmessaging.rocketmq.utils.OMSUtil; +import io.openmessaging.rocketmq.utils.OMSClientUtil; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; @@ -48,12 +51,13 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.ThreadFactoryImpl; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.protocol.LanguageCode; -public class PushConsumerImpl implements Consumer { +public class PushConsumerImpl implements PushConsumer { private final static InternalLogger log = ClientLogger.getLog(); @@ -65,6 +69,8 @@ public class PushConsumerImpl implements Consumer { private final ClientConfig clientConfig; private ServiceLifeState currentState; private List consumerInterceptors; + private ScheduledExecutorService scheduledExecutorService; + private final Extension extension; public PushConsumerImpl(final KeyValue properties) { this.rocketmqPushConsumer = new DefaultMQPushConsumer(); @@ -89,7 +95,7 @@ public class PushConsumerImpl implements Consumer { this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums()); this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums()); - String consumerId = OMSUtil.buildInstanceName(); + String consumerId = OMSClientUtil.buildInstanceName(); this.rocketmqPushConsumer.setInstanceName(consumerId); properties.put(NonStandardKeys.CONSUMER_ID, consumerId); this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS); @@ -97,6 +103,9 @@ public class PushConsumerImpl implements Consumer { this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); consumerInterceptors = new ArrayList<>(16); + scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl( + "OMS_SuspendTimeouThread_")); + extension = new MessageExtension(this); currentState = ServiceLifeState.INITIALIZED; } @@ -112,7 +121,12 @@ public class PushConsumerImpl implements Consumer { @Override public void suspend(long timeout) { - throw new UnsupportedOperationException(); + this.rocketmqPushConsumer.suspend(); + scheduledExecutorService.schedule(new Runnable() { + @Override public void run() { + PushConsumerImpl.this.rocketmqPushConsumer.resume(); + } + }, timeout, TimeUnit.MILLISECONDS); } @Override @@ -120,90 +134,49 @@ public class PushConsumerImpl implements Consumer { return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause(); } - @Override - public void bindQueue(String queueName) { - try { - rocketmqPushConsumer.subscribe(queueName, "*"); - } catch (MQClientException e) { - throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName)); - } - } - - @Override - public void bindQueue(List queueNames) { + @Override public void bindQueue(Collection queueNames, MessageListener listener) { for (String queueName : queueNames) { - bindQueue(queueName); - } - } - - @Override - public void bindQueue(String queueName, MessageListener listener) { - this.subscribeTable.put(queueName, listener); - this.batchSubscribeTable.remove(queueName); - try { - this.rocketmqPushConsumer.subscribe(queueName, "*"); - } catch (MQClientException e) { - throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName)); - } - } - - @Override - public void bindQueues(List queueNames, MessageListener listener) { - for (String queueName : queueNames) { - bindQueue(queueName, listener); - } - } - - @Override - public void bindQueue(String queueName, BatchMessageListener listener) { - this.batchSubscribeTable.put(queueName, listener); - this.subscribeTable.remove(queueName); - try { - this.rocketmqPushConsumer.subscribe(queueName, "*"); - } catch (MQClientException e) { - throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName)); + this.subscribeTable.put(queueName, listener); + this.batchSubscribeTable.remove(queueName); + this.rocketmqPushConsumer.setConsumeMessageBatchMaxSize(NonStandardKeys.PULL_MIN_NUMS); + try { + this.rocketmqPushConsumer.subscribe(queueName, "*"); + } catch (MQClientException e) { + throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName)); + } } } - @Override - public void bindQueues(List queueNames, BatchMessageListener listener) { + @Override public void bindQueue(Collection queueNames, BatchMessageListener listener) { for (String queueName : queueNames) { - bindQueue(queueName, listener); - } - } - - @Override - public void unbindQueue(String queueName) { - this.subscribeTable.remove(queueName); - this.batchSubscribeTable.remove(queueName); - try { - this.rocketmqPushConsumer.unsubscribe(queueName); - } catch (Exception e) { - throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName)); + this.batchSubscribeTable.put(queueName, listener); + this.subscribeTable.remove(queueName); + this.rocketmqPushConsumer.setConsumeMessageBatchMaxSize(clientConfig.getRmqPullMessageBatchNums()); + try { + this.rocketmqPushConsumer.subscribe(queueName, "*"); + } catch (MQClientException e) { + throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName)); + } } } - @Override - public void unbindQueues(List queueNames) { + @Override public void unbindQueue(Collection queueNames) { for (String queueName : queueNames) { - unbindQueue(queueName); - } - } - - @Override - public boolean isBindQueue() { - Map subscription = rocketmqPushConsumer.getSubscription(); - if (null != subscription && subscription.size() > 0) { - return true; + this.subscribeTable.remove(queueName); + this.batchSubscribeTable.remove(queueName); + try { + this.rocketmqPushConsumer.unsubscribe(queueName); + } catch (Exception e) { + throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName)); + } } - return false; } @Override - public List getBindQueues() { + public Set getBindQueues() { Map subscription = rocketmqPushConsumer.getSubscription(); if (null != subscription && subscription.size() > 0) { - return new ArrayList<>(subscription.keySet()); + return subscription.keySet(); } return null; } @@ -218,26 +191,6 @@ public class PushConsumerImpl implements Consumer { consumerInterceptors.remove(interceptor); } - @Override - public Message receive(long timeout) { - throw new UnsupportedOperationException(); - } - - @Override - public Message receive(String queueName, int partitionId, long receiptId, long timeout) { - throw new UnsupportedOperationException(); - } - - @Override - public List batchReceive(long timeout) { - throw new UnsupportedOperationException(); - } - - @Override - public List batchReceive(String queueName, int partitionId, long receiptId, long timeout) { - throw new UnsupportedOperationException(); - } - @Override public void ack(MessageReceipt receipt) { throw new UnsupportedOperationException(); @@ -245,13 +198,7 @@ public class PushConsumerImpl implements Consumer { @Override public Optional getExtension() { - return Optional.of(new Extension() { - - @Override - public QueueMetaData getQueueMetaData(String queueName) { - return getQueueMetaData(queueName); - } - }); + return Optional.of(extension); } @Override @@ -284,7 +231,7 @@ public class PushConsumerImpl implements Consumer { } @Override - public QueueMetaData getQueueMetaData(String queueName) { + public Set getQueueMetaData(String queueName) { Set messageQueues; try { messageQueues = rocketmqPushConsumer.fetchSubscribeMessageQueues(queueName); @@ -292,24 +239,14 @@ public class PushConsumerImpl implements Consumer { log.error("A error occurred when get queue metadata.", e); return null; } - List partitions = new ArrayList<>(16); - if (null != messageQueues && !messageQueues.isEmpty()) { - for (MessageQueue messageQueue : messageQueues) { - QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName()); - partitions.add(partition); - } - } else { - return null; - } - QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions); - return queueMetaData; + return OMSClientUtil.queueMetaDataConvert(messageQueues); } class MessageListenerImpl implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List rmqMsgList, - ConsumeConcurrentlyContext contextRMQ) { + ConsumeConcurrentlyContext contextRMQ) { boolean batchFlag = true; MessageExt rmqMsg = rmqMsgList.get(0); BatchMessageListener batchMessageListener = PushConsumerImpl.this.batchSubscribeTable.get(rmqMsg.getTopic()); @@ -319,14 +256,14 @@ public class PushConsumerImpl implements Consumer { } if (listener == null && batchMessageListener == null) { throw new OMSRuntimeException(-1, - String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic())); + String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic())); } final KeyValue contextProperties = OMS.newKeyValue(); if (batchFlag) { - List messages = new ArrayList<>(16); + List messages = new ArrayList<>(32); for (MessageExt messageExt : rmqMsgList) { - BytesMessageImpl omsMsg = OMSUtil.msgConvert(messageExt); + BytesMessageImpl omsMsg = OMSClientUtil.msgConvert(messageExt); messages.add(omsMsg); } final CountDownLatch sync = new CountDownLatch(1); @@ -344,7 +281,7 @@ public class PushConsumerImpl implements Consumer { public void ack() { sync.countDown(); contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); + ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); } }; long begin = System.currentTimeMillis(); @@ -356,7 +293,7 @@ public class PushConsumerImpl implements Consumer { } catch (InterruptedException ignore) { } } else { - BytesMessageImpl omsMsg = OMSUtil.msgConvert(rmqMsg); + BytesMessageImpl omsMsg = OMSClientUtil.msgConvert(rmqMsg); final CountDownLatch sync = new CountDownLatch(1); @@ -368,7 +305,7 @@ public class PushConsumerImpl implements Consumer { public void ack() { sync.countDown(); contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, - ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); + ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); } }; long begin = System.currentTimeMillis(); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java index f1405b25..b5da5ce5 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/BytesMessageImpl.java @@ -17,22 +17,26 @@ package io.openmessaging.rocketmq.domain; import io.openmessaging.KeyValue; +import io.openmessaging.OMS; import io.openmessaging.consumer.MessageReceipt; import io.openmessaging.extension.ExtensionHeader; import io.openmessaging.message.Header; import io.openmessaging.message.Message; -import io.openmessaging.OMS; -import java.util.Optional; +import java.util.Arrays; public class BytesMessageImpl implements Message { private Header sysHeaders; + private ExtensionHeader extensionHeader; + private MessageReceipt messageReceipt; private KeyValue userProperties; private byte[] data; public BytesMessageImpl() { this.sysHeaders = new MessageHeader(); this.userProperties = OMS.newKeyValue(); + this.extensionHeader = new MessageExtensionHeader(); + this.messageReceipt = new DefaultMessageReceipt(); } @Override @@ -41,8 +45,8 @@ public class BytesMessageImpl implements Message { } @Override - public Optional extensionHeader() { - return null; + public ExtensionHeader extensionHeader() { + return extensionHeader; } @Override @@ -62,6 +66,16 @@ public class BytesMessageImpl implements Message { @Override public MessageReceipt getMessageReceipt() { - return null; + return messageReceipt; + } + + @Override public String toString() { + return "BytesMessageImpl{" + + "sysHeaders=" + sysHeaders + + ", extensionHeader=" + extensionHeader + + ", messageReceipt=" + messageReceipt + + ", userProperties=" + userProperties + + ", data=" + Arrays.toString(data) + + '}'; } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java new file mode 100644 index 00000000..b3baeb2a --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.message.Message; +import io.openmessaging.message.MessageFactory; + +public class DefaultMessageFactory implements MessageFactory { + @Override public Message createMessage(String queueName, byte[] body) { + Message message = new BytesMessageImpl(); + message.setData(body); + message.header().setDestination(queueName); + return message; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java new file mode 100644 index 00000000..93390068 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultMessageReceipt.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.consumer.MessageReceipt; +import java.util.Objects; + +public class DefaultMessageReceipt implements MessageReceipt { + + private long offset; + + private String messageId; + + public DefaultMessageReceipt() { + + } + + public DefaultMessageReceipt(String messageId, long offset) { + this.messageId = messageId; + this.offset = offset; + } + + public void setOffset(long offset) { + this.offset = offset; + } + + public long getOffset() { + return offset; + } + + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + public String getMessageId() { + return messageId; + } + + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + DefaultMessageReceipt receipt = (DefaultMessageReceipt) o; + return offset == receipt.offset && + Objects.equals(messageId, receipt.messageId); + } + + @Override public int hashCode() { + return Objects.hash(offset, messageId); + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java new file mode 100644 index 00000000..2958f963 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/DefaultQueueMetaData.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.extension.QueueMetaData; +import java.util.Objects; + +public class DefaultQueueMetaData implements QueueMetaData { + + private String queueName; + + private int partitionId; + + public DefaultQueueMetaData(String queueName, int partitionId) { + this.queueName = queueName; + this.partitionId = partitionId; + } + + @Override public void setQueueName(String queueNaome) { + this.queueName = queueName; + } + + @Override public void setPartitionId(int partitionId) { + this.partitionId = partitionId; + } + + @Override public int partitionId() { + return partitionId; + } + + @Override + public String queueName() { + return queueName; + } + + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + DefaultQueueMetaData data = (DefaultQueueMetaData) o; + return partitionId == data.partitionId && + queueName.equals(data.queueName); + } + + @Override public int hashCode() { + return Objects.hash(queueName, partitionId); + } + + @Override public String toString() { + return "DefaultQueueMetaData{" + + "queueName='" + queueName + '\'' + + ", partitionId=" + partitionId + + '}'; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java new file mode 100644 index 00000000..74498273 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtension.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.extension.Extension; +import io.openmessaging.extension.QueueMetaData; +import java.util.Set; + +public class MessageExtension implements Extension { + + private Extension extension; + + public MessageExtension(Extension extension) { + this.extension = extension; + } + + @Override public Set getQueueMetaData(String queueName) { + return extension.getQueueMetaData(queueName); + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java new file mode 100644 index 00000000..3f103a4e --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageExtensionHeader.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import io.openmessaging.extension.ExtensionHeader; + +public class MessageExtensionHeader implements ExtensionHeader { + private int partition; + + private long offset; + + private String correlationId; + + private String transactionId; + + private long storeTimestamp; + + private String storeHost; + + private String messageKey; + + private String traceId; + + private long delayTime; + + private long expireTime; + + @Override public ExtensionHeader setPartition(int partition) { + this.partition = partition; + return this; + } + + @Override public ExtensionHeader setOffset(long offset) { + this.offset = offset; + return this; + } + + @Override public ExtensionHeader setCorrelationId(String correlationId) { + this.correlationId = correlationId; + return this; + } + + @Override public ExtensionHeader setTransactionId(String transactionId) { + this.transactionId = transactionId; + return this; + } + + @Override public ExtensionHeader setStoreTimestamp(long storeTimestamp) { + this.storeTimestamp = storeTimestamp; + return this; + } + + @Override public ExtensionHeader setStoreHost(String storeHost) { + this.storeHost = storeHost; + return this; + } + + @Override public ExtensionHeader setMessageKey(String messageKey) { + this.messageKey = messageKey; + return this; + } + + @Override public ExtensionHeader setTraceId(String traceId) { + this.traceId = traceId; + return this; + } + + @Override public ExtensionHeader setDelayTime(long delayTime) { + this.delayTime = delayTime; + return this; + } + + @Override public ExtensionHeader setExpireTime(long expireTime) { + this.expireTime = expireTime; + return this; + } + + @Override public int getPartiton() { + return partition; + } + + @Override public long getOffset() { + return offset; + } + + @Override public String getCorrelationId() { + return correlationId; + } + + @Override public String getTransactionId() { + return transactionId; + } + + @Override public long getStoreTimestamp() { + return storeTimestamp; + } + + @Override public String getStoreHost() { + return storeHost; + } + + @Override public long getDelayTime() { + return delayTime; + } + + @Override public long getExpireTime() { + return expireTime; + } + + @Override public String getMessageKey() { + return messageKey; + } + + @Override public String getTraceId() { + return traceId; + } + + @Override public String toString() { + return "MessageExtensionHeader{" + + "partition=" + partition + + ", offset=" + offset + + ", correlationId='" + correlationId + '\'' + + ", transactionId='" + transactionId + '\'' + + ", storeTimestamp=" + storeTimestamp + + ", storeHost='" + storeHost + '\'' + + ", messageKey='" + messageKey + '\'' + + ", traceId='" + traceId + '\'' + + ", delayTime=" + delayTime + + ", expireTime=" + expireTime + + '}'; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java index a6e7585c..495dc1ad 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/MessageHeader.java @@ -110,4 +110,17 @@ public class MessageHeader implements Header { @Override public short getCompression() { return this.compression; } + + @Override public String toString() { + return "MessageHeader{" + + "destination='" + destination + '\'' + + ", messageId='" + messageId + '\'' + + ", bornTimestamp=" + bornTimestamp + + ", bornHost='" + bornHost + '\'' + + ", priority=" + priority + + ", deliveryCount=" + deliveryCount + + ", compression=" + compression + + ", durability=" + durability + + '}'; + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java index 16ecb0dd..2d0d3e60 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java @@ -30,7 +30,5 @@ public interface NonStandardKeys { String PRODUCER_ID = "PRODUCER_ID"; String CONSUMER_ID = "CONSUMER_ID"; String TIMEOUT = "TIMEOUT"; - String PULL_CONSUMER = "PULL"; - String PUSH_CONSUMER = "PUSH"; - + int PULL_MIN_NUMS = 1; } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java index 63034e39..735bace6 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/AbstractOMSProducer.java @@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.protocol.LanguageCode; -import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName; +import static io.openmessaging.rocketmq.utils.OMSClientUtil.buildInstanceName; abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory { final static InternalLogger log = ClientLogger.getLog(); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java index d3acce22..a44a0f74 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/producer/ProducerImpl.java @@ -18,31 +18,38 @@ package io.openmessaging.rocketmq.producer; import io.openmessaging.Future; import io.openmessaging.KeyValue; +import io.openmessaging.Promise; import io.openmessaging.ServiceLifeState; import io.openmessaging.exception.OMSMessageFormatException; +import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.extension.Extension; import io.openmessaging.extension.QueueMetaData; -import io.openmessaging.message.Message; -import io.openmessaging.Promise; -import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.interceptor.ProducerInterceptor; +import io.openmessaging.message.Message; import io.openmessaging.producer.Producer; import io.openmessaging.producer.SendResult; import io.openmessaging.producer.TransactionalResult; import io.openmessaging.rocketmq.domain.BytesMessageImpl; +import io.openmessaging.rocketmq.domain.MessageExtension; import io.openmessaging.rocketmq.promise.DefaultPromise; -import io.openmessaging.rocketmq.utils.OMSUtil; +import io.openmessaging.rocketmq.utils.OMSClientUtil; import java.util.List; import java.util.Optional; +import java.util.Set; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.common.message.MessageQueue; -import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert; +import static io.openmessaging.rocketmq.utils.OMSClientUtil.msgConvert; public class ProducerImpl extends AbstractOMSProducer implements Producer { + private final Extension extension; + public ProducerImpl(final KeyValue properties) { super(properties); + extension = new MessageExtension(this); } @Override @@ -60,7 +67,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed."); } message.header().setMessageId(rmqResult.getMsgId()); - return OMSUtil.sendResultConvert(rmqResult); + return OMSClientUtil.sendResultConvert(rmqResult); } catch (Exception e) { log.error(String.format("Send message to RocketMQ failed, %s", message), e); throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e); @@ -81,7 +88,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { @Override public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) { message.header().setMessageId(rmqResult.getMsgId()); - promise.set(OMSUtil.sendResultConvert(rmqResult)); + promise.set(OMSClientUtil.sendResultConvert(rmqResult)); } @Override @@ -112,7 +119,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { } for (Message message : messages) { - sendOneway(messages); + sendOneway(message); } } @@ -128,7 +135,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { } for (Message message : messages) { - sendOneway(messages); + sendOneway(message); } } @@ -152,12 +159,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer { @Override public Optional getExtension() { - return null; + return Optional.of(extension); } @Override - public QueueMetaData getQueueMetaData(String queueName) { - return null; + public Set getQueueMetaData(String queueName) { + List messageQueues; + try { + messageQueues = this.rocketmqProducer.fetchPublishMessageQueues(queueName); + } catch (MQClientException e) { + log.error("A error occurred when get queue metadata.", e); + return null; + } + return OMSClientUtil.queueMetaDataConvert(messageQueues); } @Override diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java similarity index 77% rename from openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java rename to openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java index 5b095eee..ab3ff34b 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/utils/OMSClientUtil.java @@ -18,19 +18,24 @@ package io.openmessaging.rocketmq.utils; import io.openmessaging.KeyValue; import io.openmessaging.OMS; +import io.openmessaging.extension.QueueMetaData; import io.openmessaging.message.Header; import io.openmessaging.producer.SendResult; +import io.openmessaging.rocketmq.domain.DefaultQueueMetaData; import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.RocketMQConstants; import io.openmessaging.rocketmq.domain.SendResultImpl; import java.lang.reflect.Field; +import java.util.Collection; +import java.util.HashSet; import java.util.Map; import java.util.Set; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageAccessor; +import org.apache.rocketmq.common.message.MessageQueue; -public class OMSUtil { +public class OMSClientUtil { /** * Builds a OMS client instance name. @@ -56,7 +61,6 @@ public class OMSUtil { rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime)); } - for (String key : userHeaders.keySet()) { MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key)); } @@ -82,6 +86,13 @@ public class OMSUtil { omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost())); omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp()); omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel()); + omsMsg.extensionHeader().setPartition(rmqMsg.getQueueId()); + omsMsg.extensionHeader().setOffset(rmqMsg.getQueueOffset()); + omsMsg.extensionHeader().setDelayTime(rmqMsg.getDelayTimeLevel()); + omsMsg.extensionHeader().setMessageKey(rmqMsg.getKeys()); + omsMsg.extensionHeader().setStoreHost(rmqMsg.getStoreHost().toString()); + omsMsg.extensionHeader().setStoreTimestamp(rmqMsg.getStoreTimestamp()); + omsMsg.extensionHeader().setTransactionId(rmqMsg.getTransactionId()); return omsMsg; } @@ -116,4 +127,17 @@ public class OMSUtil { } return keyValue; } + + public static Set queueMetaDataConvert(Collection messageQueues) { + Set queueMetaDatas = new HashSet<>(32); + if (null != messageQueues && !messageQueues.isEmpty()) { + for (MessageQueue messageQueue : messageQueues) { + QueueMetaData queueMetaData = new DefaultQueueMetaData(messageQueue.getTopic(), messageQueue.getQueueId()); + queueMetaDatas.add(queueMetaData); + } + } else { + return null; + } + return queueMetaDatas; + } } diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java index 851c283d..2a270763 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/LocalMessageCacheTest.java @@ -16,10 +16,17 @@ */ package io.openmessaging.rocketmq.consumer; +import io.openmessaging.KeyValue; +import io.openmessaging.extension.QueueMetaData; +import io.openmessaging.internal.DefaultKeyValue; import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.domain.ConsumeRequest; import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; import org.junit.Before; @@ -40,6 +47,8 @@ public class LocalMessageCacheTest { private DefaultMQPullConsumer rocketmqPullConsume; @Mock private ConsumeRequest consumeRequest; + @Mock + private ConsumeRequest consumeRequest1; @Before public void init() { @@ -86,4 +95,85 @@ public class LocalMessageCacheTest { localMessageCache.submitConsumeRequest(consumeRequest); assertThat(localMessageCache.poll()).isEqualTo(consumedMsg); } + + @Test + public void testBatchPollMessage() throws Exception { + byte[] body = new byte[] {'1', '2', '3'}; + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(body); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic("HELLO_QUEUE"); + + byte[] body1 = new byte[] {'4', '5', '6'}; + MessageExt consumedMsg1 = new MessageExt(); + consumedMsg1.setMsgId("NewMsgId1"); + consumedMsg1.setBody(body1); + consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg1.setTopic("HELLO_QUEUE1"); + + when(consumeRequest.getMessageExt()).thenReturn(consumedMsg); + when(consumeRequest1.getMessageExt()).thenReturn(consumedMsg1); + localMessageCache.submitConsumeRequest(consumeRequest); + localMessageCache.submitConsumeRequest(consumeRequest1); + KeyValue properties = new DefaultKeyValue(); + properties.put(NonStandardKeys.TIMEOUT, 3000); + List messageExts = localMessageCache.batchPoll(properties); + assertThat(messageExts.size()).isEqualTo(2); + MessageExt messageExt1 = null; + MessageExt messageExt2 = null; + for (MessageExt messageExt : messageExts) { + if (messageExt.getMsgId().equals("NewMsgId")) { + messageExt1 = messageExt; + } + if (messageExt.getMsgId().equals("NewMsgId1")) { + messageExt2 = messageExt; + } + } + assertThat(messageExt1).isNotNull(); + assertThat(messageExt1.getBody()).isEqualTo(body); + assertThat(messageExt1.getTopic()).isEqualTo("HELLO_QUEUE"); + assertThat(messageExt2).isNotNull(); + assertThat(messageExt2.getBody()).isEqualTo(body1); + assertThat(messageExt2.getTopic()).isEqualTo("HELLO_QUEUE1"); + + } + + @Test + public void getQueueMetaData() throws MQClientException { + MessageQueue messageQueue1 = new MessageQueue("topic1", "brockerName1", 0); + MessageQueue messageQueue2 = new MessageQueue("topic1", "brockerName2", 1); + MessageQueue messageQueue3 = new MessageQueue("topic1", "brockerName3", 2); + Set messageQueues = new HashSet() { + { + add(messageQueue1); + add(messageQueue2); + add(messageQueue3); + } + }; + + when(rocketmqPullConsume.fetchSubscribeMessageQueues("topic1")).thenReturn(messageQueues); + Set queueMetaDatas = localMessageCache.getQueueMetaData("topic1"); + assertThat(queueMetaDatas.size()).isEqualTo(3); + QueueMetaData queueMetaData1 = null; + QueueMetaData queueMetaData2 = null; + QueueMetaData queueMetaData3 = null; + for (QueueMetaData queueMetaData : queueMetaDatas) { + if (queueMetaData.partitionId() == 0) { + queueMetaData1 = queueMetaData; + } + if (queueMetaData.partitionId() == 1) { + queueMetaData2 = queueMetaData; + } + if (queueMetaData.partitionId() == 2) { + queueMetaData3 = queueMetaData; + } + } + assertThat(queueMetaData1).isNotNull(); + assertThat(queueMetaData1.queueName()).isEqualTo("topic1"); + assertThat(queueMetaData2).isNotNull(); + assertThat(queueMetaData2.queueName()).isEqualTo("topic1"); + assertThat(queueMetaData3).isNotNull(); + assertThat(queueMetaData3.queueName()).isEqualTo("topic1"); + } } \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java index 7cb50309..1f61b340 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PullConsumerImplTest.java @@ -19,14 +19,28 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.OMS; -import io.openmessaging.consumer.Consumer; -import io.openmessaging.manager.ResourceManager; +import io.openmessaging.consumer.MessageReceipt; +import io.openmessaging.consumer.PullConsumer; +import io.openmessaging.extension.QueueMetaData; import io.openmessaging.message.Message; import io.openmessaging.rocketmq.config.ClientConfig; +import io.openmessaging.rocketmq.domain.DefaultMessageReceipt; +import io.openmessaging.rocketmq.domain.DefaultQueueMetaData; import io.openmessaging.rocketmq.domain.NonStandardKeys; import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -35,12 +49,15 @@ import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; @RunWith(MockitoJUnitRunner.class) public class PullConsumerImplTest { - private Consumer consumer; + private PullConsumer pullConsumer; private String queueName = "HELLO_QUEUE"; @Mock @@ -50,15 +67,17 @@ public class PullConsumerImplTest { @Before public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS - .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); - final ResourceManager resourceManager = messagingAccessPoint.resourceManager(); - resourceManager.createNamespace(NonStandardKeys.PULL_CONSUMER +"_TestGroup"); - consumer = messagingAccessPoint.createConsumer(); - consumer.bindQueue(queueName); + .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); + final KeyValue attributes = messagingAccessPoint.attributes(); + attributes.put(NonStandardKeys.CONSUMER_ID, "TestGroup"); + pullConsumer = messagingAccessPoint.createPullConsumer(); + Set queueNames = new HashSet<>(8); + queueNames.add(queueName); + pullConsumer.bindQueue(queueNames); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); field.setAccessible(true); - field.set(consumer, rocketmqPullConsumer); //Replace + field.set(pullConsumer, rocketmqPullConsumer); //Replace ClientConfig clientConfig = new ClientConfig(); clientConfig.setOperationTimeout(200); @@ -66,27 +85,133 @@ public class PullConsumerImplTest { field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); field.setAccessible(true); - field.set(consumer, localMessageCache); - consumer.start(); + field.set(pullConsumer, localMessageCache); + pullConsumer.start(); } @Test - public void testPoll() { - final byte[] testBody = new byte[]{'a', 'b'}; + public void testPoll() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + final byte[] testBody = new byte[] {'a', 'b'}; MessageExt consumedMsg = new MessageExt(); consumedMsg.setMsgId("NewMsgId"); consumedMsg.setBody(testBody); consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); consumedMsg.setTopic(queueName); + consumedMsg.setQueueId(0); + consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876)); doReturn(consumedMsg).when(localMessageCache).poll(any(KeyValue.class)); - Message message = consumer.receive(3 * 1000); + Message message = pullConsumer.receive(3 * 1000); assertThat(message.header().getMessageId()).isEqualTo("NewMsgId"); assertThat(message.getData()).isEqualTo(testBody); + + List messageExts = new ArrayList() { + { + add(consumedMsg); + } + }; + PullResult pullResult = new PullResult(PullStatus.FOUND, 11, 1, 100, messageExts); + doReturn(pullResult).when(rocketmqPullConsumer).pull(any(MessageQueue.class), anyString(), anyLong(), anyInt(), anyLong()); + MessageQueue messageQueue = new MessageQueue(queueName, "breakeName", 0); + Set messageQueues = new HashSet() { + { + add(messageQueue); + } + }; + doReturn(messageQueues).when(rocketmqPullConsumer).fetchSubscribeMessageQueues(queueName); + QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, 0); + MessageReceipt messageReceipt = new DefaultMessageReceipt("NewMsgId", 10L); + long timeout = 3000L; + Message message1 = pullConsumer.receive(queueName, queueMetaData, messageReceipt, timeout); + assertThat(message1.header().getMessageId()).isEqualTo("NewMsgId"); + assertThat(message1.getData()).isEqualTo(testBody); + assertThat(message1.header().getDestination()).isEqualTo(queueName); + assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0); + } + + @Test + public void testBatchPoll() throws InterruptedException, RemotingException, MQClientException, MQBrokerException { + final byte[] testBody = new byte[] {'a', 'b'}; + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(testBody); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic(queueName); + consumedMsg.setQueueId(0); + consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876)); + final byte[] testBody1 = new byte[] {'c', 'd'}; + MessageExt consumedMsg1 = new MessageExt(); + consumedMsg1.setMsgId("NewMsgId1"); + consumedMsg1.setBody(testBody1); + consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg1.setTopic(queueName); + consumedMsg1.setQueueId(0); + consumedMsg1.setStoreHost(new InetSocketAddress("127.0.0.1", 9876)); + List messageExts = new ArrayList() { + { + add(consumedMsg); + add(consumedMsg1); + } + }; + doReturn(messageExts).when(localMessageCache).batchPoll(any(KeyValue.class)); + List messages = pullConsumer.batchReceive(3 * 1000); + + Message message1 = null; + Message message2 = null; + assertThat(messages.size()).isEqualTo(2); + for (Message message : messages) { + if (message.header().getMessageId().equals("NewMsgId")) { + message1 = message; + } + if (message.header().getMessageId().equals("NewMsgId1")) { + message2 = message; + } + } + assertThat(message1).isNotNull(); + assertThat(message1.getData()).isEqualTo(testBody); + assertThat(message1.header().getDestination()).isEqualTo(queueName); + assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0); + assertThat(message2).isNotNull(); + assertThat(message2.getData()).isEqualTo(testBody1); + assertThat(message2.header().getDestination()).isEqualTo(queueName); + assertThat(message2.extensionHeader().getPartiton()).isEqualTo(0); + + PullResult pullResult = new PullResult(PullStatus.FOUND, 11, 1, 100, messageExts); + doReturn(pullResult).when(rocketmqPullConsumer).pull(any(MessageQueue.class), anyString(), anyLong(), anyInt(), anyLong()); + MessageQueue messageQueue = new MessageQueue(queueName, "breakeName", 0); + Set messageQueues = new HashSet() { + { + add(messageQueue); + } + }; + doReturn(messageQueues).when(rocketmqPullConsumer).fetchSubscribeMessageQueues(queueName); + QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, 0); + MessageReceipt messageReceipt = new DefaultMessageReceipt("NewMsgId", 10L); + long timeout = 3000L; + List message1s = pullConsumer.batchReceive(queueName, queueMetaData, messageReceipt, timeout); + assertThat(message1s.size()).isEqualTo(2); + Message message3 = null; + Message message4 = null; + for (Message message : message1s) { + if (message.header().getMessageId().equals("NewMsgId")) { + message3 = message; + } + if (message.header().getMessageId().equals("NewMsgId1")) { + message4 = message; + } + } + assertThat(message3).isNotNull(); + assertThat(message3.getData()).isEqualTo(testBody); + assertThat(message3.header().getDestination()).isEqualTo(queueName); + assertThat(message3.extensionHeader().getPartiton()).isEqualTo(0); + assertThat(message4).isNotNull(); + assertThat(message4.getData()).isEqualTo(testBody1); + assertThat(message4.header().getDestination()).isEqualTo(queueName); + assertThat(message4.extensionHeader().getPartiton()).isEqualTo(0); } @Test public void testPoll_WithTimeout() { - Message message = consumer.receive(3 * 1000); + Message message = pullConsumer.receive(3 * 1000); assertThat(message).isNull(); } } \ No newline at end of file diff --git a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java index 51167e81..39af3c1a 100644 --- a/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java +++ b/openmessaging/src/test/java/io/openmessaging/rocketmq/consumer/PushConsumerImplTest.java @@ -16,12 +16,21 @@ */ package io.openmessaging.rocketmq.consumer; -import io.openmessaging.*; -import io.openmessaging.consumer.Consumer; +import io.openmessaging.KeyValue; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.OMS; +import io.openmessaging.consumer.BatchMessageListener; import io.openmessaging.consumer.MessageListener; -import io.openmessaging.manager.ResourceManager; +import io.openmessaging.consumer.PushConsumer; import io.openmessaging.message.Message; import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; @@ -31,15 +40,12 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; -import java.lang.reflect.Field; -import java.util.Collections; - -import static org.mockito.Mockito.when; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class PushConsumerImplTest { - private Consumer consumer; + private PushConsumer pushConsumer; @Mock private DefaultMQPushConsumer rocketmqPushConsumer; @@ -48,17 +54,17 @@ public class PushConsumerImplTest { public void init() throws NoSuchFieldException, IllegalAccessException { final MessagingAccessPoint messagingAccessPoint = OMS .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); - final ResourceManager resourceManager = messagingAccessPoint.resourceManager(); - resourceManager.createNamespace(NonStandardKeys.PUSH_CONSUMER + "_TestGroup"); - consumer = messagingAccessPoint.createConsumer(); + final KeyValue attributes = messagingAccessPoint.attributes(); + attributes.put(NonStandardKeys.CONSUMER_ID, "TestGroup"); + pushConsumer = messagingAccessPoint.createPushConsumer(); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); field.setAccessible(true); - DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer); - field.set(consumer, rocketmqPushConsumer); //Replace + DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(pushConsumer); + field.set(pushConsumer, rocketmqPushConsumer); //Replace when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener()); - consumer.start(); + pushConsumer.start(); } @Test @@ -70,7 +76,11 @@ public class PushConsumerImplTest { consumedMsg.setBody(testBody); consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); consumedMsg.setTopic("HELLO_QUEUE"); - consumer.bindQueue("HELLO_QUEUE", new MessageListener() { + consumedMsg.setQueueId(0); + consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876)); + Set queueNames = new HashSet<>(8); + queueNames.add("HELLO_QUEUE"); + pushConsumer.bindQueue(queueNames, new MessageListener() { @Override public void onReceived(Message message, Context context) { assertThat(message.header().getMessageId()).isEqualTo("NewMsgId"); @@ -81,4 +91,65 @@ public class PushConsumerImplTest { ((MessageListenerConcurrently) rocketmqPushConsumer .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null); } + + @Test + public void testBatchConsumeMessage() { + final byte[] testBody = new byte[]{'a', 'b'}; + + MessageExt consumedMsg = new MessageExt(); + consumedMsg.setMsgId("NewMsgId"); + consumedMsg.setBody(testBody); + consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg.setTopic("HELLO_QUEUE"); + consumedMsg.setQueueId(0); + consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876)); + + final byte[] testBody1 = new byte[]{'c', 'd'}; + MessageExt consumedMsg1 = new MessageExt(); + consumedMsg1.setMsgId("NewMsgId1"); + consumedMsg1.setBody(testBody1); + consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); + consumedMsg1.setTopic("HELLO_QUEUE"); + consumedMsg1.setQueueId(0); + consumedMsg1.setStoreHost(new InetSocketAddress("127.0.0.1", 9876)); + List messageExts = new ArrayList() { + { + add(consumedMsg); + add(consumedMsg1); + } + }; + + Set queueNames = new HashSet<>(8); + queueNames.add("HELLO_QUEUE"); + pushConsumer.bindQueue(queueNames, new BatchMessageListener() { + @Override public void onReceived(List batchMessage, Context context) { + assertThat(batchMessage.size()).isEqualTo(2); + Message message1 = null; + Message message2 = null; + for (Message message : batchMessage) { + if (message.header().getMessageId().equals("NewMsgId")) { + message1 = message; + } + if (message.header().getMessageId().equals("NewMsgId1")) { + message2 = message; + } + } + assertThat(message1).isNotNull(); + assertThat(message1.getData()).isEqualTo(testBody); + assertThat(message1.header().getDestination()).isEqualTo("HELLO_QUEUE"); + assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0); + assertThat(message2).isNotNull(); + assertThat(message2.getData()).isEqualTo(testBody1); + assertThat(message2.header().getDestination()).isEqualTo("HELLO_QUEUE"); + assertThat(message2.extensionHeader().getPartiton()).isEqualTo(0); + + context.ack(); + } + }); + + ((MessageListenerConcurrently) rocketmqPushConsumer + .getMessageListener()).consumeMessage(messageExts, null); + + } + } \ No newline at end of file -- GitLab