diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 41aa45b77ce20eded345bb632f4b5163e4dcd9bf..698acfd33e74d63f45e3961b0e268245c8029f27 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -71,6 +71,10 @@ public class ManagedLedgerException extends Exception { public ManagedLedgerNotFoundException(Exception e) { super(e); } + + public ManagedLedgerNotFoundException(String message) { + super(message); + } } public static class ManagedLedgerTerminatedException extends ManagedLedgerException { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 1113ca0bc97a6e28b85950e5157b03015d431d63..3a803864f5ecd2e505318ca7cfd7bee91fb65efc 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1790,8 +1790,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } - PositionImpl startReadOperationOnLedger(PositionImpl position) { - long ledgerId = ledgers.ceilingKey(position.getLedgerId()); + PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) { + Long ledgerId = ledgers.ceilingKey(position.getLedgerId()); + if (null == ledgerId) { + opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " + + "least key greater than or equal to the given key, or null if there is no such key"), null); + } + if (ledgerId != position.getLedgerId()) { // The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need // to skip on the next available ledger diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index dbe5d259d050dac159b23b51b554021b2e860006..c881eb1de5e9b66428d67623b8b783cc8cc01ecf 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -49,7 +49,7 @@ class OpReadEntry implements ReadEntriesCallback { public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count, ReadEntriesCallback callback, Object ctx) { OpReadEntry op = RECYCLER.get(); - op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef); + op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op); op.cursor = cursor; op.count = count; op.callback = callback; @@ -128,12 +128,12 @@ class OpReadEntry implements ReadEntriesCallback { if (entries.size() < count && cursor.hasMoreEntries()) { // We still have more entries to read from the next ledger, schedule a new async operation if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) { - cursor.ledger.startReadOperationOnLedger(nextReadPosition); + cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this); } // Schedule next read in a different thread cursor.ledger.getExecutor().execute(safeRun(() -> { - readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition); + readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this); cursor.ledger.asyncReadEntries(OpReadEntry.this); })); } else {