提交 85af2f06 编写于 作者: M Matteo Merli 提交者: xiaolong.ran

Don't attempt to append on read-only cursor ledger (#5297)

* Don't attempt to append on read-only cursor ledger

* Additionally, ensures the ledger is properly closed

(cherry picked from commit 20b8625d)
上级 63b09e70
......@@ -86,9 +86,9 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
import org.apache.pulsar.common.util.collections.LongPairRangeSet.LongPairConsumer;
import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -128,6 +128,10 @@ public class ManagedCursorImpl implements ManagedCursor {
// Current ledger used to append the mark-delete position
private volatile LedgerHandle cursorLedger;
// Wether the current cursorLedger is read-only or writable
private boolean isCursorLedgerReadOnly = true;
// Stat of the cursor z-node
private volatile Stat cursorLedgerStat;
......@@ -402,6 +406,7 @@ public class ManagedCursorImpl implements ManagedCursor {
lastMarkDeleteEntry = new MarkDeleteEntry(markDeletePosition, properties, null, null);
// assign cursor-ledger so, it can be deleted when new ledger will be switched
this.cursorLedger = recoveredFromCursorLedger;
this.isCursorLedgerReadOnly = true;
STATE_UPDATER.set(this, State.NoLedger);
}
......@@ -1928,7 +1933,7 @@ public class ManagedCursorImpl implements ManagedCursor {
* @param properties
* @param callback
*/
private void persistPosition(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties,
private void persistPositionWhenClosing(PositionImpl position, Map<String, Long> properties,
final AsyncCallbacks.CloseCallback callback, final Object ctx) {
if (shouldPersistUnackRangesToLedger()) {
......@@ -1938,7 +1943,18 @@ public class ManagedCursorImpl implements ManagedCursor {
public void operationComplete() {
log.info("[{}][{}] Updated md-position={} into cursor-ledger {}", ledger.getName(), name,
markDeletePosition, cursorLedger.getId());
callback.closeComplete(ctx);
cursorLedger.asyncClose((rc, lh, ctx1) -> {
callback.closeComplete(ctx);
if (rc == BKException.Code.OK) {
log.info("[{}][{}] Closed cursor-ledger {}", ledger.getName(), name,
cursorLedger.getId());
} else {
log.warn("[{}][{}] Failed to close cursor-ledger {}: {}", ledger.getName(), name,
cursorLedger.getId(), BKException.getMessage(rc));
}
}, ctx);
}
@Override
......@@ -1949,7 +1965,7 @@ public class ManagedCursorImpl implements ManagedCursor {
}
});
} else {
persistPositionMetaStore(cursorsLedgerId, position, properties, new MetaStoreCallback<Void>() {
persistPositionMetaStore(-1, position, properties, new MetaStoreCallback<Void>() {
@Override
public void operationComplete(Void result, Stat stat) {
log.info("[{}][{}] Closed cursor at md-position={}", ledger.getName(), name, markDeletePosition);
......@@ -1969,7 +1985,9 @@ public class ManagedCursorImpl implements ManagedCursor {
}
private boolean shouldPersistUnackRangesToLedger() {
return cursorLedger != null && config.getMaxUnackedRangesToPersist() > 0
return cursorLedger != null
&& !isCursorLedgerReadOnly
&& config.getMaxUnackedRangesToPersist() > 0
&& individualDeletedMessages.size() > config.getMaxUnackedRangesToPersistInZk();
}
......@@ -2023,7 +2041,7 @@ public class ManagedCursorImpl implements ManagedCursor {
callback.closeComplete(ctx);
return;
}
persistPosition(-1, lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
persistPositionWhenClosing(lastMarkDeleteEntry.newPosition, lastMarkDeleteEntry.properties, callback, ctx);
STATE_UPDATER.set(this, State.Closed);
}
......@@ -2299,6 +2317,7 @@ public class ManagedCursorImpl implements ManagedCursor {
name, lh.getId(), markDeletePosition, readPosition);
final LedgerHandle oldLedger = cursorLedger;
cursorLedger = lh;
isCursorLedgerReadOnly = false;
cursorLedgerStat = stat;
// At this point the position had already been safely markdeleted
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册