diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java index e21dbc8f18a0e0e3905a282d510ce856da6d35cd..4176520abd5c59b8d1ed1bfc549e166e7a00741e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.client.impl.consumer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -102,7 +103,7 @@ public class ProcessQueue { try { if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) { try { - msgTreeMap.remove(msgTreeMap.firstKey()); + removeMessage(Collections.singletonList(msg)); } catch (Exception e) { log.error("send expired msg exception", e); }