未验证 提交 c635d08a 编写于 作者: 冉小龙 提交者: GitHub

[Issue:5669] Fix the ledgerID not found cause NPE (#5809)

## Motivation
when ledgers.ceilingKey return a NULL value, we need to process the ledgerId is NUll case.

## Modifications
change long ledgerId to Long ledgerId and check if ledgerId is NULL.
上级 5a9d35ca
......@@ -71,6 +71,10 @@ public class ManagedLedgerException extends Exception {
public ManagedLedgerNotFoundException(Exception e) {
super(e);
}
public ManagedLedgerNotFoundException(String message) {
super(message);
}
}
public static class ManagedLedgerTerminatedException extends ManagedLedgerException {
......
......@@ -1790,8 +1790,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
PositionImpl startReadOperationOnLedger(PositionImpl position) {
long ledgerId = ledgers.ceilingKey(position.getLedgerId());
PositionImpl startReadOperationOnLedger(PositionImpl position, OpReadEntry opReadEntry) {
Long ledgerId = ledgers.ceilingKey(position.getLedgerId());
if (null == ledgerId) {
opReadEntry.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("The ceilingKey(K key) method is used to return the " +
"least key greater than or equal to the given key, or null if there is no such key"), null);
}
if (ledgerId != position.getLedgerId()) {
// The ledger pointed by this position does not exist anymore. It was deleted because it was empty. We need
// to skip on the next available ledger
......
......@@ -49,7 +49,7 @@ class OpReadEntry implements ReadEntriesCallback {
public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
ReadEntriesCallback callback, Object ctx) {
OpReadEntry op = RECYCLER.get();
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef, op);
op.cursor = cursor;
op.count = count;
op.callback = callback;
......@@ -128,12 +128,12 @@ class OpReadEntry implements ReadEntriesCallback {
if (entries.size() < count && cursor.hasMoreEntries()) {
// We still have more entries to read from the next ledger, schedule a new async operation
if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
cursor.ledger.startReadOperationOnLedger(nextReadPosition);
cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
}
// Schedule next read in a different thread
cursor.ledger.getExecutor().execute(safeRun(() -> {
readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition);
readPosition = cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
cursor.ledger.asyncReadEntries(OpReadEntry.this);
}));
} else {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册