未验证 提交 82a6b1e8 编写于 作者: H Heng Du 提交者: GitHub

Merge pull request #1210 from odbozhou/feature_oms_1.0.0

[ISSUE #1199] Implement the 1.0.0 openmessaging consumer API for rocketmq oms module
...@@ -19,12 +19,16 @@ package io.openmessaging.rocketmq; ...@@ -19,12 +19,16 @@ package io.openmessaging.rocketmq;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.consumer.Consumer; import io.openmessaging.consumer.Consumer;
import io.openmessaging.exception.OMSUnsupportException;
import io.openmessaging.manager.ResourceManager; import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.MessageFactory; import io.openmessaging.message.MessageFactory;
import io.openmessaging.producer.Producer; import io.openmessaging.producer.Producer;
import io.openmessaging.producer.TransactionStateCheckListener; import io.openmessaging.producer.TransactionStateCheckListener;
import io.openmessaging.rocketmq.consumer.PullConsumerImpl;
import io.openmessaging.rocketmq.consumer.PushConsumerImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.producer.ProducerImpl; import io.openmessaging.rocketmq.producer.ProducerImpl;
import java.util.HashSet;
import java.util.Set;
public class MessagingAccessPointImpl implements MessagingAccessPoint { public class MessagingAccessPointImpl implements MessagingAccessPoint {
...@@ -54,15 +58,76 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { ...@@ -54,15 +58,76 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint {
} }
@Override public Consumer createConsumer() { @Override public Consumer createConsumer() {
return null; String consumerId = accessPointProperties.getString(NonStandardKeys.CONSUMER_ID);
String[] nsStrArr = consumerId.split("_");
if (nsStrArr.length < 2) {
return new PushConsumerImpl(accessPointProperties);
}
if (NonStandardKeys.PULL_CONSUMER.equals(nsStrArr[0])) {
return new PullConsumerImpl(accessPointProperties);
}
return new PushConsumerImpl(accessPointProperties);
} }
@Override @Override
public ResourceManager resourceManager() { public ResourceManager resourceManager() {
throw new OMSUnsupportException(-1, "ResourceManager is not supported in current version."); DefaultResourceManager resourceManager = new DefaultResourceManager();
return resourceManager;
} }
@Override public MessageFactory messageFactory() { @Override public MessageFactory messageFactory() {
return null; return null;
} }
class DefaultResourceManager implements ResourceManager {
@Override
public void createNamespace(String nsName) {
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, nsName);
}
@Override
public void deleteNamespace(String nsName) {
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, null);
}
@Override
public void switchNamespace(String targetNamespace) {
accessPointProperties.put(NonStandardKeys.CONSUMER_ID, targetNamespace);
}
@Override
public Set<String> listNamespaces() {
return new HashSet<String>() {
{
add(accessPointProperties.getString(NonStandardKeys.CONSUMER_ID));
}
};
}
@Override
public void createQueue(String queueName) {
}
@Override
public void deleteQueue(String queueName) {
}
@Override
public Set<String> listQueues(String nsName) {
return null;
}
@Override
public void filter(String queueName, String filterString) {
}
@Override
public void routing(String sourceQueue, String targetQueue) {
}
};
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.openmessaging.rocketmq.config;
import io.openmessaging.extension.QueueMetaData;
import java.util.List;
public class DefaultQueueMetaData implements QueueMetaData {
private String queueName;
private List<QueueMetaData.Partition> partitions;
public DefaultQueueMetaData(String queueName, List<QueueMetaData.Partition> partitions) {
this.queueName = queueName;
this.partitions = partitions;
}
@Override
public String queueName() {
return queueName;
}
@Override
public List<QueueMetaData.Partition> partitions() {
return partitions;
}
public static class DefaultPartition implements Partition {
public DefaultPartition(int partitionId, String partitonHost) {
this.partitionId = partitionId;
this.partitonHost = partitonHost;
}
private int partitionId;
private String partitonHost;
@Override
public int partitionId() {
return partitionId;
}
@Override
public String partitonHost() {
return partitonHost;
}
}
}
...@@ -17,12 +17,18 @@ ...@@ -17,12 +17,18 @@
package io.openmessaging.rocketmq.consumer; package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.Message; import io.openmessaging.ServiceLifeState;
import io.openmessaging.ServiceLifecycle; import io.openmessaging.ServiceLifecycle;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.ConsumeRequest; import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -50,6 +56,7 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -50,6 +56,7 @@ class LocalMessageCache implements ServiceLifecycle {
private final DefaultMQPullConsumer rocketmqPullConsumer; private final DefaultMQPullConsumer rocketmqPullConsumer;
private final ClientConfig clientConfig; private final ClientConfig clientConfig;
private final ScheduledExecutorService cleanExpireMsgExecutors; private final ScheduledExecutorService cleanExpireMsgExecutors;
private ServiceLifeState currentState;
private final static InternalLogger log = ClientLogger.getLog(); private final static InternalLogger log = ClientLogger.getLog();
...@@ -60,7 +67,8 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -60,7 +67,8 @@ class LocalMessageCache implements ServiceLifecycle {
this.rocketmqPullConsumer = rocketmqPullConsumer; this.rocketmqPullConsumer = rocketmqPullConsumer;
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl( this.cleanExpireMsgExecutors = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"OMS_CleanExpireMsgScheduledThread_")); "OMS_CleanExpireMsgScheduledThread_"));
this.currentState = ServiceLifeState.INITIALIZED;
} }
int nextPullBatchNums() { int nextPullBatchNums() {
...@@ -71,7 +79,7 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -71,7 +79,7 @@ class LocalMessageCache implements ServiceLifecycle {
if (!pullOffsetTable.containsKey(remoteQueue)) { if (!pullOffsetTable.containsKey(remoteQueue)) {
try { try {
pullOffsetTable.putIfAbsent(remoteQueue, pullOffsetTable.putIfAbsent(remoteQueue,
rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false)); rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false));
} catch (MQClientException e) { } catch (MQClientException e) {
log.error("A error occurred in fetch consume offset process.", e); log.error("A error occurred in fetch consume offset process.", e);
} }
...@@ -96,8 +104,8 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -96,8 +104,8 @@ class LocalMessageCache implements ServiceLifecycle {
MessageExt poll(final KeyValue properties) { MessageExt poll(final KeyValue properties) {
int currentPollTimeout = clientConfig.getOperationTimeout(); int currentPollTimeout = clientConfig.getOperationTimeout();
if (properties.containsKey(Message.BuiltinKeys.TIMEOUT)) { if (properties.containsKey(NonStandardKeys.TIMEOUT)) {
currentPollTimeout = properties.getInt(Message.BuiltinKeys.TIMEOUT); currentPollTimeout = properties.getInt(NonStandardKeys.TIMEOUT);
} }
return poll(currentPollTimeout); return poll(currentPollTimeout);
} }
...@@ -117,6 +125,23 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -117,6 +125,23 @@ class LocalMessageCache implements ServiceLifecycle {
return null; return null;
} }
List<MessageExt> batchPoll(final KeyValue properties) {
List<ConsumeRequest> consumeRequests = new ArrayList<>(16);
int n = consumeRequestCache.drainTo(consumeRequests);
if (n > 0) {
List<MessageExt> messageExts = new ArrayList<>(n);
for (ConsumeRequest consumeRequest : consumeRequests) {
MessageExt messageExt = consumeRequest.getMessageExt();
consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis());
MessageAccessor.setConsumeStartTimeStamp(messageExt, String.valueOf(consumeRequest.getStartConsumeTimeMillis()));
consumedRequest.put(messageExt.getMsgId(), consumeRequest);
messageExts.add(messageExt);
}
return messageExts;
}
return null;
}
void ack(final String messageId) { void ack(final String messageId) {
ConsumeRequest consumeRequest = consumedRequest.remove(messageId); ConsumeRequest consumeRequest = consumedRequest.remove(messageId);
if (consumeRequest != null) { if (consumeRequest != null) {
...@@ -139,24 +164,9 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -139,24 +164,9 @@ class LocalMessageCache implements ServiceLifecycle {
} }
} }
@Override
public void startup() {
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES);
}
@Override
public void shutdown() {
ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
}
private void cleanExpireMsg() { private void cleanExpireMsg() {
for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl() for (final Map.Entry<MessageQueue, ProcessQueue> next : rocketmqPullConsumer.getDefaultMQPullConsumerImpl()
.getRebalanceImpl().getProcessQueueTable().entrySet()) { .getRebalanceImpl().getProcessQueueTable().entrySet()) {
ProcessQueue pq = next.getValue(); ProcessQueue pq = next.getValue();
MessageQueue mq = next.getKey(); MessageQueue mq = next.getKey();
ReadWriteLock lockTreeMap = getLockInProcessQueue(pq); ReadWriteLock lockTreeMap = getLockInProcessQueue(pq);
...@@ -176,7 +186,7 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -176,7 +186,7 @@ class LocalMessageCache implements ServiceLifecycle {
if (!msgTreeMap.isEmpty()) { if (!msgTreeMap.isEmpty()) {
msg = msgTreeMap.firstEntry().getValue(); msg = msgTreeMap.firstEntry().getValue();
if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg)) if (System.currentTimeMillis() - Long.parseLong(MessageAccessor.getConsumeStartTimeStamp(msg))
> clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) { > clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000) {
//Expired, ack and remove it. //Expired, ack and remove it.
} else { } else {
break; break;
...@@ -194,7 +204,7 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -194,7 +204,7 @@ class LocalMessageCache implements ServiceLifecycle {
try { try {
rocketmqPullConsumer.sendMessageBack(msg, 3); rocketmqPullConsumer.sendMessageBack(msg, 3);
log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}",
msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
ack(mq, pq, msg); ack(mq, pq, msg);
} catch (Exception e) { } catch (Exception e) {
log.error("Send back expired msg exception", e); log.error("Send back expired msg exception", e);
...@@ -210,4 +220,50 @@ class LocalMessageCache implements ServiceLifecycle { ...@@ -210,4 +220,50 @@ class LocalMessageCache implements ServiceLifecycle {
return null; return null;
} }
} }
@Override
public void start() {
this.currentState = ServiceLifeState.STARTING;
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
cleanExpireMsg();
}
}, clientConfig.getRmqMessageConsumeTimeout(), clientConfig.getRmqMessageConsumeTimeout(), TimeUnit.MINUTES);
this.currentState = ServiceLifeState.STARTED;
}
@Override
public void stop() {
this.currentState = ServiceLifeState.STOPPING;
ThreadUtils.shutdownGracefully(cleanExpireMsgExecutors, 5000, TimeUnit.MILLISECONDS);
this.currentState = ServiceLifeState.STARTED;
}
@Override
public ServiceLifeState currentState() {
return currentState;
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
Set<MessageQueue> messageQueues;
try {
messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
} catch (MQClientException e) {
log.error("A error occurred when get queue metadata.", e);
return null;
}
List<QueueMetaData.Partition> partitions = new ArrayList<>(16);
if (null != messageQueues && !messageQueues.isEmpty()) {
for (MessageQueue messageQueue : messageQueues) {
QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName());
partitions.add(partition);
}
} else {
return null;
}
QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions);
return queueMetaData;
}
} }
...@@ -17,35 +17,57 @@ ...@@ -17,35 +17,57 @@
package io.openmessaging.rocketmq.consumer; package io.openmessaging.rocketmq.consumer;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.Message; import io.openmessaging.ServiceLifeState;
import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.BatchMessageListener;
import io.openmessaging.consumer.PullConsumer; import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.interceptor.ConsumerInterceptor;
import io.openmessaging.internal.DefaultKeyValue;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.ConsumeRequest; import io.openmessaging.rocketmq.domain.ConsumeRequest;
import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils; import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumer; import org.apache.rocketmq.client.consumer.MQPullConsumer;
import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService;
import org.apache.rocketmq.client.consumer.PullResult; import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.consumer.PullTaskCallback; import org.apache.rocketmq.client.consumer.PullTaskCallback;
import org.apache.rocketmq.client.consumer.PullTaskContext; import org.apache.rocketmq.client.consumer.PullTaskContext;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PullConsumerImpl implements PullConsumer { public class PullConsumerImpl implements Consumer {
private static final int PULL_MAX_NUMS = 32;
private static final int PULL_MIN_NUMS = 1;
private final DefaultMQPullConsumer rocketmqPullConsumer; private final DefaultMQPullConsumer rocketmqPullConsumer;
private final KeyValue properties; private final KeyValue properties;
private boolean started = false; private boolean started = false;
private final MQPullConsumerScheduleService pullConsumerScheduleService; private final MQPullConsumerScheduleService pullConsumerScheduleService;
private final LocalMessageCache localMessageCache; private final LocalMessageCache localMessageCache;
private final ClientConfig clientConfig; private final ClientConfig clientConfig;
private ServiceLifeState currentState;
private List<ConsumerInterceptor> consumerInterceptors;
private final static InternalLogger log = ClientLogger.getLog(); private final static InternalLogger log = ClientLogger.getLog();
...@@ -55,7 +77,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -55,7 +77,7 @@ public class PullConsumerImpl implements PullConsumer {
String consumerGroup = clientConfig.getConsumerId(); String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) { if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); throw new OMSRuntimeException(-1, "Consumer Group is necessary for RocketMQ, please set it.");
} }
pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup); pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup);
...@@ -64,7 +86,7 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -64,7 +86,7 @@ public class PullConsumerImpl implements PullConsumer {
if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) { if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) {
String accessPoints = clientConfig.getAccessPoints(); String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException(-1, "OMS AccessPoints is null or empty.");
} }
this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
} }
...@@ -76,110 +98,290 @@ public class PullConsumerImpl implements PullConsumer { ...@@ -76,110 +98,290 @@ public class PullConsumerImpl implements PullConsumer {
String consumerId = OMSUtil.buildInstanceName(); String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPullConsumer.setInstanceName(consumerId); this.rocketmqPullConsumer.setInstanceName(consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS); this.rocketmqPullConsumer.setLanguage(LanguageCode.OMS);
this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig); this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, clientConfig);
consumerInterceptors = new ArrayList<>(16);
}
private void registerPullTaskCallback(final String targetQueueName) {
this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() {
@Override
public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
long offset = localMessageCache.nextPullOffset(mq);
PullResult pullResult = consumer.pull(mq, "*",
offset, localMessageCache.nextPullBatchNums());
ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl()
.getProcessQueueTable().get(mq);
switch (pullResult.getPullStatus()) {
case FOUND:
if (pq != null) {
pq.putMessage(pullResult.getMsgFoundList());
for (final MessageExt messageExt : pullResult.getMsgFoundList()) {
localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));
}
}
break;
default:
break;
}
localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
} catch (Exception e) {
log.error("A error occurred in pull message process.", e);
}
}
});
} }
@Override @Override
public KeyValue attributes() { public void resume() {
return properties; currentState = ServiceLifeState.STARTED;
} }
@Override @Override
public PullConsumer attachQueue(String queueName) { public void suspend() {
registerPullTaskCallback(queueName); currentState = ServiceLifeState.STOPPED;
return this;
} }
@Override @Override
public PullConsumer attachQueue(String queueName, KeyValue attributes) { public void suspend(long timeout) {
throw new UnsupportedOperationException();
}
@Override
public boolean isSuspended() {
if (ServiceLifeState.STOPPED.equals(currentState)) {
return true;
}
return false;
}
@Override
public void bindQueue(String queueName) {
registerPullTaskCallback(queueName); registerPullTaskCallback(queueName);
return this;
} }
@Override @Override
public PullConsumer detachQueue(String queueName) { public void bindQueue(List<String> queueNames) {
for (String queueName : queueNames) {
bindQueue(queueName);
}
}
@Override
public void bindQueue(String queueName, MessageListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void bindQueues(List<String> queueNames, MessageListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void bindQueue(String queueName, BatchMessageListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void bindQueues(List<String> queueNames, BatchMessageListener listener) {
throw new UnsupportedOperationException();
}
@Override
public void unbindQueue(String queueName) {
this.rocketmqPullConsumer.getRegisterTopics().remove(queueName); this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
return this;
} }
@Override @Override
public Message receive() { public void unbindQueues(List<String> queueNames) {
MessageExt rmqMsg = localMessageCache.poll(); for (String queueName : queueNames) {
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); this.rocketmqPullConsumer.getRegisterTopics().remove(queueName);
}
}
@Override
public boolean isBindQueue() {
Set<String> registerTopics = rocketmqPullConsumer.getRegisterTopics();
if (null == registerTopics || registerTopics.isEmpty()) {
return false;
}
return true;
}
@Override
public List<String> getBindQueues() {
Set<String> registerTopics = rocketmqPullConsumer.getRegisterTopics();
return new ArrayList<>(registerTopics);
} }
@Override @Override
public Message receive(final KeyValue properties) { public void addInterceptor(ConsumerInterceptor interceptor) {
consumerInterceptors.add(interceptor);
}
@Override
public void removeInterceptor(ConsumerInterceptor interceptor) {
consumerInterceptors.remove(interceptor);
}
@Override
public Message receive(long timeout) {
KeyValue properties = new DefaultKeyValue();
properties.put(NonStandardKeys.TIMEOUT, timeout);
MessageExt rmqMsg = localMessageCache.poll(properties); MessageExt rmqMsg = localMessageCache.poll(properties);
return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg); return rmqMsg == null ? null : OMSUtil.msgConvert(rmqMsg);
} }
@Override @Override
public void ack(final String messageId) { public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
localMessageCache.ack(messageId); MessageQueue mq = null;
mq = getQueue(queueName, partitionId, mq);
PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MIN_NUMS);
if (pullResult == null)
return null;
PullStatus pullStatus = pullResult.getPullStatus();
List<Message> messages = new ArrayList<>(16);
if (PullStatus.FOUND.equals(pullStatus)) {
List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages.get(0);
}
}
return null;
}
private PullResult getResult(long receiptId, long timeout, MessageQueue mq, int nums) {
PullResult pullResult;
try {
pullResult = rocketmqPullConsumer.pull(mq, "*", receiptId, nums, timeout);
} catch (MQClientException e) {
log.error("A error occurred when pull message.", e);
return null;
} catch (RemotingException e) {
log.error("A error occurred when pull message.", e);
return null;
} catch (InterruptedException e) {
log.error("A error occurred when pull message.", e);
return null;
} catch (MQBrokerException e) {
log.error("A error occurred when pull message.", e);
return null;
}
if (null == pullResult) {
return null;
}
return pullResult;
}
private MessageQueue getQueue(String queueName, int partitionId, MessageQueue mq) {
try {
Set<MessageQueue> messageQueues = rocketmqPullConsumer.fetchSubscribeMessageQueues(queueName);
for (MessageQueue messageQueue : messageQueues) {
if (messageQueue.getQueueId() == partitionId) {
mq = messageQueue;
}
}
} catch (MQClientException e) {
log.error("A error occurred when batch pull message.", e);
}
return mq;
} }
@Override @Override
public void ack(final String messageId, final KeyValue properties) { public List<Message> batchReceive(long timeout) {
localMessageCache.ack(messageId); KeyValue properties = new DefaultKeyValue();
properties.put(NonStandardKeys.TIMEOUT, timeout);
List<MessageExt> rmqMsgs = localMessageCache.batchPoll(properties);
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
List<Message> messages = new ArrayList<>(rmqMsgs.size());
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages;
}
return null;
} }
@Override @Override
public synchronized void startup() { public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
if (!started) { MessageQueue mq = null;
try { mq = getQueue(queueName, partitionId, mq);
this.pullConsumerScheduleService.start(); PullResult pullResult = getResult(receiptId, timeout, mq, PULL_MAX_NUMS);
this.localMessageCache.startup(); if (pullResult == null)
} catch (MQClientException e) { return null;
throw new OMSRuntimeException("-1", e); PullStatus pullStatus = pullResult.getPullStatus();
List<Message> messages = new ArrayList<>(16);
if (PullStatus.FOUND.equals(pullStatus)) {
List<MessageExt> rmqMsgs = pullResult.getMsgFoundList();
if (null != rmqMsgs && !rmqMsgs.isEmpty()) {
for (MessageExt messageExt : rmqMsgs) {
BytesMessageImpl bytesMessage = OMSUtil.msgConvert(messageExt);
messages.add(bytesMessage);
}
return messages;
} }
} }
this.started = true; return null;
} }
private void registerPullTaskCallback(final String targetQueueName) { @Override
this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() { public void ack(MessageReceipt receipt) {
@Override
public void doPullTask(final MessageQueue mq, final PullTaskContext context) {
MQPullConsumer consumer = context.getPullConsumer();
try {
long offset = localMessageCache.nextPullOffset(mq);
PullResult pullResult = consumer.pull(mq, "*", }
offset, localMessageCache.nextPullBatchNums());
ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() @Override
.getProcessQueueTable().get(mq); public Optional<Extension> getExtension() {
switch (pullResult.getPullStatus()) {
case FOUND: return Optional.of(new Extension() {
if (pq != null) { @Override
pq.putMessage(pullResult.getMsgFoundList()); public QueueMetaData getQueueMetaData(String queueName) {
for (final MessageExt messageExt : pullResult.getMsgFoundList()) { return getQueueMetaData(queueName);
localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq));
}
}
break;
default:
break;
}
localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset());
} catch (Exception e) {
log.error("A error occurred in pull message process.", e);
}
} }
}); });
} }
@Override @Override
public synchronized void shutdown() { public synchronized void start() {
if (!started) {
try {
this.pullConsumerScheduleService.start();
this.localMessageCache.start();
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, e);
}
}
this.started = true;
}
@Override
public synchronized void stop() {
if (this.started) { if (this.started) {
this.localMessageCache.shutdown(); this.localMessageCache.stop();
this.pullConsumerScheduleService.shutdown(); this.pullConsumerScheduleService.shutdown();
this.rocketmqPullConsumer.shutdown(); this.rocketmqPullConsumer.shutdown();
} }
this.started = false; this.started = false;
} }
@Override
public ServiceLifeState currentState() {
return localMessageCache.currentState();
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
return localMessageCache.getQueueMetaData(queueName);
}
} }
...@@ -16,20 +16,29 @@ ...@@ -16,20 +16,29 @@
*/ */
package io.openmessaging.rocketmq.consumer; package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage;
import io.openmessaging.KeyValue; import io.openmessaging.KeyValue;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.ServiceLifeState;
import io.openmessaging.consumer.BatchMessageListener;
import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.MessageListener;
import io.openmessaging.consumer.PushConsumer; import io.openmessaging.consumer.MessageReceipt;
import io.openmessaging.exception.OMSRuntimeException; import io.openmessaging.exception.OMSRuntimeException;
import io.openmessaging.extension.Extension;
import io.openmessaging.extension.QueueMetaData;
import io.openmessaging.interceptor.ConsumerInterceptor; import io.openmessaging.interceptor.ConsumerInterceptor;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.config.DefaultQueueMetaData;
import io.openmessaging.rocketmq.domain.BytesMessageImpl;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys;
import io.openmessaging.rocketmq.utils.BeanUtils; import io.openmessaging.rocketmq.utils.BeanUtils;
import io.openmessaging.rocketmq.utils.OMSUtil; import io.openmessaging.rocketmq.utils.OMSUtil;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -38,15 +47,24 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; ...@@ -38,15 +47,24 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.apache.rocketmq.remoting.protocol.LanguageCode;
public class PushConsumerImpl implements PushConsumer { public class PushConsumerImpl implements Consumer {
private final static InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumer rocketmqPushConsumer; private final DefaultMQPushConsumer rocketmqPushConsumer;
private final KeyValue properties; private final KeyValue properties;
private boolean started = false; private boolean started = false;
private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>(); private final Map<String, MessageListener> subscribeTable = new ConcurrentHashMap<>();
private final Map<String, BatchMessageListener> batchSubscribeTable = new ConcurrentHashMap<>();
private final ClientConfig clientConfig; private final ClientConfig clientConfig;
private ServiceLifeState currentState;
private List<ConsumerInterceptor> consumerInterceptors;
public PushConsumerImpl(final KeyValue properties) { public PushConsumerImpl(final KeyValue properties) {
this.rocketmqPushConsumer = new DefaultMQPushConsumer(); this.rocketmqPushConsumer = new DefaultMQPushConsumer();
...@@ -56,14 +74,14 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -56,14 +74,14 @@ public class PushConsumerImpl implements PushConsumer {
if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) { if ("true".equalsIgnoreCase(System.getenv("OMS_RMQ_DIRECT_NAME_SRV"))) {
String accessPoints = clientConfig.getAccessPoints(); String accessPoints = clientConfig.getAccessPoints();
if (accessPoints == null || accessPoints.isEmpty()) { if (accessPoints == null || accessPoints.isEmpty()) {
throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); throw new OMSRuntimeException(-1, "OMS AccessPoints is null or empty.");
} }
this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); this.rocketmqPushConsumer.setNamesrvAddr(accessPoints.replace(',', ';'));
} }
String consumerGroup = clientConfig.getConsumerId(); String consumerGroup = clientConfig.getConsumerId();
if (null == consumerGroup || consumerGroup.isEmpty()) { if (null == consumerGroup || consumerGroup.isEmpty()) {
throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); throw new OMSRuntimeException(-1, "Consumer Group is necessary for RocketMQ, please set it.");
} }
this.rocketmqPushConsumer.setConsumerGroup(consumerGroup); this.rocketmqPushConsumer.setConsumerGroup(consumerGroup);
this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes()); this.rocketmqPushConsumer.setMaxReconsumeTimes(clientConfig.getRmqMaxRedeliveryTimes());
...@@ -73,15 +91,13 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -73,15 +91,13 @@ public class PushConsumerImpl implements PushConsumer {
String consumerId = OMSUtil.buildInstanceName(); String consumerId = OMSUtil.buildInstanceName();
this.rocketmqPushConsumer.setInstanceName(consumerId); this.rocketmqPushConsumer.setInstanceName(consumerId);
properties.put(OMSBuiltinKeys.CONSUMER_ID, consumerId); properties.put(NonStandardKeys.CONSUMER_ID, consumerId);
this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS); this.rocketmqPushConsumer.setLanguage(LanguageCode.OMS);
this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl()); this.rocketmqPushConsumer.registerMessageListener(new MessageListenerImpl());
}
@Override consumerInterceptors = new ArrayList<>(16);
public KeyValue attributes() { currentState = ServiceLifeState.INITIALIZED;
return properties;
} }
@Override @Override
...@@ -96,7 +112,7 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -96,7 +112,7 @@ public class PushConsumerImpl implements PushConsumer {
@Override @Override
public void suspend(long timeout) { public void suspend(long timeout) {
throw new UnsupportedOperationException();
} }
@Override @Override
...@@ -105,102 +121,264 @@ public class PushConsumerImpl implements PushConsumer { ...@@ -105,102 +121,264 @@ public class PushConsumerImpl implements PushConsumer {
} }
@Override @Override
public PushConsumer attachQueue(final String queueName, final MessageListener listener) { public void bindQueue(String queueName) {
try {
rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
}
}
@Override
public void bindQueue(List<String> queueNames) {
for (String queueName : queueNames) {
bindQueue(queueName);
}
}
@Override
public void bindQueue(String queueName, MessageListener listener) {
this.subscribeTable.put(queueName, listener); this.subscribeTable.put(queueName, listener);
this.batchSubscribeTable.remove(queueName);
try {
this.rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) {
throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
}
}
@Override
public void bindQueues(List<String> queueNames, MessageListener listener) {
for (String queueName : queueNames) {
bindQueue(queueName, listener);
}
}
@Override
public void bindQueue(String queueName, BatchMessageListener listener) {
this.batchSubscribeTable.put(queueName, listener);
this.subscribeTable.remove(queueName);
try { try {
this.rocketmqPushConsumer.subscribe(queueName, "*"); this.rocketmqPushConsumer.subscribe(queueName, "*");
} catch (MQClientException e) { } catch (MQClientException e) {
throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer can't attach to %s.", queueName)); throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer can't attach to %s.", queueName));
} }
return this;
} }
@Override @Override
public PushConsumer attachQueue(String queueName, MessageListener listener, KeyValue attributes) { public void bindQueues(List<String> queueNames, BatchMessageListener listener) {
return this.attachQueue(queueName, listener); for (String queueName : queueNames) {
bindQueue(queueName, listener);
}
} }
@Override @Override
public PushConsumer detachQueue(String queueName) { public void unbindQueue(String queueName) {
this.subscribeTable.remove(queueName); this.subscribeTable.remove(queueName);
this.batchSubscribeTable.remove(queueName);
try { try {
this.rocketmqPushConsumer.unsubscribe(queueName); this.rocketmqPushConsumer.unsubscribe(queueName);
} catch (Exception e) { } catch (Exception e) {
throw new OMSRuntimeException("-1", String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName)); throw new OMSRuntimeException(-1, String.format("RocketMQ push consumer fails to unsubscribe topic: %s", queueName));
}
}
@Override
public void unbindQueues(List<String> queueNames) {
for (String queueName : queueNames) {
unbindQueue(queueName);
}
}
@Override
public boolean isBindQueue() {
Map<String, String> subscription = rocketmqPushConsumer.getSubscription();
if (null != subscription && subscription.size() > 0) {
return true;
}
return false;
}
@Override
public List<String> getBindQueues() {
Map<String, String> subscription = rocketmqPushConsumer.getSubscription();
if (null != subscription && subscription.size() > 0) {
return new ArrayList<>(subscription.keySet());
} }
return null; return null;
} }
@Override @Override
public void addInterceptor(ConsumerInterceptor interceptor) { public void addInterceptor(ConsumerInterceptor interceptor) {
consumerInterceptors.add(interceptor);
} }
@Override @Override
public void removeInterceptor(ConsumerInterceptor interceptor) { public void removeInterceptor(ConsumerInterceptor interceptor) {
consumerInterceptors.remove(interceptor);
}
@Override
public Message receive(long timeout) {
throw new UnsupportedOperationException();
}
@Override
public Message receive(String queueName, int partitionId, long receiptId, long timeout) {
throw new UnsupportedOperationException();
} }
@Override @Override
public synchronized void startup() { public List<Message> batchReceive(long timeout) {
throw new UnsupportedOperationException();
}
@Override
public List<Message> batchReceive(String queueName, int partitionId, long receiptId, long timeout) {
throw new UnsupportedOperationException();
}
@Override
public void ack(MessageReceipt receipt) {
throw new UnsupportedOperationException();
}
@Override
public Optional<Extension> getExtension() {
return Optional.of(new Extension() {
@Override
public QueueMetaData getQueueMetaData(String queueName) {
return getQueueMetaData(queueName);
}
});
}
@Override
public synchronized void start() {
currentState = ServiceLifeState.STARTING;
if (!started) { if (!started) {
try { try {
this.rocketmqPushConsumer.start(); this.rocketmqPushConsumer.start();
} catch (MQClientException e) { } catch (MQClientException e) {
throw new OMSRuntimeException("-1", e); throw new OMSRuntimeException(-1, e);
} }
} }
this.started = true; this.started = true;
currentState = ServiceLifeState.STARTED;
} }
@Override @Override
public synchronized void shutdown() { public synchronized void stop() {
currentState = ServiceLifeState.STOPPING;
if (this.started) { if (this.started) {
this.rocketmqPushConsumer.shutdown(); this.rocketmqPushConsumer.shutdown();
} }
this.started = false; this.started = false;
currentState = ServiceLifeState.STOPPED;
}
@Override
public ServiceLifeState currentState() {
return currentState;
}
@Override
public QueueMetaData getQueueMetaData(String queueName) {
Set<MessageQueue> messageQueues;
try {
messageQueues = rocketmqPushConsumer.fetchSubscribeMessageQueues(queueName);
} catch (MQClientException e) {
log.error("A error occurred when get queue metadata.", e);
return null;
}
List<QueueMetaData.Partition> partitions = new ArrayList<>(16);
if (null != messageQueues && !messageQueues.isEmpty()) {
for (MessageQueue messageQueue : messageQueues) {
QueueMetaData.Partition partition = new DefaultQueueMetaData.DefaultPartition(messageQueue.getQueueId(), messageQueue.getBrokerName());
partitions.add(partition);
}
} else {
return null;
}
QueueMetaData queueMetaData = new DefaultQueueMetaData(queueName, partitions);
return queueMetaData;
} }
class MessageListenerImpl implements MessageListenerConcurrently { class MessageListenerImpl implements MessageListenerConcurrently {
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList, public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> rmqMsgList,
ConsumeConcurrentlyContext contextRMQ) { ConsumeConcurrentlyContext contextRMQ) {
boolean batchFlag = true;
MessageExt rmqMsg = rmqMsgList.get(0); MessageExt rmqMsg = rmqMsgList.get(0);
BytesMessage omsMsg = OMSUtil.msgConvert(rmqMsg); BatchMessageListener batchMessageListener = PushConsumerImpl.this.batchSubscribeTable.get(rmqMsg.getTopic());
MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic()); MessageListener listener = PushConsumerImpl.this.subscribeTable.get(rmqMsg.getTopic());
if (null == batchMessageListener) {
if (listener == null) { batchFlag = false;
throw new OMSRuntimeException("-1", }
String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic())); if (listener == null && batchMessageListener == null) {
throw new OMSRuntimeException(-1,
String.format("The topic/queue %s isn't attached to this consumer", rmqMsg.getTopic()));
} }
final KeyValue contextProperties = OMS.newKeyValue(); final KeyValue contextProperties = OMS.newKeyValue();
final CountDownLatch sync = new CountDownLatch(1);
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name()); if (batchFlag) {
List<Message> messages = new ArrayList<>(16);
for (MessageExt messageExt : rmqMsgList) {
BytesMessageImpl omsMsg = OMSUtil.msgConvert(messageExt);
messages.add(omsMsg);
}
final CountDownLatch sync = new CountDownLatch(1);
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
BatchMessageListener.Context context = new BatchMessageListener.Context() {
@Override
public void success(MessageReceipt... messages) {
MessageListener.Context context = new MessageListener.Context() { }
@Override
public KeyValue attributes() { @Override
return contextProperties; public void ack() {
sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
};
long begin = System.currentTimeMillis();
batchMessageListener.onReceived(messages, context);
long costs = System.currentTimeMillis() - begin;
long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
try {
sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
} }
} else {
BytesMessageImpl omsMsg = OMSUtil.msgConvert(rmqMsg);
final CountDownLatch sync = new CountDownLatch(1);
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name());
@Override MessageListener.Context context = new MessageListener.Context() {
public void ack() {
sync.countDown(); @Override
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, public void ack() {
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name()); sync.countDown();
contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS,
ConsumeConcurrentlyStatus.CONSUME_SUCCESS.name());
}
};
long begin = System.currentTimeMillis();
listener.onReceived(omsMsg, context);
long costs = System.currentTimeMillis() - begin;
long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
try {
sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
} }
};
long begin = System.currentTimeMillis();
listener.onReceived(omsMsg, context);
long costs = System.currentTimeMillis() - begin;
long timeoutMills = clientConfig.getRmqMessageConsumeTimeout() * 60 * 1000;
try {
sync.await(Math.max(0, timeoutMills - costs), TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
} }
return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); return ConsumeConcurrentlyStatus.valueOf(contextProperties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS));
......
...@@ -18,7 +18,7 @@ package io.openmessaging.rocketmq.domain; ...@@ -18,7 +18,7 @@ package io.openmessaging.rocketmq.domain;
import io.openmessaging.message.Header; import io.openmessaging.message.Header;
public class MessageHeader implements Header{ public class MessageHeader implements Header {
private String destination; private String destination;
......
...@@ -28,5 +28,9 @@ public interface NonStandardKeys { ...@@ -28,5 +28,9 @@ public interface NonStandardKeys {
String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums"; String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums";
String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity"; String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity";
String PRODUCER_ID = "PRODUCER_ID"; String PRODUCER_ID = "PRODUCER_ID";
String CONSUMER_ID ="CONSUMER_ID"; String CONSUMER_ID = "CONSUMER_ID";
String TIMEOUT = "TIMEOUT";
String PULL_CONSUMER = "PULL";
String PUSH_CONSUMER = "PUSH";
} }
...@@ -16,12 +16,12 @@ ...@@ -16,12 +16,12 @@
*/ */
package io.openmessaging.rocketmq.consumer; package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage; import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPoint;
import io.openmessaging.OMS; import io.openmessaging.OMS;
import io.openmessaging.OMSBuiltinKeys; import io.openmessaging.consumer.Consumer;
import io.openmessaging.consumer.PullConsumer; import io.openmessaging.manager.ResourceManager;
import io.openmessaging.message.Message;
import io.openmessaging.rocketmq.config.ClientConfig; import io.openmessaging.rocketmq.config.ClientConfig;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field; import java.lang.reflect.Field;
...@@ -34,12 +34,13 @@ import org.mockito.Mock; ...@@ -34,12 +34,13 @@ import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class PullConsumerImplTest { public class PullConsumerImplTest {
private PullConsumer consumer; private Consumer consumer;
private String queueName = "HELLO_QUEUE"; private String queueName = "HELLO_QUEUE";
@Mock @Mock
...@@ -49,10 +50,11 @@ public class PullConsumerImplTest { ...@@ -49,10 +50,11 @@ public class PullConsumerImplTest {
@Before @Before
public void init() throws NoSuchFieldException, IllegalAccessException { public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
consumer = messagingAccessPoint.createPullConsumer(OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); resourceManager.createNamespace(NonStandardKeys.PULL_CONSUMER +"_TestGroup");
consumer.attachQueue(queueName); consumer = messagingAccessPoint.createConsumer();
consumer.bindQueue(queueName);
Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer"); Field field = PullConsumerImpl.class.getDeclaredField("rocketmqPullConsumer");
field.setAccessible(true); field.setAccessible(true);
...@@ -65,34 +67,26 @@ public class PullConsumerImplTest { ...@@ -65,34 +67,26 @@ public class PullConsumerImplTest {
field = PullConsumerImpl.class.getDeclaredField("localMessageCache"); field = PullConsumerImpl.class.getDeclaredField("localMessageCache");
field.setAccessible(true); field.setAccessible(true);
field.set(consumer, localMessageCache); field.set(consumer, localMessageCache);
consumer.start();
messagingAccessPoint.startup();
consumer.startup();
} }
@Test @Test
public void testPoll() { public void testPoll() {
final byte[] testBody = new byte[] {'a', 'b'}; final byte[] testBody = new byte[]{'a', 'b'};
MessageExt consumedMsg = new MessageExt(); MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId"); consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody); consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic(queueName); consumedMsg.setTopic(queueName);
doReturn(consumedMsg).when(localMessageCache).poll(any(KeyValue.class));
when(localMessageCache.poll()).thenReturn(consumedMsg); Message message = consumer.receive(3 * 1000);
assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
Message message = consumer.receive(); assertThat(message.getData()).isEqualTo(testBody);
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody);
} }
@Test @Test
public void testPoll_WithTimeout() { public void testPoll_WithTimeout() {
//There is a default timeout value, @see ClientConfig#omsOperationTimeout. Message message = consumer.receive(3 * 1000);
Message message = consumer.receive();
assertThat(message).isNull();
message = consumer.receive(OMS.newKeyValue().put(Message.BuiltinKeys.TIMEOUT, 100));
assertThat(message).isNull(); assertThat(message).isNull();
} }
} }
\ No newline at end of file
...@@ -16,16 +16,12 @@ ...@@ -16,16 +16,12 @@
*/ */
package io.openmessaging.rocketmq.consumer; package io.openmessaging.rocketmq.consumer;
import io.openmessaging.BytesMessage; import io.openmessaging.*;
import io.openmessaging.Message; import io.openmessaging.consumer.Consumer;
import io.openmessaging.OMSBuiltinKeys;
import io.openmessaging.consumer.MessageListener; import io.openmessaging.consumer.MessageListener;
import io.openmessaging.MessagingAccessPoint; import io.openmessaging.manager.ResourceManager;
import io.openmessaging.OMS; import io.openmessaging.message.Message;
import io.openmessaging.consumer.PushConsumer;
import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.NonStandardKeys;
import java.lang.reflect.Field;
import java.util.Collections;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
...@@ -35,12 +31,15 @@ import org.junit.runner.RunWith; ...@@ -35,12 +31,15 @@ import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner; import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat; import java.lang.reflect.Field;
import java.util.Collections;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class PushConsumerImplTest { public class PushConsumerImplTest {
private PushConsumer consumer; private Consumer consumer;
@Mock @Mock
private DefaultMQPushConsumer rocketmqPushConsumer; private DefaultMQPushConsumer rocketmqPushConsumer;
...@@ -48,9 +47,10 @@ public class PushConsumerImplTest { ...@@ -48,9 +47,10 @@ public class PushConsumerImplTest {
@Before @Before
public void init() throws NoSuchFieldException, IllegalAccessException { public void init() throws NoSuchFieldException, IllegalAccessException {
final MessagingAccessPoint messagingAccessPoint = OMS final MessagingAccessPoint messagingAccessPoint = OMS
.getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace"); .getMessagingAccessPoint("oms:rocketmq://IP1:9876,IP2:9876/namespace");
consumer = messagingAccessPoint.createPushConsumer( final ResourceManager resourceManager = messagingAccessPoint.resourceManager();
OMS.newKeyValue().put(OMSBuiltinKeys.CONSUMER_ID, "TestGroup")); resourceManager.createNamespace(NonStandardKeys.PUSH_CONSUMER + "_TestGroup");
consumer = messagingAccessPoint.createConsumer();
Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer"); Field field = PushConsumerImpl.class.getDeclaredField("rocketmqPushConsumer");
field.setAccessible(true); field.setAccessible(true);
...@@ -58,28 +58,27 @@ public class PushConsumerImplTest { ...@@ -58,28 +58,27 @@ public class PushConsumerImplTest {
field.set(consumer, rocketmqPushConsumer); //Replace field.set(consumer, rocketmqPushConsumer); //Replace
when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener()); when(rocketmqPushConsumer.getMessageListener()).thenReturn(innerConsumer.getMessageListener());
messagingAccessPoint.startup(); consumer.start();
consumer.startup();
} }
@Test @Test
public void testConsumeMessage() { public void testConsumeMessage() {
final byte[] testBody = new byte[] {'a', 'b'}; final byte[] testBody = new byte[]{'a', 'b'};
MessageExt consumedMsg = new MessageExt(); MessageExt consumedMsg = new MessageExt();
consumedMsg.setMsgId("NewMsgId"); consumedMsg.setMsgId("NewMsgId");
consumedMsg.setBody(testBody); consumedMsg.setBody(testBody);
consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC"); consumedMsg.putUserProperty(NonStandardKeys.MESSAGE_DESTINATION, "TOPIC");
consumedMsg.setTopic("HELLO_QUEUE"); consumedMsg.setTopic("HELLO_QUEUE");
consumer.attachQueue("HELLO_QUEUE", new MessageListener() { consumer.bindQueue("HELLO_QUEUE", new MessageListener() {
@Override @Override
public void onReceived(Message message, Context context) { public void onReceived(Message message, Context context) {
assertThat(message.sysHeaders().getString(Message.BuiltinKeys.MESSAGE_ID)).isEqualTo("NewMsgId"); assertThat(message.header().getMessageId()).isEqualTo("NewMsgId");
assertThat(((BytesMessage) message).getBody(byte[].class)).isEqualTo(testBody); assertThat(message.getData()).isEqualTo(testBody);
context.ack(); context.ack();
} }
}); });
((MessageListenerConcurrently) rocketmqPushConsumer ((MessageListenerConcurrently) rocketmqPushConsumer
.getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null); .getMessageListener()).consumeMessage(Collections.singletonList(consumedMsg), null);
} }
} }
\ No newline at end of file
...@@ -580,7 +580,7 @@ ...@@ -580,7 +580,7 @@
<dependency> <dependency>
<groupId>io.openmessaging</groupId> <groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId> <artifactId>openmessaging-api</artifactId>
<version>0.3.1-alpha</version> <version>1.0.0-beta-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>log4j</groupId> <groupId>log4j</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册