提交 d1cb591d 编写于 作者: M Matteo Merli 提交者: GitHub

Fixed race condition in cursor.asyncDelete() (#287)

上级 6d4bd9d2
......@@ -1173,58 +1173,54 @@ public class ManagedCursorImpl implements ManagedCursor {
* @return the previous acknowledged position
*/
PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
lock.writeLock().lock();
try {
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
}
if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
// read position forward
PositionImpl oldReadPosition = readPosition;
readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
if (newMarkDeletePosition.compareTo(markDeletePosition) < 0) {
throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
}
if (log.isDebugEnabled()) {
log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition);
}
if (readPosition.compareTo(newMarkDeletePosition) <= 0) {
// If the position that is mark-deleted is past the read position, it
// means that the client has skipped some entries. We need to move
// read position forward
PositionImpl oldReadPosition = readPosition;
readPosition = ledger.getNextValidPosition(newMarkDeletePosition);
oldReadPosition.recycle();
if (log.isDebugEnabled()) {
log.debug("Moved read position from: {} to: {}", oldReadPosition, readPosition);
}
PositionImpl oldMarkDeletePosition = markDeletePosition;
oldReadPosition.recycle();
}
if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
long skippedEntries = 0;
if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId()
&& newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) {
// Mark-deleting the position next to current one
skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition) ? 0 : 1;
} else {
skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition));
}
PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition);
if (individualDeletedMessages.contains(positionAfterNewMarkDelete)) {
Range<PositionImpl> rangeToBeMarkDeleted = individualDeletedMessages
.rangeContaining(positionAfterNewMarkDelete);
newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint();
}
PositionImpl oldMarkDeletePosition = markDeletePosition;
if (log.isDebugEnabled()) {
log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition,
newMarkDeletePosition, skippedEntries);
}
messagesConsumedCounter += skippedEntries;
if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
long skippedEntries = 0;
if (newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId()
&& newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1) {
// Mark-deleting the position next to current one
skippedEntries = individualDeletedMessages.contains(newMarkDeletePosition) ? 0 : 1;
} else {
skippedEntries = getNumberOfEntries(Range.openClosed(oldMarkDeletePosition, newMarkDeletePosition));
}
PositionImpl positionAfterNewMarkDelete = ledger.getNextValidPosition(newMarkDeletePosition);
if (individualDeletedMessages.contains(positionAfterNewMarkDelete)) {
Range<PositionImpl> rangeToBeMarkDeleted = individualDeletedMessages
.rangeContaining(positionAfterNewMarkDelete);
newMarkDeletePosition = rangeToBeMarkDeleted.upperEndpoint();
}
// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
oldMarkDeletePosition.recycle();
} finally {
lock.writeLock().unlock();
if (log.isDebugEnabled()) {
log.debug("Moved ack position from: {} to: {} -- skipped: {}", oldMarkDeletePosition,
newMarkDeletePosition, skippedEntries);
}
messagesConsumedCounter += skippedEntries;
}
// markDelete-position and clear out deletedMsgSet
markDeletePosition = PositionImpl.get(newMarkDeletePosition);
individualDeletedMessages.remove(Range.atMost(markDeletePosition));
oldMarkDeletePosition.recycle();
return newMarkDeletePosition;
}
......@@ -1254,11 +1250,14 @@ public class ManagedCursorImpl implements ManagedCursor {
}
PositionImpl newPosition = (PositionImpl) position;
lock.writeLock().lock();
try {
newPosition = setAcknowledgedPosition(newPosition);
} catch (IllegalArgumentException e) {
callback.markDeleteFailed(new ManagedLedgerException(e), ctx);
return;
} finally {
lock.writeLock().unlock();
}
// Apply rate limiting to mark-delete operations
......@@ -1487,6 +1486,12 @@ public class ManagedCursorImpl implements ManagedCursor {
newMarkDeletePosition = range.upperEndpoint();
}
}
if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}
} catch (Exception e) {
log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
e.getMessage(), e);
......@@ -1496,19 +1501,13 @@ public class ManagedCursorImpl implements ManagedCursor {
lock.writeLock().unlock();
}
try {
if (newMarkDeletePosition != null) {
newMarkDeletePosition = setAcknowledgedPosition(newMarkDeletePosition);
} else {
newMarkDeletePosition = markDeletePosition;
}
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
callback.deleteComplete(ctx);
return;
}
// Apply rate limiting to mark-delete operations
if (markDeleteLimiter != null && !markDeleteLimiter.tryAcquire()) {
callback.deleteComplete(ctx);
return;
}
try {
internalAsyncMarkDelete(newMarkDeletePosition, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册