From 62ca947a457769469189e6e0acd990a833b70ab8 Mon Sep 17 00:00:00 2001 From: King <794220751@qq.com> Date: Fri, 19 Jul 2019 11:19:07 +0800 Subject: [PATCH] Polish LitePullConsumer (#1332) * fix unsubscribe code * fix commit consumed offset * fix commit consumed offset * fix commit consumed offset * fix commit consumed offset * polish commit consumed offset * pass checkstyle * pass checkstyle --- .../consumer/DefaultLiteMQPullConsumer.java | 6 +- .../consumer/DefaultMQPullConsumer.java | 3 +- .../client/consumer/MQPullConsumer.java | 2 - .../impl/consumer/AssignedMessageQueue.java | 10 --- .../impl/consumer/LiteMQPullConsumerImpl.java | 65 ++++++++++++------- .../client/impl/consumer/ProcessQueue.java | 12 ++++ .../example/simple/LitePullConsumerTest.java | 14 ++-- 7 files changed, 64 insertions(+), 48 deletions(-) diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java index 96d4f5ab..6f67bcf8 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLiteMQPullConsumer.java @@ -42,7 +42,7 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements /** * Maximum commit offset interval time in seconds. */ - private long autoCommitInterval = 20; + private long autoCommitInterval = 5; public DefaultLiteMQPullConsumer(String consumerGroup, RPCHook rpcHook) { this.setConsumerGroup(consumerGroup); @@ -55,7 +55,7 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements } @Override - public void start() throws MQClientException{ + public void start() throws MQClientException { this.liteMQPullConsumer.start(); } @@ -95,7 +95,7 @@ public class DefaultLiteMQPullConsumer extends DefaultMQPullConsumer implements @Override public void commitSync() { - this.liteMQPullConsumer.commit(); + this.liteMQPullConsumer.commitSync(); } public long getConsumeTimeout() { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java index dbf37d2a..3fa3af24 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java @@ -16,9 +16,7 @@ */ package org.apache.rocketmq.client.consumer; -import java.util.Collection; import java.util.HashSet; -import java.util.List; import java.util.Set; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.QueryResult; @@ -40,6 +38,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; * Default pulling consumer */ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsumer { + protected final transient DefaultMQPullConsumerImpl defaultMQPullConsumerImpl; /** diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java index 9c7cb363..a8e96283 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/MQPullConsumer.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.client.consumer; -import java.util.Collection; -import java.util.List; import java.util.Set; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java index e9623a81..fb0ca79b 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.Collection; -import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -107,15 +106,6 @@ public class AssignedMessageQueue { } } - public Map getNeedCommitOffsets() { - Map map = new HashMap(); - Set> entries = this.assignedMessageQueueState.entrySet(); - for (Map.Entry entry : entries) { - map.put(entry.getKey(), entry.getValue().getNextOffset()); - } - return map; - } - public class MessageQueueStat { private MessageQueue messageQueue; private boolean paused = false; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java index d6122862..ab229e44 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/LiteMQPullConsumerImpl.java @@ -16,9 +16,9 @@ */ package org.apache.rocketmq.client.impl.consumer; -import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -33,6 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; + import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.rocketmq.client.consumer.DefaultLiteMQPullConsumer; import org.apache.rocketmq.client.consumer.MessageQueueListener; @@ -50,6 +51,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.RPCHook; public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { + private final InternalLogger log = ClientLogger.getLog(); private DefaultLiteMQPullConsumer defaultLiteMQPullConsumer; @@ -59,7 +61,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { private AssignedMessageQueue assignedMessageQueue = new AssignedMessageQueue(); - private List allConsumed = new ArrayList(256); + private volatile Set consumedSet = new HashSet(); private final BlockingQueue consumeRequestCache = new LinkedBlockingQueue(); @@ -69,6 +71,8 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { private ScheduledExecutorService autoCommitExecutors; + private final ThreadLocal preConsumeRequestLocal = new ThreadLocal(); + public LiteMQPullConsumerImpl(final DefaultLiteMQPullConsumer defaultMQPullConsumer, final RPCHook rpcHook) { super(defaultMQPullConsumer, rpcHook); this.defaultLiteMQPullConsumer = defaultMQPullConsumer; @@ -145,7 +149,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { @Override public void run() { if (defaultLiteMQPullConsumer.isAutoCommit()) { - commit(); + commitAll(); } } }, this.defaultLiteMQPullConsumer.getAutoCommitInterval(), this.defaultLiteMQPullConsumer.getAutoCommitInterval(), TimeUnit.SECONDS); @@ -164,7 +168,9 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { public List poll(long timeout) { try { - ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.SECONDS); + addToConsumed(preConsumeRequestLocal.get()); + ConsumeRequest consumeRequest = consumeRequestCache.poll(timeout, TimeUnit.MILLISECONDS); + preConsumeRequestLocal.set(consumeRequest); if (consumeRequest != null) { List messages = consumeRequest.getMessageExts(); for (MessageExt messageExt : messages) { @@ -173,7 +179,8 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { consumeRequest.setStartConsumeTimeMillis(System.currentTimeMillis()); return messages; } - } catch (InterruptedException ignore) { + } catch (InterruptedException e) { + log.error("poll ComsumeRequest error.", e); } return null; } @@ -197,7 +204,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { } public void unsubscribe(final String topic) { - unsubscribe(topic); + super.unsubscribe(topic); removePullTaskCallback(topic); assignedMessageQueue.removeAssignedMessageQueue(topic); } @@ -212,30 +219,39 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { while (it.hasNext()) { Map.Entry next = it.next(); if (next.getKey().getTopic().equals(topic)) { + next.getValue().setCancelled(true); it.remove(); } } } } - public void commit() { - List consumeRequests; - synchronized (this.allConsumed) { - consumeRequests = this.allConsumed; - this.allConsumed = new ArrayList(); + public void commitSync() { + addToConsumed(preConsumeRequestLocal.get()); + preConsumeRequestLocal.set(null); + commitAll(); + } + + public void commitAll() { + Set consumedRequests; + synchronized (this.consumedSet) { + consumedRequests = this.consumedSet; + this.consumedSet = new HashSet(); } - for (ConsumeRequest consumeRequest : consumeRequests) { + for (ConsumeRequest consumeRequest : consumedRequests) { consumeRequest.getProcessQueue().removeMessage(consumeRequest.messageExts); } - Set> entrySet = assignedMessageQueue.getNeedCommitOffsets().entrySet(); - for (Map.Entry entry : entrySet) { + Set> entrySet = this.rebalanceImpl.getProcessQueueTable().entrySet(); + for (Map.Entry entry : entrySet) { try { - updateConsumeOffset(entry.getKey(), entry.getValue()); + long consumeOffset = entry.getValue().getConsumeOffset(); + if (consumeOffset != -1) + updateConsumeOffset(entry.getKey(), consumeOffset); } catch (MQClientException e) { log.error("A error occurred in update consume offset process.", e); } } - this.getOffsetStore().persistAll(assignedMessageQueue.getNeedCommitOffsets().keySet()); + this.getOffsetStore().persistAll(this.rebalanceImpl.getProcessQueueTable().keySet()); } private void commit(final MessageQueue messageQueue, final ProcessQueue processQueue, final MessageExt messageExt) { @@ -260,7 +276,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { } } - void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { + private void updatePullOffset(MessageQueue remoteQueue, long nextPullOffset) { try { assignedMessageQueue.updateNextOffset(remoteQueue, nextPullOffset); } catch (MQClientException e) { @@ -269,21 +285,23 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { } private void addToConsumed(ConsumeRequest consumeRequest) { - synchronized (this.allConsumed) { - allConsumed.add(consumeRequest); + if (consumeRequest != null) { + synchronized (this.consumedSet) { + if (!consumedSet.contains(consumeRequest)) + consumedSet.add(consumeRequest); + } } } - void submitConsumeRequest(ConsumeRequest consumeRequest) { + private void submitConsumeRequest(ConsumeRequest consumeRequest) { try { consumeRequestCache.put(consumeRequest); - addToConsumed(consumeRequest); } catch (InterruptedException ex) { log.error("Submit consumeRequest error", ex); } } - long nextPullOffset(MessageQueue remoteQueue) { + private long nextPullOffset(MessageQueue remoteQueue) { long offset = -1; try { offset = assignedMessageQueue.getNextOffset(remoteQueue); @@ -337,7 +355,7 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { this.defaultMQPullConsumer.sendMessageBack(msg, 3); log.info("Send expired msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset()); - System.out.println("Send expired msg back."); + log.info("Send expired msg back."); commit(mq, pq, msg); } catch (Exception e) { log.error("Send back expired msg exception", e); @@ -364,7 +382,6 @@ public class LiteMQPullConsumerImpl extends DefaultMQPullConsumerImpl { @Override public void run() { - System.out.println("begin pull message"); String topic = this.messageQueue.getTopic(); if (!this.isCancelled()) { if (assignedMessageQueue.isPaused(messageQueue)) { diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 0a52817c..e9a1c72d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -26,6 +26,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.log.ClientLogger; import org.apache.rocketmq.logging.InternalLogger; @@ -431,4 +432,15 @@ public class ProcessQueue { public void setLastConsumeTimestamp(long lastConsumeTimestamp) { this.lastConsumeTimestamp = lastConsumeTimestamp; } + + public long getConsumeOffset() { + + if (msgTreeMap.isEmpty() && queueOffsetMax == 0L) + return -1; + + if (!msgTreeMap.isEmpty()) + return msgTreeMap.firstKey(); + else + return queueOffsetMax + 1; + } } diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java index 4297e4f1..215763bd 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/LitePullConsumerTest.java @@ -25,24 +25,24 @@ import org.apache.rocketmq.common.message.MessageQueue; public class LitePullConsumerTest { public static void main(String[] args) throws Exception { DefaultLiteMQPullConsumer litePullConsumer = new DefaultLiteMQPullConsumer("test", null); - litePullConsumer.subscribe("test", null); + litePullConsumer.setNamesrvAddr("localhost:9876"); + litePullConsumer.subscribe("litepullconsumertest9", null); litePullConsumer.start(); - MessageQueue messageQueue = new MessageQueue("test", "duhengdeMacBook-Pro.local", 1); + MessageQueue messageQueue = new MessageQueue("test", "IT-C02YW28FLVDL.local", 1); int i = 0; while (true) { List messageExts = litePullConsumer.poll(); - System.out.println("-----------"); - System.out.println(messageExts); - System.out.println("-----------"); + System.out.printf("%s%n", messageExts); i++; if (i == 3) { - System.out.println("pause"); + System.out.printf("pause%n"); litePullConsumer.pause(Arrays.asList(messageQueue)); } if (i == 10) { - System.out.println("resume"); + System.out.printf("resume%n"); litePullConsumer.resume(Arrays.asList(messageQueue)); } +// litePullConsumer.commitSync(); } } -- GitLab