diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index 4176520abd5c59b8d1ed1bfc549e166e7a00741e..0cea1aea890f70613cb136918ba448f10e2cc1b4 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -48,7 +48,10 @@ public class ProcessQueue { private final AtomicLong msgCount = new AtomicLong(); private final AtomicLong msgSize = new AtomicLong(); private final Lock lockConsume = new ReentrantLock(); - private final TreeMap msgTreeMapTemp = new TreeMap(); + /** + * A subset of msgTreeMap, will only be used when orderly consume + */ + private final TreeMap consumingMsgOrderlyTreeMap = new TreeMap(); private final AtomicLong tryUnlockTimes = new AtomicLong(0); private volatile long queueOffsetMax = 0L; private volatile boolean dropped = false; @@ -243,8 +246,8 @@ public class ProcessQueue { try { this.lockTreeMap.writeLock().lockInterruptibly(); try { - this.msgTreeMap.putAll(this.msgTreeMapTemp); - this.msgTreeMapTemp.clear(); + this.msgTreeMap.putAll(this.consumingMsgOrderlyTreeMap); + this.consumingMsgOrderlyTreeMap.clear(); } finally { this.lockTreeMap.writeLock().unlock(); } @@ -257,12 +260,12 @@ public class ProcessQueue { try { this.lockTreeMap.writeLock().lockInterruptibly(); try { - Long offset = this.msgTreeMapTemp.lastKey(); - msgCount.addAndGet(0 - this.msgTreeMapTemp.size()); - for (MessageExt msg : this.msgTreeMapTemp.values()) { + Long offset = this.consumingMsgOrderlyTreeMap.lastKey(); + msgCount.addAndGet(0 - this.consumingMsgOrderlyTreeMap.size()); + for (MessageExt msg : this.consumingMsgOrderlyTreeMap.values()) { msgSize.addAndGet(0 - msg.getBody().length); } - this.msgTreeMapTemp.clear(); + this.consumingMsgOrderlyTreeMap.clear(); if (offset != null) { return offset + 1; } @@ -281,7 +284,7 @@ public class ProcessQueue { this.lockTreeMap.writeLock().lockInterruptibly(); try { for (MessageExt msg : msgs) { - this.msgTreeMapTemp.remove(msg.getQueueOffset()); + this.consumingMsgOrderlyTreeMap.remove(msg.getQueueOffset()); this.msgTreeMap.put(msg.getQueueOffset(), msg); } } finally { @@ -304,7 +307,7 @@ public class ProcessQueue { Map.Entry entry = this.msgTreeMap.pollFirstEntry(); if (entry != null) { result.add(entry.getValue()); - msgTreeMapTemp.put(entry.getKey(), entry.getValue()); + consumingMsgOrderlyTreeMap.put(entry.getKey(), entry.getValue()); } else { break; } @@ -343,7 +346,7 @@ public class ProcessQueue { this.lockTreeMap.writeLock().lockInterruptibly(); try { this.msgTreeMap.clear(); - this.msgTreeMapTemp.clear(); + this.consumingMsgOrderlyTreeMap.clear(); this.msgCount.set(0); this.msgSize.set(0); this.queueOffsetMax = 0L; @@ -402,10 +405,10 @@ public class ProcessQueue { info.setCachedMsgSizeInMiB((int) (this.msgSize.get() / (1024 * 1024))); } - if (!this.msgTreeMapTemp.isEmpty()) { - info.setTransactionMsgMinOffset(this.msgTreeMapTemp.firstKey()); - info.setTransactionMsgMaxOffset(this.msgTreeMapTemp.lastKey()); - info.setTransactionMsgCount(this.msgTreeMapTemp.size()); + if (!this.consumingMsgOrderlyTreeMap.isEmpty()) { + info.setTransactionMsgMinOffset(this.consumingMsgOrderlyTreeMap.firstKey()); + info.setTransactionMsgMaxOffset(this.consumingMsgOrderlyTreeMap.lastKey()); + info.setTransactionMsgCount(this.consumingMsgOrderlyTreeMap.size()); } info.setLocked(this.locked); diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 35b905e18d21609f6ade4d6c0c10fafc34f9180f..7c169796741f9f80e4b42dd883956a80f6194e27 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -454,9 +454,9 @@ public class DefaultMQProducerImpl implements MQProducerInner { String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); - MessageQueue tmpmq = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); - if (tmpmq != null) { - mq = tmpmq; + MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); + if (mqSelected != null) { + mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis();