From 625ba07728524d778a8faa73867a6bb39cb9976c Mon Sep 17 00:00:00 2001 From: yukon Date: Tue, 18 Apr 2017 22:42:00 +0800 Subject: [PATCH] Add PullConsumer related implementation for OpenMessaging. --- .../openmessaging/SimplePullConsumer.java | 56 ++++++++ .../openmessaging/SimplePushConsumer.java | 3 +- .../rocketmq/MessagingAccessPointImpl.java | 4 +- .../io/openmessaging/rocketmq/OMSUtil.java | 50 ++++++- .../rocketmq/consumer/LocalMessageCache.java | 134 ++++++++++++++++++ .../rocketmq/consumer/PullConsumerImpl.java | 115 +++++++++++++-- .../rocketmq/consumer/PushConsumerImpl.java | 8 +- .../rocketmq/domain/ConsumeRequest.java | 55 +++++++ .../rocketmq/domain/NonStandardKeys.java | 2 + 9 files changed, 413 insertions(+), 14 deletions(-) create mode 100644 example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java create mode 100644 openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java new file mode 100644 index 00000000..8dd7b236 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePullConsumer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.example.openmessaging; + +import io.openmessaging.Message; +import io.openmessaging.MessageHeader; +import io.openmessaging.MessagingAccessPoint; +import io.openmessaging.MessagingAccessPointFactory; +import io.openmessaging.OMS; +import io.openmessaging.PullConsumer; +import io.openmessaging.rocketmq.domain.NonStandardKeys; + +public class SimplePullConsumer { + public static void main(String[] args) { + final MessagingAccessPoint messagingAccessPoint = MessagingAccessPointFactory + .getMessagingAccessPoint("openmessaging:rocketmq://10.125.3.140:9876,10.189.232.59:9876/namespace"); + + final PullConsumer consumer = messagingAccessPoint.createPullConsumer("OMS_HELLO_TOPIC", + OMS.newKeyValue().put(NonStandardKeys.CONSUMER_GROUP, "OMS_CONSUMER")); + + messagingAccessPoint.startup(); + System.out.println("messagingAccessPoint startup OK"); + + Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { + @Override + public void run() { + consumer.shutdown(); + messagingAccessPoint.shutdown(); + } + })); + + consumer.startup(); + System.out.println("consumer startup OK"); + + while (true) { + Message message = consumer.poll(); + String msgId = message.headers().getString(MessageHeader.MESSAGE_ID); + System.out.println("Received one message: " + msgId); + consumer.ack(msgId); + } + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java index 6fc8e39d..813e301d 100644 --- a/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/openmessaging/SimplePushConsumer.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.example.openmessaging; import io.openmessaging.Message; +import io.openmessaging.MessageHeader; import io.openmessaging.MessageListener; import io.openmessaging.MessagingAccessPoint; import io.openmessaging.MessagingAccessPointFactory; @@ -47,7 +48,7 @@ public class SimplePushConsumer { consumer.attachQueue("OMS_HELLO_TOPIC", new MessageListener() { @Override public void onMessage(final Message message, final ReceivedMessageContext context) { - System.out.println("Received one message: " + message); + System.out.println("Received one message: " + message.headers().getString(MessageHeader.MESSAGE_ID)); context.ack(); } }); diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java index fecd69f9..af1695b1 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/MessagingAccessPointImpl.java @@ -76,12 +76,12 @@ public class MessagingAccessPointImpl implements MessagingAccessPoint { @Override public PullConsumer createPullConsumer(String queueName) { - return new PullConsumerImpl(accessPointProperties); + return new PullConsumerImpl(queueName, accessPointProperties); } @Override public PullConsumer createPullConsumer(String queueName, KeyValue properties) { - return new PullConsumerImpl(OMSUtil.buildKeyValue(this.accessPointProperties, properties)); + return new PullConsumerImpl(queueName, OMSUtil.buildKeyValue(this.accessPointProperties, properties)); } @Override diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java index dd591a6a..87037ee2 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/OMSUtil.java @@ -25,7 +25,9 @@ import io.openmessaging.rocketmq.domain.BytesMessageImpl; import io.openmessaging.rocketmq.domain.NonStandardKeys; import io.openmessaging.rocketmq.domain.SendResultImpl; import java.lang.reflect.Field; +import java.util.Iterator; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.common.UtilAll; @@ -88,7 +90,8 @@ public class OMSUtil { } omsMsg.putHeaders(MessageHeader.MESSAGE_ID, rmqMsg.getMsgId()); - if (rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { + if (!rmqMsg.getProperties().containsKey(NonStandardKeys.MESSAGE_DESTINATION) || + rmqMsg.getProperties().get(NonStandardKeys.MESSAGE_DESTINATION).equals("TOPIC")) { omsMsg.putHeaders(MessageHeader.TOPIC, rmqMsg.getTopic()); } else { omsMsg.putHeaders(MessageHeader.QUEUE, rmqMsg.getTopic()); @@ -131,4 +134,49 @@ public class OMSUtil { } return keyValue; } + + /** + * Returns an iterator that cycles indefinitely over the elements of {@code Iterable}. + */ + public static Iterator cycle(final Iterable iterable) { + return new Iterator() { + Iterator iterator = new Iterator() { + @Override + public synchronized boolean hasNext() { + return false; + } + + @Override + public synchronized T next() { + throw new NoSuchElementException(); + } + + @Override + public synchronized void remove() { + //Ignore + } + }; + + @Override + public synchronized boolean hasNext() { + return iterator.hasNext() || iterable.iterator().hasNext(); + } + + @Override + public synchronized T next() { + if (!iterator.hasNext()) { + iterator = iterable.iterator(); + if (!iterator.hasNext()) { + throw new NoSuchElementException(); + } + } + return iterator.next(); + } + + @Override + public synchronized void remove() { + iterator.remove(); + } + }; + } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java new file mode 100644 index 00000000..968229a9 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/LocalMessageCache.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.consumer; + +import io.openmessaging.KeyValue; +import io.openmessaging.PropertyKeys; +import io.openmessaging.rocketmq.domain.ConsumeRequest; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; + +class LocalMessageCache { + private final BlockingQueue consumeRequestCache; + private final Map consumedRequest; + private final ConcurrentHashMap pullOffsetTable; + private final DefaultMQPullConsumer rocketmqPullConsumer; + private int pullBatchNums = 32; + private int pollTimeout = -1; + private final static Logger log = ClientLogger.getLog(); + + LocalMessageCache(final DefaultMQPullConsumer rocketmqPullConsumer, final KeyValue properties) { + int cacheCapacity = 1000; + if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY)) { + cacheCapacity = properties.getInt(NonStandardKeys.PULL_MESSAGE_CACHE_CAPACITY); + } + consumeRequestCache = new LinkedBlockingQueue<>(cacheCapacity); + + if (properties.containsKey(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS)) { + pullBatchNums = properties.getInt(NonStandardKeys.PULL_MESSAGE_BATCH_NUMS); + } + + if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { + pollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); + } + + this.consumedRequest = new ConcurrentHashMap<>(); + this.pullOffsetTable = new ConcurrentHashMap<>(); + this.rocketmqPullConsumer = rocketmqPullConsumer; + } + + int nextPullBatchNums() { + return Math.min(pullBatchNums, consumeRequestCache.remainingCapacity()); + } + + long nextPullOffset(MessageQueue remoteQueue) { + if (!pullOffsetTable.containsKey(remoteQueue)) { + try { + pullOffsetTable.putIfAbsent(remoteQueue, + rocketmqPullConsumer.fetchConsumeOffset(remoteQueue, false)); + } catch (MQClientException e) { + log.error("A error occurred in fetch consume offset process.", e); + } + } + return pullOffsetTable.get(remoteQueue); + } + + void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { + pullOffsetTable.put(remoteQueue, nextPullOffset); + } + + void submitConsumeRequest(ConsumeRequest consumeRequest) { + try { + consumeRequestCache.put(consumeRequest); + } catch (InterruptedException ignore) { + } + } + + MessageExt poll() { + try { + ConsumeRequest consumeRequest = consumeRequestCache.take(); + consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); + consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); + return consumeRequest.getMessageExt(); + } catch (InterruptedException ignore) { + } + return null; + } + + MessageExt poll(final KeyValue properties) { + int currentPollTimeout = pollTimeout; + if (properties.containsKey(PropertyKeys.OPERATION_TIMEOUT)) { + currentPollTimeout = properties.getInt(PropertyKeys.OPERATION_TIMEOUT); + } + + if (currentPollTimeout == -1) { + return poll(); + } + + try { + ConsumeRequest consumeRequest = consumeRequestCache.poll(currentPollTimeout, TimeUnit.MILLISECONDS); + consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); + consumedRequest.put(consumeRequest.getMessageExt().getMsgId(), consumeRequest); + return consumeRequest.getMessageExt(); + } catch (InterruptedException ignore) { + } + return null; + } + + void ack(final String messageId) { + ConsumeRequest consumeRequest = consumedRequest.remove(messageId); + if (consumeRequest != null) { + long offset = consumeRequest.getProcessQueue().removeMessage(Collections.singletonList(consumeRequest.getMessageExt())); + try { + rocketmqPullConsumer.updateConsumeOffset(consumeRequest.getMessageQueue(), offset); + } catch (MQClientException e) { + log.error("A error occurred in update consume offset process.", e); + } + } + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java index 6730b1f3..bd33d781 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PullConsumerImpl.java @@ -18,45 +18,142 @@ package io.openmessaging.rocketmq.consumer; import io.openmessaging.KeyValue; import io.openmessaging.Message; +import io.openmessaging.PropertyKeys; import io.openmessaging.PullConsumer; +import io.openmessaging.exception.OMSRuntimeException; +import io.openmessaging.rocketmq.OMSUtil; +import io.openmessaging.rocketmq.domain.ConsumeRequest; +import io.openmessaging.rocketmq.domain.NonStandardKeys; +import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumer; +import org.apache.rocketmq.client.consumer.MQPullConsumerScheduleService; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullTaskCallback; +import org.apache.rocketmq.client.consumer.PullTaskContext; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.client.log.ClientLogger; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.slf4j.Logger; public class PullConsumerImpl implements PullConsumer { - public PullConsumerImpl(final KeyValue properties) { + private final DefaultMQPullConsumer rocketmqPullConsumer; + private final KeyValue properties; + private boolean started = false; + private String targetQueueName; + private final MQPullConsumerScheduleService pullConsumerScheduleService; + private final LocalMessageCache localMessageCache; + final static Logger log = ClientLogger.getLog(); + + public PullConsumerImpl(final String queueName, final KeyValue properties) { + this.properties = properties; + this.targetQueueName = queueName; + + String consumerGroup = properties.getString(NonStandardKeys.CONSUMER_GROUP); + if (null == consumerGroup || consumerGroup.isEmpty()) { + throw new OMSRuntimeException("-1", "Consumer Group is necessary for RocketMQ, please set it."); + } + pullConsumerScheduleService = new MQPullConsumerScheduleService(consumerGroup); + + this.rocketmqPullConsumer = pullConsumerScheduleService.getDefaultMQPullConsumer(); + + String accessPoints = properties.getString(PropertyKeys.ACCESS_POINTS); + if (accessPoints == null || accessPoints.isEmpty()) { + throw new OMSRuntimeException("-1", "OMS AccessPoints is null or empty."); + } + this.rocketmqPullConsumer.setNamesrvAddr(accessPoints.replace(',', ';')); + + this.rocketmqPullConsumer.setConsumerGroup(consumerGroup); + + int maxReDeliveryTimes = properties.getInt(NonStandardKeys.MAX_REDELIVERY_TIMES); + if (maxReDeliveryTimes != 0) { + this.rocketmqPullConsumer.setMaxReconsumeTimes(maxReDeliveryTimes); + } + + String consumerId = OMSUtil.buildInstanceName(); + this.rocketmqPullConsumer.setInstanceName(consumerId); + properties.put(PropertyKeys.CONSUMER_ID, consumerId); + + this.localMessageCache = new LocalMessageCache(this.rocketmqPullConsumer, properties); } @Override public KeyValue properties() { - return null; + return properties; } @Override public Message poll() { - return null; + return OMSUtil.msgConvert(localMessageCache.poll()); } @Override public Message poll(final KeyValue properties) { - return null; + return OMSUtil.msgConvert(localMessageCache.poll(properties)); } @Override public void ack(final String messageId) { - + localMessageCache.ack(messageId); } @Override public void ack(final String messageId, final KeyValue properties) { - + localMessageCache.ack(messageId); } @Override - public void startup() { + public synchronized void startup() { + if (!started) { + try { + registerPullTaskCallback(); + this.pullConsumerScheduleService.start(); + } catch (MQClientException e) { + throw new OMSRuntimeException("-1", e); + } + } + this.started = true; + } + + private void registerPullTaskCallback() { + this.pullConsumerScheduleService.registerPullTaskCallback(targetQueueName, new PullTaskCallback() { + @Override + public void doPullTask(final MessageQueue mq, final PullTaskContext context) { + MQPullConsumer consumer = context.getPullConsumer(); + try { + long offset = localMessageCache.nextPullOffset(mq); + PullResult pullResult = consumer.pull(mq, "*", + offset, localMessageCache.nextPullBatchNums()); + ProcessQueue pq = rocketmqPullConsumer.getDefaultMQPullConsumerImpl().getRebalanceImpl() + .getProcessQueueTable().get(mq); + switch (pullResult.getPullStatus()) { + case FOUND: + if (pq != null) { + for (final MessageExt messageExt : pullResult.getMsgFoundList()) { + localMessageCache.submitConsumeRequest(new ConsumeRequest(messageExt, mq, pq)); + } + } + break; + default: + break; + } + localMessageCache.updatePullOffset(mq, pullResult.getNextBeginOffset()); + } catch (Exception e) { + log.error("A error occurred in pull message process.", e); + } + } + }); } @Override - public void shutdown() { - + public synchronized void shutdown() { + if (this.started) { + this.pullConsumerScheduleService.shutdown(); + this.rocketmqPullConsumer.shutdown(); + } + this.started = false; } } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java index cd83212c..9c3b6a9a 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/consumer/PushConsumerImpl.java @@ -156,6 +156,8 @@ public class PushConsumerImpl implements PushConsumer { final KeyValue contextProperties = OMS.newKeyValue(); final CountDownLatch sync = new CountDownLatch(1); + contextProperties.put(NonStandardKeys.MESSAGE_CONSUME_STATUS, ConsumeConcurrentlyStatus.RECONSUME_LATER.name()); + ReceivedMessageContext context = new ReceivedMessageContext() { @Override public KeyValue properties() { @@ -176,9 +178,13 @@ public class PushConsumerImpl implements PushConsumer { properties.getString(NonStandardKeys.MESSAGE_CONSUME_STATUS)); } }; + long begin = System.currentTimeMillis(); listener.onMessage(omsMsg, context); + long costs = System.currentTimeMillis() - begin; + try { - sync.await(PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout(), TimeUnit.MILLISECONDS); + sync.await(Math.max(0, PushConsumerImpl.this.rocketmqPushConsumer.getConsumeTimeout() - costs) + , TimeUnit.MILLISECONDS); } catch (InterruptedException ignore) { } diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java new file mode 100644 index 00000000..7ce4a9b4 --- /dev/null +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/ConsumeRequest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.openmessaging.rocketmq.domain; + +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; + +public class ConsumeRequest { + private final MessageExt messageExt; + private final MessageQueue messageQueue; + private final ProcessQueue processQueue; + private long startConsumeTimeMillis; + + public ConsumeRequest(final MessageExt messageExt, final MessageQueue messageQueue, + final ProcessQueue processQueue) { + this.messageExt = messageExt; + this.messageQueue = messageQueue; + this.processQueue = processQueue; + } + + public MessageExt getMessageExt() { + return messageExt; + } + + public MessageQueue getMessageQueue() { + return messageQueue; + } + + public ProcessQueue getProcessQueue() { + return processQueue; + } + + public long getStartConsumeTimeMillis() { + return startConsumeTimeMillis; + } + + public void setStartConsumeTimeMillis(final long startConsumeTimeMillis) { + this.startConsumeTimeMillis = startConsumeTimeMillis; + } +} diff --git a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java index 566a17d4..3639a3f8 100644 --- a/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java +++ b/openmessaging/src/main/java/io/openmessaging/rocketmq/domain/NonStandardKeys.java @@ -25,4 +25,6 @@ public interface NonStandardKeys { String MIN_CONSUME_THREAD_NUMS = "rmq.min.consume.thread.nums"; String MESSAGE_CONSUME_STATUS = "rmq.message.consume.status"; String MESSAGE_DESTINATION = "rmq.message.destination"; + String PULL_MESSAGE_BATCH_NUMS = "rmq.pull.message.batch.nums"; + String PULL_MESSAGE_CACHE_CAPACITY = "rmq.pull.message.cache.capacity"; } -- GitLab