diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 26f1bf3f97d9351dee392c7786c99fbae5b0350d..f70c4a6ba67802e997f29ee066fa728bb20d7394 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -91,6 +91,7 @@ import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.commons.lang3.tuple.Pair; @@ -414,10 +415,34 @@ public class ManagedCursorImpl implements ManagedCursor { lock.writeLock().lock(); try { individualDeletedMessages.clear(); - individualDeletedMessagesList.forEach(messageRange -> individualDeletedMessages - .addOpenClosed(messageRange.getLowerEndpoint().getLedgerId(), - messageRange.getLowerEndpoint().getEntryId(), messageRange.getUpperEndpoint().getLedgerId(), - messageRange.getUpperEndpoint().getEntryId())); + individualDeletedMessagesList.forEach(messageRange -> { + MLDataFormats.NestedPositionInfo lowerEndpoint = messageRange.getLowerEndpoint(); + MLDataFormats.NestedPositionInfo upperEndpoint = messageRange.getUpperEndpoint(); + + if (lowerEndpoint.getLedgerId() == upperEndpoint.getLedgerId()) { + individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(), + upperEndpoint.getLedgerId(), upperEndpoint.getEntryId()); + } else { + // Store message ranges after splitting them by ledger ID + LedgerInfo lowerEndpointLedgerInfo = ledger.getLedgersInfo().get(lowerEndpoint.getLedgerId()); + if (lowerEndpointLedgerInfo != null) { + individualDeletedMessages.addOpenClosed(lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId(), + lowerEndpoint.getLedgerId(), lowerEndpointLedgerInfo.getEntries() - 1); + } else { + log.warn("[{}][{}] No ledger info of lower endpoint {}:{}", ledger.getName(), name, + lowerEndpoint.getLedgerId(), lowerEndpoint.getEntryId()); + } + + for (LedgerInfo li : ledger.getLedgersInfo() + .subMap(lowerEndpoint.getLedgerId(), false, upperEndpoint.getLedgerId(), false).values()) { + individualDeletedMessages.addOpenClosed(li.getLedgerId(), -1, li.getLedgerId(), + li.getEntries() - 1); + } + + individualDeletedMessages.addOpenClosed(upperEndpoint.getLedgerId(), -1, + upperEndpoint.getLedgerId(), upperEndpoint.getEntryId()); + } + }); } finally { lock.writeLock().unlock(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java new file mode 100644 index 0000000000000000000000000000000000000000..a5b921cad76f1a5e001fb027a2a34dd92f964f56 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorIndividualDeletedMessagesTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.testng.Assert.assertEquals; + +import com.google.common.collect.Lists; +import com.google.common.collect.Range; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.NavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; + +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.NestedPositionInfo; +import org.apache.pulsar.common.util.collections.LongPairRangeSet; +import org.testng.annotations.Test; + +public class ManagedCursorIndividualDeletedMessagesTest { + @Test(timeOut = 10000) + void testRecoverIndividualDeletedMessages() throws Exception { + BookKeeper bookkeeper = mock(BookKeeper.class); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setUnackedRangesOpenCacheSetEnabled(true); + + NavigableMap ledgersInfo = new ConcurrentSkipListMap<>(); + ledgersInfo.put(1L, createLedgerInfo(1, 100, 1024)); + ledgersInfo.put(3L, createLedgerInfo(3, 50, 512)); + ledgersInfo.put(5L, createLedgerInfo(5, 200, 2048)); + ledgersInfo.put(10L, createLedgerInfo(10, 2, 32)); + ledgersInfo.put(20L, createLedgerInfo(20, 10, 256)); + + ManagedLedgerImpl ledger = mock(ManagedLedgerImpl.class); + doReturn(ledgersInfo).when(ledger).getLedgersInfo(); + + ManagedCursorImpl cursor = spy(new ManagedCursorImpl(bookkeeper, config, ledger, "test-cursor")); + LongPairRangeSet deletedMessages = cursor.getIndividuallyDeletedMessagesSet(); + + Method recoverMethod = ManagedCursorImpl.class.getDeclaredMethod("recoverIndividualDeletedMessages", + List.class); + recoverMethod.setAccessible(true); + + // (1) [(1:5..1:10]] + List messageRangeList = Lists.newArrayList(); + messageRangeList.add(createMessageRange(1, 5, 1, 10)); + List> expectedRangeList = Lists.newArrayList(); + expectedRangeList.add(createPositionRange(1, 5, 1, 10)); + recoverMethod.invoke(cursor, messageRangeList); + assertEquals(deletedMessages.size(), 1); + assertEquals(deletedMessages.asRanges(), expectedRangeList); + + // (2) [(1:10..3:0]] + messageRangeList.clear(); + messageRangeList.add(createMessageRange(1, 10, 3, 0)); + expectedRangeList.clear(); + expectedRangeList.add(createPositionRange(1, 10, 1, 99)); + expectedRangeList.add(createPositionRange(3, -1, 3, 0)); + recoverMethod.invoke(cursor, messageRangeList); + assertEquals(deletedMessages.size(), 2); + assertEquals(deletedMessages.asRanges(), expectedRangeList); + + // (3) [(1:20..10:1],(20:2..20:9]] + messageRangeList.clear(); + messageRangeList.add(createMessageRange(1, 20, 10, 1)); + messageRangeList.add(createMessageRange(20, 2, 20, 9)); + expectedRangeList.clear(); + expectedRangeList.add(createPositionRange(1, 20, 1, 99)); + expectedRangeList.add(createPositionRange(3, -1, 3, 49)); + expectedRangeList.add(createPositionRange(5, -1, 5, 199)); + expectedRangeList.add(createPositionRange(10, -1, 10, 1)); + expectedRangeList.add(createPositionRange(20, 2, 20, 9)); + recoverMethod.invoke(cursor, messageRangeList); + assertEquals(deletedMessages.size(), 5); + assertEquals(deletedMessages.asRanges(), expectedRangeList); + } + + private static LedgerInfo createLedgerInfo(long ledgerId, long entries, long size) { + return LedgerInfo.newBuilder().setLedgerId(ledgerId).setEntries(entries).setSize(size) + .setTimestamp(System.currentTimeMillis()).build(); + } + + private static MessageRange createMessageRange(long lowerLedgerId, long lowerEntryId, long upperLedgerId, + long upperEntryId) { + NestedPositionInfo.Builder nestedPositionBuilder = NestedPositionInfo.newBuilder(); + MessageRange.Builder messageRangeBuilder = MessageRange.newBuilder(); + + nestedPositionBuilder.setLedgerId(lowerLedgerId); + nestedPositionBuilder.setEntryId(lowerEntryId); + messageRangeBuilder.setLowerEndpoint(nestedPositionBuilder.build()); + + nestedPositionBuilder.setLedgerId(upperLedgerId); + nestedPositionBuilder.setEntryId(upperEntryId); + messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build()); + + return messageRangeBuilder.build(); + } + + private static Range createPositionRange(long lowerLedgerId, long lowerEntryId, long upperLedgerId, + long upperEntryId) { + return Range.openClosed(new PositionImpl(lowerLedgerId, lowerEntryId), + new PositionImpl(upperLedgerId, upperEntryId)); + } +}