From 63464ab7724e525676ed1d7d35f244452f5bcec2 Mon Sep 17 00:00:00 2001 From: liudezhi <33149602+liudezhi2098@users.noreply.github.com> Date: Wed, 26 Feb 2020 19:26:15 +0800 Subject: [PATCH] Consumer received duplicated deplayed messages upon restart Fix when send a delayed message ,there is a case when a consumer restarts and pull duplicate messages. #6403 (cherry picked from commit e71b9fc4e256f24c9b6c0edd14e40e8af1f24374) --- .../bookkeeper/mledger/impl/ManagedCursorImpl.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index cb6f40a38bf..a6f884502ee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1116,7 +1116,14 @@ public class ManagedCursorImpl implements ManagedCursor { }; positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position)) - .forEach(p -> ledger.asyncReadEntry((PositionImpl) p, cb, ctx)); + .forEach(p ->{ + if (((PositionImpl) p).compareTo(this.readPosition) == 0) { + this.setReadPosition(this.readPosition.getNext()); + log.warn("[{}][{}] replayPosition{} equals readPosition{}," + " need set next readPositio", + ledger.getName(), name, (PositionImpl) p, this.readPosition); + } + ledger.asyncReadEntry((PositionImpl) p, cb, ctx); + }); return alreadyAcknowledgedPositions; } -- GitLab