提交 246e34eb 编写于 作者: Z zhoubo 提交者: Hu Zongtang

[ISSUE #1199] Implement the 1.0.0 openmessaging new consumer API for rocketmq oms module (#1240)

* Adapt to the new consumer api

* New consumer api implements code optimization

* Adjust consumer example

* 1、Optimize consumer code implementation
2、Fix bug

* 1、Optimize consumer code implementation
2、Fix bug

* Add new api unit test

* Rename OMSUtil to OMSClientUtil

* Unit test adds more verification
上级 82a6b1e8
......@@ -16,40 +16,44 @@
*/
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.message.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.HashSet;
import java.util.Set;
public class SimplePullConsumer {
public static void main(String[] args) {
final MessagingAccessPoint messagingAccessPoint =
OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
messagingAccessPoint.startup();
final Producer producer = messagingAccessPoint.createProducer();
final PullConsumer consumer = messagingAccessPoint.createPullConsumer(
OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
OMS.newKeyValue().put(NonStandardKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
final String queueName = "TopicTest";
producer.startup();
Message msg = producer.createBytesMessage(queueName, "Hello Open Messaging".getBytes());
final String queueName = "OMS_HELLO_TOPIC";
producer.start();
Message msg = producer.createMessage(queueName, "Hello Open Messaging".getBytes());
SendResult sendResult = producer.send(msg);
System.out.printf("Send Message OK. MsgId: %s%n", sendResult.messageId());
producer.shutdown();
producer.stop();
consumer.attachQueue(queueName);
Set<String> queueNames = new HashSet<String>(8) {
{
add(queueName);
}
};
consumer.bindQueue(queueNames);
consumer.startup();
consumer.start();
System.out.printf("Consumer startup OK%n");
// Keep running until we find the one that has just been sent
......@@ -57,9 +61,11 @@ public class SimplePullConsumer {
while (!stop) {
Message message = consumer.receive();
if (message != null) {
String msgId = message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID);
String msgId = message.header().getMessageId();
System.out.printf("Received one message: %s%n", msgId);
consumer.ack(msgId);
DefaultMessageReceipt defaultMessageReceipt = new DefaultMessageReceipt();
defaultMessageReceipt.setMessageId(msgId);
consumer.ack(defaultMessageReceipt);
if (!stop) {
stop = msgId.equalsIgnoreCase(sendResult.messageId());
......@@ -70,7 +76,6 @@ public class SimplePullConsumer {
}
}
consumer.shutdown();
messagingAccessPoint.shutdown();
consumer.stop();
}
}
......@@ -16,12 +16,14 @@
*/
package org.apache.rocketmq.example.openmessaging;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.HashSet;
import java.util.Set;
public class SimplePushConsumer {
public static void main(String[] args) {
......@@ -29,28 +31,29 @@ public class SimplePushConsumer {
.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default");
final PushConsumer consumer = messagingAccessPoint.
createPushConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "OMS_CONSUMER"));
messagingAccessPoint.startup();
System.out.printf("MessagingAccessPoint startup OK%n");
createPushConsumer(OMS.newKeyValue().put(NonStandardKeys.CONSUMER_ID, "OMS_CONSUMER"));
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
@Override
public void run() {
consumer.shutdown();
messagingAccessPoint.shutdown();
consumer.stop();
}
}));
consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() {
Set<String> queueNames = new HashSet<String>(8) {
{
add("OMS_HELLO_TOPIC");
}
};
consumer.bindQueue(queueNames, new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
System.out.printf("Received one message: %s%n", message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID));
System.out.printf("Received one message: %s%n", message.header().getMessageId());
context.ack();
}
});
consumer.startup();
consumer.start();
System.out.printf("Consumer startup OK%n");
}
}
......@@ -18,24 +18,26 @@ package io.openmessaging.rocketmq;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.domain.DefaultMessageFactory;
import io.openmessaging.rocketmq.producer.ProducerImpl;
import java.util.HashSet;
import java.util.Set;
public class MessagingAccessPointImpl implements MessagingAccessPoint {
private final KeyValue accessPointProperties;
private final MessageFactory messageFactory;
public MessagingAccessPointImpl(final KeyValue accessPointProperties) {
this.accessPointProperties = accessPointProperties;
this.messageFactory = new DefaultMessageFactory();
}
@Override
......@@ -57,77 +59,34 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
return null;
}
@Override public Consumer createConsumer() {
String consumerId = accessPointProperties.getString(NonStandardKeys.CONSUMER_ID);
String[] nsStrArr = consumerId.split("_");
if (nsStrArr.length < 2) {
return new PushConsumerImpl(accessPointProperties);
}
if (NonStandardKeys.PULL_CONSUMER.equals(nsStrArr[0])) {
return new PullConsumerImpl(accessPointProperties);
}
@Override public PushConsumer createPushConsumer() {
return new PushConsumerImpl(accessPointProperties);
}
@Override
public ResourceManager resourceManager() {
DefaultResourceManager resourceManager = new DefaultResourceManager();
return resourceManager;
}
@Override public MessageFactory messageFactory() {
return null;
@Override public PullConsumer createPullConsumer() {
return new PullConsumerImpl(accessPointProperties);
}
class DefaultResourceManager implements ResourceManager {
@Override
public void createNamespace(String nsName) {
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, nsName);
}
@Override
public void deleteNamespace(String nsName) {
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, null);
}
@Override
public void switchNamespace(String targetNamespace) {
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, targetNamespace);
@Override public PushConsumer createPushConsumer(KeyValue attributes) {
for (String key : attributes.keySet()) {
accessPointProperties.put(key, attributes.getString(key));
}
return new PushConsumerImpl(accessPointProperties);
}
@Override
public Set<String> listNamespaces() {
return new HashSet<String>() {
{
add(accessPointProperties.getString(NonStandardKeys.CONSUMER_ID));
}
};
}
@Override
public void createQueue(String queueName) {
}
@Override
public void deleteQueue(String queueName) {
}
@Override
public Set<String> listQueues(String nsName) {
return null;
}
@Override
public void filter(String queueName, String filterString) {
@Override public PullConsumer createPullConsumer(KeyValue attributes) {
for (String key : attributes.keySet()) {
accessPointProperties.put(key, attributes.getString(key));
}
return new PullConsumerImpl(accessPointProperties);
}
@Override
public void routing(String sourceQueue, String targetQueue) {
@Override
public ResourceManager resourceManager() {
return null;
}
}
};
@Override public MessageFactory messageFactory() {
return messageFactory;
}
}
......@@ -21,9 +21,9 @@ import io.openmessaging.ServiceLifeState;
import io.openmessaging.ServiceLifecycle;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.OMSClientUtil;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
......@@ -36,6 +36,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
......@@ -67,7 +68,7 @@ class LocalMessageCache implements ServiceLifecycle {
this.rocketmqPullConsumer = rocketmqPullConsumer;
this.clientConfig = clientConfig;
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"OMS_CleanExpireMsgScheduledThread_"));
"OMS_CleanExpireMsgScheduledThread_"));
this.currentState = ServiceLifeState.INITIALIZED;
}
......@@ -79,7 +80,7 @@ class LocalMessageCache implements ServiceLifecycle {
if (!pullOffsetTable.containsKey(remoteQueue)) {
try {
pullOffsetTable.putIfAbsent(remoteQueue,
rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
} catch (MQClientException e) {
log.error("A error occurred in fetch consume offset process.", e);
}
......@@ -125,19 +126,28 @@ class LocalMessageCache implements ServiceLifecycle {
return null;
}
List<MessageExt> batchPoll(final KeyValue properties) {
List<ConsumeRequest> consumeRequests = new ArrayList<>(16);
int n = consumeRequestCache.drainTo(consumeRequests);
if (n > 0) {
List<MessageExt> messageExts = new ArrayList<>(n);
for (ConsumeRequest consumeRequest : consumeRequests) {
MessageExt messageExt = consumeRequest.getMessageExt();
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
consumedRequest.put(messageExt.getMsgId(), consumeRequest);
messageExts.add(messageExt);
List<MessageExt> batchPoll(KeyValue properties) {
List<ConsumeRequest> consumeRequests = new ArrayList<>(clientConfig.getRmqPullMessageBatchNums());
long timeout = properties.getLong(NonStandardKeys.TIMEOUT);
while (timeout >= 0) {
int n = consumeRequestCache.drainTo(consumeRequests, clientConfig.getRmqPullMessageBatchNums());
if (n > 0) {
List<MessageExt> messageExts = new ArrayList<>(n);
for (ConsumeRequest consumeRequest : consumeRequests) {
MessageExt messageExt = consumeRequest.getMessageExt();
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
consumedRequest.put(messageExt.getMsgId(), consumeRequest);
messageExts.add(messageExt);
}
return messageExts;
}
if (timeout > 0) {
LockSupport.parkNanos(timeout * 1000 * 1000);
timeout = 0;
} else {
timeout = -1;
}
return messageExts;
}
return null;
}
......@@ -166,7 +176,7 @@ class LocalMessageCache implements ServiceLifecycle {
private void cleanExpireMsg() {
for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
.getRebalanceImpl().getProcessQueueTable().entrySet()) {
.getRebalanceImpl().getProcessQueueTable().entrySet()) {
ProcessQueue pq = next.getValue();
MessageQueue mq = next.getKey();
ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
......@@ -186,7 +196,7 @@ class LocalMessageCache implements ServiceLifecycle {
if (!msgTreeMap.isEmpty()) {
msg = msgTreeMap.firstEntry().getValue();
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
> clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
> clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
//Expired, ack and remove it.
} else {
break;
......@@ -204,7 +214,7 @@ class LocalMessageCache implements ServiceLifecycle {
try {
rocketmqPullConsumer.sendMessageBack(msg, 3);
log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
ack(mq, pq, msg);
} catch (Exception e) {
log.error("Send back expired msg exception", e);
......@@ -237,7 +247,7 @@ class LocalMessageCache implements ServiceLifecycle {
public void stop() {
this.currentState = ServiceLifeState.STOPPING;
ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
this.currentState = ServiceLifeState.STARTED;
this.currentState = ServiceLifeState.STOPPED;
}
@Override
......@@ -246,7 +256,7 @@ class LocalMessageCache implements ServiceLifecycle {
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
public Set<QueueMetaData> getQueueMetaData(String queueName) {
Set<MessageQueue> messageQueues;
try {
messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
......@@ -254,16 +264,6 @@ class LocalMessageCache implements ServiceLifecycle {
log.error("A error occurred when get queue metadata.", e);
return null;
}
List<QueueMetaData.Partition> partitions = new ArrayList<>(16);
if (null != messageQueues && !messageQueues.isEmpty()) {
for (MessageQueue messageQueue : messageQueues) {
QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName());
partitions.add(partition);
}
} else {
return null;
}
QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions);
return queueMetaData;
return OMSClientUtil.queueMetaDataConvert(messageQueues);
}
}
......@@ -18,10 +18,8 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.ServiceLifeState;
import io.openmessaging.consumer.BatchMessageListener;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
......@@ -31,10 +29,13 @@ import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
import io.openmessaging.rocketmq.domain.MessageExtension;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import io.openmessaging.rocketmq.utils.OMSClientUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
......@@ -55,10 +56,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PullConsumerImpl implements Consumer {
private static final int PULL_MAX_NUMS = 32;
private static final int PULL_MIN_NUMS = 1;
public class PullConsumerImpl implements PullConsumer {
private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties;
......@@ -67,7 +65,8 @@ public class PullConsumerImpl implements Consumer {
private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig;
private ServiceLifeState currentState;
private List<ConsumerInterceptor> consumerInterceptors;
private final List<ConsumerInterceptor> consumerInterceptors;
private final Extension extension;
private final static InternalLogger log = ClientLogger.getLog();
......@@ -96,15 +95,15 @@ public class PullConsumerImpl implements Consumer {
int maxReDeliveryTimes = clientConfig.getRmqMaxRedeliveryTimes();
this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes);
String consumerId = OMSUtil.buildInstanceName();
String consumerId = OMSClientUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
consumerInterceptors = new ArrayList<>(16);
this.extension = new MessageExtension(this);
}
private void registerPullTaskCallback(final String targetQueueName) {
......@@ -139,96 +138,36 @@ public class PullConsumerImpl implements Consumer {
});
}
@Override
public void resume() {
currentState = ServiceLifeState.STARTED;
}
@Override
public void suspend() {
currentState = ServiceLifeState.STOPPED;
@Override public Set<String> getBindQueues() {
return rocketmqPullConsumer.getRegisterTopics();
}
@Override
public void suspend(long timeout) {
throw new UnsupportedOperationException();
}
@Override
public boolean isSuspended() {
if (ServiceLifeState.STOPPED.equals(currentState)) {
return true;
}
return false;
public void addInterceptor(ConsumerInterceptor interceptor) {
consumerInterceptors.add(interceptor);
}
@Override
public void bindQueue(String queueName) {
registerPullTaskCallback(queueName);
public void removeInterceptor(ConsumerInterceptor interceptor) {
consumerInterceptors.remove(interceptor);
}
@Override
public void bindQueue(List<String> queueNames) {
@Override public void bindQueue(Collection<String> queueNames) {
for (String queueName : queueNames) {
bindQueue(queueName);
registerPullTaskCallback(queueName);
}
}
@Override
public void bindQueue(String queueName, MessageListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void bindQueues(List<String> queueNames, MessageListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void bindQueue(String queueName, BatchMessageListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void bindQueues(List<String> queueNames, BatchMessageListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void unbindQueue(String queueName) {
this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
}
@Override
public void unbindQueues(List<String> queueNames) {
@Override public void unbindQueue(Collection<String> queueNames) {
for (String queueName : queueNames) {
this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
}
}
@Override
public boolean isBindQueue() {
Set<String> registerTopics = rocketmqPullConsumer.getRegisterTopics();
if (null == registerTopics || registerTopics.isEmpty()) {
return false;
}
return true;
}
@Override
public List<String> getBindQueues() {
Set<String> registerTopics = rocketmqPullConsumer.getRegisterTopics();
return new ArrayList<>(registerTopics);
}
@Override
public void addInterceptor(ConsumerInterceptor interceptor) {
consumerInterceptors.add(interceptor);
}
@Override
public void removeInterceptor(ConsumerInterceptor interceptor) {
consumerInterceptors.remove(interceptor);
@Override public Message receive() {
KeyValue properties = new DefaultKeyValue();
MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSClientUtil.msgConvert(rmqMsg);
}
@Override
......@@ -236,23 +175,24 @@ public class PullConsumerImpl implements Consumer {
KeyValue properties = new DefaultKeyValue();
properties.put(NonStandardKeys.TIMEOUT, timeout);
MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
return rmqMsg == null ? null : OMSClientUtil.msgConvert(rmqMsg);
}
@Override
public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
MessageQueue mq = null;
mq = getQueue(queueName, partitionId, mq);
PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MIN_NUMS);
if (pullResult == null)
public Message receive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt, long timeout) {
MessageQueue mq;
mq = getQueue(queueMetaData);
PullResult pullResult = getResult(((DefaultMessageReceipt) messageReceipt).getOffset(), timeout, mq, NonStandardKeys.PULL_MIN_NUMS);
if (pullResult == null) {
return null;
}
PullStatus pullStatus = pullResult.getPullStatus();
List<Message> messages = new ArrayList<>(16);
if (PullStatus.FOUND.equals(pullStatus)) {
List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages.get(0);
......@@ -261,10 +201,10 @@ public class PullConsumerImpl implements Consumer {
return null;
}
private PullResult getResult(long receiptId, long timeout, MessageQueue mq, int nums) {
private PullResult getResult(long offset, long timeout, MessageQueue mq, int maxNums) {
PullResult pullResult;
try {
pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, nums, timeout);
pullResult = rocketmqPullConsumer.pull(mq, "*", offset, maxNums, timeout);
} catch (MQClientException e) {
log.error("A error occurred when pull message.", e);
return null;
......@@ -278,17 +218,15 @@ public class PullConsumerImpl implements Consumer {
log.error("A error occurred when pull message.", e);
return null;
}
if (null == pullResult) {
return null;
}
return pullResult;
}
private MessageQueue getQueue(String queueName, int partitionId, MessageQueue mq) {
private MessageQueue getQueue(QueueMetaData queueMetaData) {
MessageQueue mq = null;
try {
Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueMetaData.queueName());
for (MessageQueue messageQueue : messageQueues) {
if (messageQueue.getQueueId() == partitionId) {
if (messageQueue.getQueueId() == queueMetaData.partitionId()) {
mq = messageQueue;
}
}
......@@ -306,7 +244,7 @@ public class PullConsumerImpl implements Consumer {
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
List<Message> messages = new ArrayList<>(rmqMsgs.size());
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages;
......@@ -315,19 +253,21 @@ public class PullConsumerImpl implements Consumer {
}
@Override
public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
MessageQueue mq = null;
mq = getQueue(queueName, partitionId, mq);
PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MAX_NUMS);
if (pullResult == null)
public List<Message> batchReceive(String queueName, QueueMetaData queueMetaData, MessageReceipt messageReceipt,
long timeout) {
MessageQueue mq;
mq = getQueue(queueMetaData);
PullResult pullResult = getResult(((DefaultMessageReceipt) messageReceipt).getOffset(), timeout, mq, clientConfig.getRmqPullMessageBatchNums());
if (pullResult == null) {
return null;
}
PullStatus pullStatus = pullResult.getPullStatus();
List<Message> messages = new ArrayList<>(16);
if (PullStatus.FOUND.equals(pullStatus)) {
List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
BytesMessageImpl bytesMessage = OMSClientUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages;
......@@ -338,18 +278,12 @@ public class PullConsumerImpl implements Consumer {
@Override
public void ack(MessageReceipt receipt) {
localMessageCache.ack(((DefaultMessageReceipt) receipt).getMessageId());
}
@Override
public Optional<Extension> getExtension() {
return Optional.of(new Extension() {
@Override
public QueueMetaData getQueueMetaData(String queueName) {
return getQueueMetaData(queueName);
}
});
return Optional.of(extension);
}
@Override
......@@ -381,7 +315,7 @@ public class PullConsumerImpl implements Consumer {
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
public Set<QueueMetaData> getQueueMetaData(String queueName) {
return localMessageCache.getQueueMetaData(queueName);
}
}
......@@ -20,27 +20,30 @@ import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
import io.openmessaging.ServiceLifeState;
import io.openmessaging.consumer.BatchMessageListener;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.interceptor.ConsumerInterceptor;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.MessageExtension;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil;
import io.openmessaging.rocketmq.utils.OMSClientUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
......@@ -48,12 +51,13 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PushConsumerImpl implements Consumer {
public class PushConsumerImpl implements PushConsumer {
private final static InternalLogger log = ClientLogger.getLog();
......@@ -65,6 +69,8 @@ public class PushConsumerImpl implements Consumer {
private final ClientConfig clientConfig;
private ServiceLifeState currentState;
private List<ConsumerInterceptor> consumerInterceptors;
private ScheduledExecutorService scheduledExecutorService;
private final Extension extension;
public PushConsumerImpl(final KeyValue properties) {
this.rocketmqPushConsumer = new DefaultMQPushConsumer();
......@@ -89,7 +95,7 @@ public class PushConsumerImpl implements Consumer {
this.rocketmqPushConsumer.setConsumeThreadMax(clientConfig.getRmqMaxConsumeThreadNums());
this.rocketmqPushConsumer.setConsumeThreadMin(clientConfig.getRmqMinConsumeThreadNums());
String consumerId = OMSUtil.buildInstanceName();
String consumerId = OMSClientUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
......@@ -97,6 +103,9 @@ public class PushConsumerImpl implements Consumer {
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
consumerInterceptors = new ArrayList<>(16);
scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
"OMS_SuspendTimeouThread_"));
extension = new MessageExtension(this);
currentState = ServiceLifeState.INITIALIZED;
}
......@@ -112,7 +121,12 @@ public class PushConsumerImpl implements Consumer {
@Override
public void suspend(long timeout) {
throw new UnsupportedOperationException();
this.rocketmqPushConsumer.suspend();
scheduledExecutorService.schedule(new Runnable() {
@Override public void run() {
PushConsumerImpl.this.rocketmqPushConsumer.resume();
}
}, timeout, TimeUnit.MILLISECONDS);
}
@Override
......@@ -120,90 +134,49 @@ public class PushConsumerImpl implements Consumer {
return this.rocketmqPushConsumer.getDefaultMQPushConsumerImpl().isPause();
}
@Override
public void bindQueue(String queueName) {
try {
rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
}
}
@Override
public void bindQueue(List<String> queueNames) {
@Override public void bindQueue(Collection<String> queueNames, MessageListener listener) {
for (String queueName : queueNames) {
bindQueue(queueName);
}
}
@Override
public void bindQueue(String queueName, MessageListener listener) {
this.subscribeTable.put(queueName, listener);
this.batchSubscribeTable.remove(queueName);
try {
this.rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
}
}
@Override
public void bindQueues(List<String> queueNames, MessageListener listener) {
for (String queueName : queueNames) {
bindQueue(queueName, listener);
}
}
@Override
public void bindQueue(String queueName, BatchMessageListener listener) {
this.batchSubscribeTable.put(queueName, listener);
this.subscribeTable.remove(queueName);
try {
this.rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
this.subscribeTable.put(queueName, listener);
this.batchSubscribeTable.remove(queueName);
this.rocketmqPushConsumer.setConsumeMessageBatchMaxSize(NonStandardKeys.PULL_MIN_NUMS);
try {
this.rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
}
}
}
@Override
public void bindQueues(List<String> queueNames, BatchMessageListener listener) {
@Override public void bindQueue(Collection<String> queueNames, BatchMessageListener listener) {
for (String queueName : queueNames) {
bindQueue(queueName, listener);
}
}
@Override
public void unbindQueue(String queueName) {
this.subscribeTable.remove(queueName);
this.batchSubscribeTable.remove(queueName);
try {
this.rocketmqPushConsumer.unsubscribe(queueName);
} catch (Exception e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
this.batchSubscribeTable.put(queueName, listener);
this.subscribeTable.remove(queueName);
this.rocketmqPushConsumer.setConsumeMessageBatchMaxSize(clientConfig.getRmqPullMessageBatchNums());
try {
this.rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
}
}
}
@Override
public void unbindQueues(List<String> queueNames) {
@Override public void unbindQueue(Collection<String> queueNames) {
for (String queueName : queueNames) {
unbindQueue(queueName);
}
}
@Override
public boolean isBindQueue() {
Map<String, String> subscription = rocketmqPushConsumer.getSubscription();
if (null != subscription && subscription.size() > 0) {
return true;
this.subscribeTable.remove(queueName);
this.batchSubscribeTable.remove(queueName);
try {
this.rocketmqPushConsumer.unsubscribe(queueName);
} catch (Exception e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
}
}
return false;
}
@Override
public List<String> getBindQueues() {
public Set<String> getBindQueues() {
Map<String, String> subscription = rocketmqPushConsumer.getSubscription();
if (null != subscription && subscription.size() > 0) {
return new ArrayList<>(subscription.keySet());
return subscription.keySet();
}
return null;
}
......@@ -218,26 +191,6 @@ public class PushConsumerImpl implements Consumer {
consumerInterceptors.remove(interceptor);
}
@Override
public Message receive(long timeout) {
throw new UnsupportedOperationException();
}
@Override
public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
throw new UnsupportedOperationException();
}
@Override
public List<Message> batchReceive(long timeout) {
throw new UnsupportedOperationException();
}
@Override
public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
throw new UnsupportedOperationException();
}
@Override
public void ack(MessageReceipt receipt) {
throw new UnsupportedOperationException();
......@@ -245,13 +198,7 @@ public class PushConsumerImpl implements Consumer {
@Override
public Optional<Extension> getExtension() {
return Optional.of(new Extension() {
@Override
public QueueMetaData getQueueMetaData(String queueName) {
return getQueueMetaData(queueName);
}
});
return Optional.of(extension);
}
@Override
......@@ -284,7 +231,7 @@ public class PushConsumerImpl implements Consumer {
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
public Set<QueueMetaData> getQueueMetaData(String queueName) {
Set<MessageQueue> messageQueues;
try {
messageQueues = rocketmqPushConsumer.fetchSubscribeMessageQueues(queueName);
......@@ -292,24 +239,14 @@ public class PushConsumerImpl implements Consumer {
log.error("A error occurred when get queue metadata.", e);
return null;
}
List<QueueMetaData.Partition> partitions = new ArrayList<>(16);
if (null != messageQueues && !messageQueues.isEmpty()) {
for (MessageQueue messageQueue : messageQueues) {
QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName());
partitions.add(partition);
}
} else {
return null;
}
QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions);
return queueMetaData;
return OMSClientUtil.queueMetaDataConvert(messageQueues);
}
class MessageListenerImpl implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
ConsumeConcurrentlyContext contextRMQ) {
ConsumeConcurrentlyContext contextRMQ) {
boolean batchFlag = true;
MessageExt rmqMsg = rmqMsgList.get(0);
BatchMessageListener batchMessageListener = PushConsumerImpl.this.batchSubscribeTable.get(rmqMsg.getTopic());
......@@ -319,14 +256,14 @@ public class PushConsumerImpl implements Consumer {
}
if (listener == null && batchMessageListener == null) {
throw new OMSRuntimeException(-1,
String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
}
final KeyValue contextProperties = OMS.newKeyValue();
if (batchFlag) {
List<Message> messages = new ArrayList<>(16);
List<Message> messages = new ArrayList<>(32);
for (MessageExt messageExt : rmqMsgList) {
BytesMessageImpl omsMsg = OMSUtil.msgConvert(messageExt);
BytesMessageImpl omsMsg = OMSClientUtil.msgConvert(messageExt);
messages.add(omsMsg);
}
final CountDownLatch sync = new CountDownLatch(1);
......@@ -344,7 +281,7 @@ public class PushConsumerImpl implements Consumer {
public void ack() {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
};
long begin = System.currentTimeMillis();
......@@ -356,7 +293,7 @@ public class PushConsumerImpl implements Consumer {
} catch (InterruptedException ignore) {
}
} else {
BytesMessageImpl omsMsg = OMSUtil.msgConvert(rmqMsg);
BytesMessageImpl omsMsg = OMSClientUtil.msgConvert(rmqMsg);
final CountDownLatch sync = new CountDownLatch(1);
......@@ -368,7 +305,7 @@ public class PushConsumerImpl implements Consumer {
public void ack() {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
};
long begin = System.currentTimeMillis();
......
......@@ -17,22 +17,26 @@
package io.openmessaging.rocketmq.domain;
import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.extension.ExtensionHeader;
import io.openmessaging.message.Header;
import io.openmessaging.message.Message;
import io.openmessaging.OMS;
import java.util.Optional;
import java.util.Arrays;
public class BytesMessageImpl implements Message {
private Header sysHeaders;
private ExtensionHeader extensionHeader;
private MessageReceipt messageReceipt;
private KeyValue userProperties;
private byte[] data;
public BytesMessageImpl() {
this.sysHeaders = new MessageHeader();
this.userProperties = OMS.newKeyValue();
this.extensionHeader = new MessageExtensionHeader();
this.messageReceipt = new DefaultMessageReceipt();
}
@Override
......@@ -41,8 +45,8 @@ public class BytesMessageImpl implements Message {
}
@Override
public Optional<ExtensionHeader> extensionHeader() {
return null;
public ExtensionHeader extensionHeader() {
return extensionHeader;
}
@Override
......@@ -62,6 +66,16 @@ public class BytesMessageImpl implements Message {
@Override
public MessageReceipt getMessageReceipt() {
return null;
return messageReceipt;
}
@Override public String toString() {
return "BytesMessageImpl{" +
"sysHeaders=" + sysHeaders +
", extensionHeader=" + extensionHeader +
", messageReceipt=" + messageReceipt +
", userProperties=" + userProperties +
", data=" + Arrays.toString(data) +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.message.Message;
import io.openmessaging.message.MessageFactory;
public class DefaultMessageFactory implements MessageFactory {
@Override public Message createMessage(String queueName, byte[] body) {
Message message = new BytesMessageImpl();
message.setData(body);
message.header().setDestination(queueName);
return message;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.consumer.MessageReceipt;
import java.util.Objects;
public class DefaultMessageReceipt implements MessageReceipt {
private long offset;
private String messageId;
public DefaultMessageReceipt() {
}
public DefaultMessageReceipt(String messageId, long offset) {
this.messageId = messageId;
this.offset = offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
public long getOffset() {
return offset;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getMessageId() {
return messageId;
}
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
DefaultMessageReceipt receipt = (DefaultMessageReceipt) o;
return offset == receipt.offset &&
Objects.equals(messageId, receipt.messageId);
}
@Override public int hashCode() {
return Objects.hash(offset, messageId);
}
}
......@@ -14,52 +14,59 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.config;
package io.openmessaging.rocketmq.domain;
import io.openmessaging.extension.QueueMetaData;
import java.util.List;
import java.util.Objects;
public class DefaultQueueMetaData implements QueueMetaData {
private String queueName;
private List<QueueMetaData.Partition> partitions;
private int partitionId;
public DefaultQueueMetaData(String queueName, List<QueueMetaData.Partition> partitions) {
public DefaultQueueMetaData(String queueName, int partitionId) {
this.queueName = queueName;
this.partitions = partitions;
this.partitionId = partitionId;
}
@Override
public String queueName() {
return queueName;
@Override public void setQueueName(String queueNaome) {
this.queueName = queueName;
}
@Override
public List<QueueMetaData.Partition> partitions() {
return partitions;
@Override public void setPartitionId(int partitionId) {
this.partitionId = partitionId;
}
public static class DefaultPartition implements Partition {
public DefaultPartition(int partitionId, String partitonHost) {
this.partitionId = partitionId;
this.partitonHost = partitonHost;
}
private int partitionId;
@Override public int partitionId() {
return partitionId;
}
private String partitonHost;
@Override
public String queueName() {
return queueName;
}
@Override
public int partitionId() {
return partitionId;
@Override public boolean equals(Object o) {
if (this == o) {
return true;
}
@Override
public String partitonHost() {
return partitonHost;
if (o == null || getClass() != o.getClass()) {
return false;
}
DefaultQueueMetaData data = (DefaultQueueMetaData) o;
return partitionId == data.partitionId &&
queueName.equals(data.queueName);
}
@Override public int hashCode() {
return Objects.hash(queueName, partitionId);
}
@Override public String toString() {
return "DefaultQueueMetaData{" +
"queueName='" + queueName + '\'' +
", partitionId=" + partitionId +
'}';
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import java.util.Set;
public class MessageExtension implements Extension {
private Extension extension;
public MessageExtension(Extension extension) {
this.extension = extension;
}
@Override public Set<QueueMetaData> getQueueMetaData(String queueName) {
return extension.getQueueMetaData(queueName);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.domain;
import io.openmessaging.extension.ExtensionHeader;
public class MessageExtensionHeader implements ExtensionHeader {
private int partition;
private long offset;
private String correlationId;
private String transactionId;
private long storeTimestamp;
private String storeHost;
private String messageKey;
private String traceId;
private long delayTime;
private long expireTime;
@Override public ExtensionHeader setPartition(int partition) {
this.partition = partition;
return this;
}
@Override public ExtensionHeader setOffset(long offset) {
this.offset = offset;
return this;
}
@Override public ExtensionHeader setCorrelationId(String correlationId) {
this.correlationId = correlationId;
return this;
}
@Override public ExtensionHeader setTransactionId(String transactionId) {
this.transactionId = transactionId;
return this;
}
@Override public ExtensionHeader setStoreTimestamp(long storeTimestamp) {
this.storeTimestamp = storeTimestamp;
return this;
}
@Override public ExtensionHeader setStoreHost(String storeHost) {
this.storeHost = storeHost;
return this;
}
@Override public ExtensionHeader setMessageKey(String messageKey) {
this.messageKey = messageKey;
return this;
}
@Override public ExtensionHeader setTraceId(String traceId) {
this.traceId = traceId;
return this;
}
@Override public ExtensionHeader setDelayTime(long delayTime) {
this.delayTime = delayTime;
return this;
}
@Override public ExtensionHeader setExpireTime(long expireTime) {
this.expireTime = expireTime;
return this;
}
@Override public int getPartiton() {
return partition;
}
@Override public long getOffset() {
return offset;
}
@Override public String getCorrelationId() {
return correlationId;
}
@Override public String getTransactionId() {
return transactionId;
}
@Override public long getStoreTimestamp() {
return storeTimestamp;
}
@Override public String getStoreHost() {
return storeHost;
}
@Override public long getDelayTime() {
return delayTime;
}
@Override public long getExpireTime() {
return expireTime;
}
@Override public String getMessageKey() {
return messageKey;
}
@Override public String getTraceId() {
return traceId;
}
@Override public String toString() {
return "MessageExtensionHeader{" +
"partition=" + partition +
", offset=" + offset +
", correlationId='" + correlationId + '\'' +
", transactionId='" + transactionId + '\'' +
", storeTimestamp=" + storeTimestamp +
", storeHost='" + storeHost + '\'' +
", messageKey='" + messageKey + '\'' +
", traceId='" + traceId + '\'' +
", delayTime=" + delayTime +
", expireTime=" + expireTime +
'}';
}
}
......@@ -110,4 +110,17 @@ public class MessageHeader implements Header {
@Override public short getCompression() {
return this.compression;
}
@Override public String toString() {
return "MessageHeader{" +
"destination='" + destination + '\'' +
", messageId='" + messageId + '\'' +
", bornTimestamp=" + bornTimestamp +
", bornHost='" + bornHost + '\'' +
", priority=" + priority +
", deliveryCount=" + deliveryCount +
", compression=" + compression +
", durability=" + durability +
'}';
}
}
......@@ -30,7 +30,5 @@ public interface NonStandardKeys {
String PRODUCER_ID = "PRODUCER_ID";
String CONSUMER_ID = "CONSUMER_ID";
String TIMEOUT = "TIMEOUT";
String PULL_CONSUMER = "PULL";
String PUSH_CONSUMER = "PUSH";
int PULL_MIN_NUMS = 1;
}
......@@ -38,7 +38,7 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import static io.openmessaging.rocketmq.utils.OMSUtil.buildInstanceName;
import static io.openmessaging.rocketmq.utils.OMSClientUtil.buildInstanceName;
abstract class AbstractOMSProducer implements ServiceLifecycle, MessageFactory {
final static InternalLogger log = ClientLogger.getLog();
......
......@@ -18,31 +18,38 @@ package io.openmessaging.rocketmq.producer;
import io.openmessaging.Future;
import io.openmessaging.KeyValue;
import io.openmessaging.Promise;
import io.openmessaging.ServiceLifeState;
import io.openmessaging.exception.OMSMessageFormatException;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.message.Message;
import io.openmessaging.Promise;
import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.interceptor.ProducerInterceptor;
import io.openmessaging.message.Message;
import io.openmessaging.producer.Producer;
import io.openmessaging.producer.SendResult;
import io.openmessaging.producer.TransactionalResult;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.MessageExtension;
import io.openmessaging.rocketmq.promise.DefaultPromise;
import io.openmessaging.rocketmq.utils.OMSUtil;
import io.openmessaging.rocketmq.utils.OMSClientUtil;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.MessageQueue;
import static io.openmessaging.rocketmq.utils.OMSUtil.msgConvert;
import static io.openmessaging.rocketmq.utils.OMSClientUtil.msgConvert;
public class ProducerImpl extends AbstractOMSProducer implements Producer {
private final Extension extension;
public ProducerImpl(final KeyValue properties) {
super(properties);
extension = new MessageExtension(this);
}
@Override
......@@ -60,7 +67,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
throw new OMSRuntimeException(-1, "Send message to RocketMQ broker failed.");
}
message.header().setMessageId(rmqResult.getMsgId());
return OMSUtil.sendResultConvert(rmqResult);
return OMSClientUtil.sendResultConvert(rmqResult);
} catch (Exception e) {
log.error(String.format("Send message to RocketMQ failed, %s", message), e);
throw checkProducerException(rmqMessage.getTopic(), message.header().getMessageId(), e);
......@@ -81,7 +88,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public void onSuccess(final org.apache.rocketmq.client.producer.SendResult rmqResult) {
message.header().setMessageId(rmqResult.getMsgId());
promise.set(OMSUtil.sendResultConvert(rmqResult));
promise.set(OMSClientUtil.sendResultConvert(rmqResult));
}
@Override
......@@ -112,7 +119,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
for (Message message : messages) {
sendOneway(messages);
sendOneway(message);
}
}
......@@ -128,7 +135,7 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
}
for (Message message : messages) {
sendOneway(messages);
sendOneway(message);
}
}
......@@ -152,12 +159,19 @@ public class ProducerImpl extends AbstractOMSProducer implements Producer {
@Override
public Optional<Extension> getExtension() {
return null;
return Optional.of(extension);
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
return null;
public Set<QueueMetaData> getQueueMetaData(String queueName) {
List<MessageQueue> messageQueues;
try {
messageQueues = this.rocketmqProducer.fetchPublishMessageQueues(queueName);
} catch (MQClientException e) {
log.error("A error occurred when get queue metadata.", e);
return null;
}
return OMSClientUtil.queueMetaDataConvert(messageQueues);
}
@Override
......
......@@ -18,19 +18,24 @@ package io.openmessaging.rocketmq.utils;
import io.openmessaging.KeyValue;
import io.openmessaging.OMS;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.message.Header;
import io.openmessaging.producer.SendResult;
import io.openmessaging.rocketmq.domain.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.RocketMQConstants;
import io.openmessaging.rocketmq.domain.SendResultImpl;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageQueue;
public class OMSUtil {
public class OMSClientUtil {
/**
* Builds a OMS client instance name.
......@@ -56,7 +61,6 @@ public class OMSUtil {
rmqMessage.putUserProperty(RocketMQConstants.START_DELIVER_TIME, String.valueOf(deliverTime));
}
for (String key : userHeaders.keySet()) {
MessageAccessor.putProperty(rmqMessage, key, userHeaders.getString(key));
}
......@@ -82,6 +86,13 @@ public class OMSUtil {
omsMsg.header().setBornHost(String.valueOf(rmqMsg.getBornHost()));
omsMsg.header().setBornTimestamp(rmqMsg.getBornTimestamp());
omsMsg.header().setDeliveryCount(rmqMsg.getDelayTimeLevel());
omsMsg.extensionHeader().setPartition(rmqMsg.getQueueId());
omsMsg.extensionHeader().setOffset(rmqMsg.getQueueOffset());
omsMsg.extensionHeader().setDelayTime(rmqMsg.getDelayTimeLevel());
omsMsg.extensionHeader().setMessageKey(rmqMsg.getKeys());
omsMsg.extensionHeader().setStoreHost(rmqMsg.getStoreHost().toString());
omsMsg.extensionHeader().setStoreTimestamp(rmqMsg.getStoreTimestamp());
omsMsg.extensionHeader().setTransactionId(rmqMsg.getTransactionId());
return omsMsg;
}
......@@ -116,4 +127,17 @@ public class OMSUtil {
}
return keyValue;
}
public static Set<QueueMetaData> queueMetaDataConvert(Collection<MessageQueue> messageQueues) {
Set<QueueMetaData> queueMetaDatas = new HashSet<>(32);
if (null != messageQueues && !messageQueues.isEmpty()) {
for (MessageQueue messageQueue : messageQueues) {
QueueMetaData queueMetaData = new DefaultQueueMetaData(messageQueue.getTopic(), messageQueue.getQueueId());
queueMetaDatas.add(queueMetaData);
}
} else {
return null;
}
return queueMetaDatas;
}
}
......@@ -16,10 +16,17 @@
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.internal.DefaultKeyValue;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Before;
......@@ -40,6 +47,8 @@ public class LocalMessageCacheTest {
private DefaultMQPullConsumer rocketmqPullConsume;
@Mock
private ConsumeRequest consumeRequest;
@Mock
private ConsumeRequest consumeRequest1;
@Before
public void init() {
......@@ -86,4 +95,85 @@ public class LocalMessageCacheTest {
localMessageCache.submitConsumeRequest(consumeRequest);
assertThat(localMessageCache.poll()).isEqualTo(consumedMsg);
}
@Test
public void testBatchPollMessage() throws Exception {
byte[] body = new byte[] {'1', '2', '3'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(body);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
byte[] body1 = new byte[] {'4', '5', '6'};
MessageExt consumedMsg1 = new MessageExt();
consumedMsg1.setMsgId("NewMsgId1");
consumedMsg1.setBody(body1);
consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg1.setTopic("HELLO_QUEUE1");
when(consumeRequest.getMessageExt()).thenReturn(consumedMsg);
when(consumeRequest1.getMessageExt()).thenReturn(consumedMsg1);
localMessageCache.submitConsumeRequest(consumeRequest);
localMessageCache.submitConsumeRequest(consumeRequest1);
KeyValue properties = new DefaultKeyValue();
properties.put(NonStandardKeys.TIMEOUT, 3000);
List<MessageExt> messageExts = localMessageCache.batchPoll(properties);
assertThat(messageExts.size()).isEqualTo(2);
MessageExt messageExt1 = null;
MessageExt messageExt2 = null;
for (MessageExt messageExt : messageExts) {
if (messageExt.getMsgId().equals("NewMsgId")) {
messageExt1 = messageExt;
}
if (messageExt.getMsgId().equals("NewMsgId1")) {
messageExt2 = messageExt;
}
}
assertThat(messageExt1).isNotNull();
assertThat(messageExt1.getBody()).isEqualTo(body);
assertThat(messageExt1.getTopic()).isEqualTo("HELLO_QUEUE");
assertThat(messageExt2).isNotNull();
assertThat(messageExt2.getBody()).isEqualTo(body1);
assertThat(messageExt2.getTopic()).isEqualTo("HELLO_QUEUE1");
}
@Test
public void getQueueMetaData() throws MQClientException {
MessageQueue messageQueue1 = new MessageQueue("topic1", "brockerName1", 0);
MessageQueue messageQueue2 = new MessageQueue("topic1", "brockerName2", 1);
MessageQueue messageQueue3 = new MessageQueue("topic1", "brockerName3", 2);
Set<MessageQueue> messageQueues = new HashSet<MessageQueue>() {
{
add(messageQueue1);
add(messageQueue2);
add(messageQueue3);
}
};
when(rocketmqPullConsume.fetchSubscribeMessageQueues("topic1")).thenReturn(messageQueues);
Set<QueueMetaData> queueMetaDatas = localMessageCache.getQueueMetaData("topic1");
assertThat(queueMetaDatas.size()).isEqualTo(3);
QueueMetaData queueMetaData1 = null;
QueueMetaData queueMetaData2 = null;
QueueMetaData queueMetaData3 = null;
for (QueueMetaData queueMetaData : queueMetaDatas) {
if (queueMetaData.partitionId() == 0) {
queueMetaData1 = queueMetaData;
}
if (queueMetaData.partitionId() == 1) {
queueMetaData2 = queueMetaData;
}
if (queueMetaData.partitionId() == 2) {
queueMetaData3 = queueMetaData;
}
}
assertThat(queueMetaData1).isNotNull();
assertThat(queueMetaData1.queueName()).isEqualTo("topic1");
assertThat(queueMetaData2).isNotNull();
assertThat(queueMetaData2.queueName()).isEqualTo("topic1");
assertThat(queueMetaData3).isNotNull();
assertThat(queueMetaData3.queueName()).isEqualTo("topic1");
}
}
\ No newline at end of file
......@@ -19,14 +19,28 @@ package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.consumer.PullConsumer;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.DefaultMessageReceipt;
import io.openmessaging.rocketmq.domain.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -35,12 +49,15 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
@RunWith(MockitoJUnitRunner.class)
public class PullConsumerImplTest {
private Consumer consumer;
private PullConsumer pullConsumer;
private String queueName = "HELLO_QUEUE";
@Mock
......@@ -50,15 +67,17 @@ public class PullConsumerImplTest {
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace(NonStandardKeys.PULL_CONSUMER +"_TestGroup");
consumer = messagingAccessPoint.createConsumer();
consumer.bindQueue(queueName);
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
final KeyValue attributes = messagingAccessPoint.attributes();
attributes.put(NonStandardKeys.CONSUMER_ID, "TestGroup");
pullConsumer = messagingAccessPoint.createPullConsumer();
Set<String> queueNames = new HashSet<>(8);
queueNames.add(queueName);
pullConsumer.bindQueue(queueNames);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true);
field.set(consumer, rocketmqPullConsumer); //Replace
field.set(pullConsumer, rocketmqPullConsumer); //Replace
ClientConfig clientConfig = new ClientConfig();
clientConfig.setOperationTimeout(200);
......@@ -66,27 +85,133 @@ public class PullConsumerImplTest {
field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
field.setAccessible(true);
field.set(consumer, localMessageCache);
consumer.start();
field.set(pullConsumer, localMessageCache);
pullConsumer.start();
}
@Test
public void testPoll() {
final byte[] testBody = new byte[]{'a', 'b'};
public void testPoll() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
final byte[] testBody = new byte[] {'a', 'b'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic(queueName);
consumedMsg.setQueueId(0);
consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
doReturn(consumedMsg).when(localMessageCache).poll(any(KeyValue.class));
Message message = consumer.receive(3 * 1000);
Message message = pullConsumer.receive(3 * 1000);
assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
assertThat(message.getData()).isEqualTo(testBody);
List<MessageExt> messageExts = new ArrayList<MessageExt>() {
{
add(consumedMsg);
}
};
PullResult pullResult = new PullResult(PullStatus.FOUND, 11, 1, 100, messageExts);
doReturn(pullResult).when(rocketmqPullConsumer).pull(any(MessageQueue.class), anyString(), anyLong(), anyInt(), anyLong());
MessageQueue messageQueue = new MessageQueue(queueName, "breakeName", 0);
Set<MessageQueue> messageQueues = new HashSet<MessageQueue>() {
{
add(messageQueue);
}
};
doReturn(messageQueues).when(rocketmqPullConsumer).fetchSubscribeMessageQueues(queueName);
QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, 0);
MessageReceipt messageReceipt = new DefaultMessageReceipt("NewMsgId", 10L);
long timeout = 3000L;
Message message1 = pullConsumer.receive(queueName, queueMetaData, messageReceipt, timeout);
assertThat(message1.header().getMessageId()).isEqualTo("NewMsgId");
assertThat(message1.getData()).isEqualTo(testBody);
assertThat(message1.header().getDestination()).isEqualTo(queueName);
assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
}
@Test
public void testBatchPoll() throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
final byte[] testBody = new byte[] {'a', 'b'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic(queueName);
consumedMsg.setQueueId(0);
consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
final byte[] testBody1 = new byte[] {'c', 'd'};
MessageExt consumedMsg1 = new MessageExt();
consumedMsg1.setMsgId("NewMsgId1");
consumedMsg1.setBody(testBody1);
consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg1.setTopic(queueName);
consumedMsg1.setQueueId(0);
consumedMsg1.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
List<MessageExt> messageExts = new ArrayList<MessageExt>() {
{
add(consumedMsg);
add(consumedMsg1);
}
};
doReturn(messageExts).when(localMessageCache).batchPoll(any(KeyValue.class));
List<Message> messages = pullConsumer.batchReceive(3 * 1000);
Message message1 = null;
Message message2 = null;
assertThat(messages.size()).isEqualTo(2);
for (Message message : messages) {
if (message.header().getMessageId().equals("NewMsgId")) {
message1 = message;
}
if (message.header().getMessageId().equals("NewMsgId1")) {
message2 = message;
}
}
assertThat(message1).isNotNull();
assertThat(message1.getData()).isEqualTo(testBody);
assertThat(message1.header().getDestination()).isEqualTo(queueName);
assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
assertThat(message2).isNotNull();
assertThat(message2.getData()).isEqualTo(testBody1);
assertThat(message2.header().getDestination()).isEqualTo(queueName);
assertThat(message2.extensionHeader().getPartiton()).isEqualTo(0);
PullResult pullResult = new PullResult(PullStatus.FOUND, 11, 1, 100, messageExts);
doReturn(pullResult).when(rocketmqPullConsumer).pull(any(MessageQueue.class), anyString(), anyLong(), anyInt(), anyLong());
MessageQueue messageQueue = new MessageQueue(queueName, "breakeName", 0);
Set<MessageQueue> messageQueues = new HashSet<MessageQueue>() {
{
add(messageQueue);
}
};
doReturn(messageQueues).when(rocketmqPullConsumer).fetchSubscribeMessageQueues(queueName);
QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, 0);
MessageReceipt messageReceipt = new DefaultMessageReceipt("NewMsgId", 10L);
long timeout = 3000L;
List<Message> message1s = pullConsumer.batchReceive(queueName, queueMetaData, messageReceipt, timeout);
assertThat(message1s.size()).isEqualTo(2);
Message message3 = null;
Message message4 = null;
for (Message message : message1s) {
if (message.header().getMessageId().equals("NewMsgId")) {
message3 = message;
}
if (message.header().getMessageId().equals("NewMsgId1")) {
message4 = message;
}
}
assertThat(message3).isNotNull();
assertThat(message3.getData()).isEqualTo(testBody);
assertThat(message3.header().getDestination()).isEqualTo(queueName);
assertThat(message3.extensionHeader().getPartiton()).isEqualTo(0);
assertThat(message4).isNotNull();
assertThat(message4.getData()).isEqualTo(testBody1);
assertThat(message4.header().getDestination()).isEqualTo(queueName);
assertThat(message4.extensionHeader().getPartiton()).isEqualTo(0);
}
@Test
public void testPoll_WithTimeout() {
Message message = consumer.receive(3 * 1000);
Message message = pullConsumer.receive(3 * 1000);
assertThat(message).isNull();
}
}
\ No newline at end of file
......@@ -16,12 +16,21 @@
*/
package io.openmessaging.rocketmq.consumer;
import io.openmessaging.*;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS;
import io.openmessaging.consumer.BatchMessageListener;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.manager.ResourceManager;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
......@@ -31,15 +40,12 @@ import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.lang.reflect.Field;
import java.util.Collections;
import static org.mockito.Mockito.when;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class PushConsumerImplTest {
private Consumer consumer;
private PushConsumer pushConsumer;
@Mock
private DefaultMQPushConsumer rocketmqPushConsumer;
......@@ -48,17 +54,17 @@ public class PushConsumerImplTest {
public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
resourceManager.createNamespace(NonStandardKeys.PUSH_CONSUMER + "_TestGroup");
consumer = messagingAccessPoint.createConsumer();
final KeyValue attributes = messagingAccessPoint.attributes();
attributes.put(NonStandardKeys.CONSUMER_ID, "TestGroup");
pushConsumer = messagingAccessPoint.createPushConsumer();
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true);
DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(consumer);
field.set(consumer, rocketmqPushConsumer); //Replace
DefaultMQPushConsumer innerConsumer = (DefaultMQPushConsumer) field.get(pushConsumer);
field.set(pushConsumer, rocketmqPushConsumer); //Replace
when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
consumer.start();
pushConsumer.start();
}
@Test
......@@ -70,7 +76,11 @@ public class PushConsumerImplTest {
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
consumer.bindQueue("HELLO_QUEUE", new MessageListener() {
consumedMsg.setQueueId(0);
consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
Set<String> queueNames = new HashSet<>(8);
queueNames.add("HELLO_QUEUE");
pushConsumer.bindQueue(queueNames, new MessageListener() {
@Override
public void onReceived(Message message, Context context) {
assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
......@@ -81,4 +91,65 @@ public class PushConsumerImplTest {
((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
}
@Test
public void testBatchConsumeMessage() {
final byte[] testBody = new byte[]{'a', 'b'};
MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE");
consumedMsg.setQueueId(0);
consumedMsg.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
final byte[] testBody1 = new byte[]{'c', 'd'};
MessageExt consumedMsg1 = new MessageExt();
consumedMsg1.setMsgId("NewMsgId1");
consumedMsg1.setBody(testBody1);
consumedMsg1.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg1.setTopic("HELLO_QUEUE");
consumedMsg1.setQueueId(0);
consumedMsg1.setStoreHost(new InetSocketAddress("127.0.0.1", 9876));
List<MessageExt> messageExts = new ArrayList<MessageExt>() {
{
add(consumedMsg);
add(consumedMsg1);
}
};
Set<String> queueNames = new HashSet<>(8);
queueNames.add("HELLO_QUEUE");
pushConsumer.bindQueue(queueNames, new BatchMessageListener() {
@Override public void onReceived(List<Message> batchMessage, Context context) {
assertThat(batchMessage.size()).isEqualTo(2);
Message message1 = null;
Message message2 = null;
for (Message message : batchMessage) {
if (message.header().getMessageId().equals("NewMsgId")) {
message1 = message;
}
if (message.header().getMessageId().equals("NewMsgId1")) {
message2 = message;
}
}
assertThat(message1).isNotNull();
assertThat(message1.getData()).isEqualTo(testBody);
assertThat(message1.header().getDestination()).isEqualTo("HELLO_QUEUE");
assertThat(message1.extensionHeader().getPartiton()).isEqualTo(0);
assertThat(message2).isNotNull();
assertThat(message2.getData()).isEqualTo(testBody1);
assertThat(message2.header().getDestination()).isEqualTo("HELLO_QUEUE");
assertThat(message2.extensionHeader().getPartiton()).isEqualTo(0);
context.ack();
}
});
((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(messageExts, null);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册