diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 4f629c5441e201d8f4008d880e294600c7ab0943..bedb81d112ea76906241df45cb7b040cd222da40 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -444,4 +444,9 @@ public interface ManagedLedger { * @return the last confirmed entry id */ Position getLastConfirmedEntry(); + + /** + * Signaling managed ledger that we can resume after BK write failure + */ + void readyToCreateNewLedger(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9bd98a2919c3d91d0fe3bcf8bb4d00c0309810d6..a2842c51dd98ae4612f6b60b057ba0e2cabfbec1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static java.lang.Math.min; -import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import com.google.common.collect.BoundType; @@ -59,7 +58,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; @@ -186,9 +184,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private static final Random random = new Random(System.currentTimeMillis()); private long maximumRolloverTimeMs; - // Time period in which new write requests will not be accepted, after we fail in creating a new ledger. - final static long WaitTimeAfterLedgerCreationFailureMs = 10000; - volatile PositionImpl lastConfirmedEntry; protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; @@ -208,6 +203,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { // the new instance will take over Terminated, // Managed ledger was terminated and no more entries // are allowed to be added. Reads are allowed + WriteFailed // The state that is transitioned to when a BK write failure happens + // After handling the BK write failure, managed ledger will get signalled to create a new ledger } // define boundaries for position based seeks and searches @@ -570,6 +567,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } else if (state == State.Closed) { addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed")); return; + } else if (state == State.WriteFailed) { + pendingAddEntries.remove(addOperation); + addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure")); + return; } if (state == State.ClosingLedger || state == State.CreatingLedger) { @@ -579,14 +580,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.debug("[{}] Queue addEntry request", name); } } else if (state == State.ClosedLedger) { - long now = clock.millis(); - if (now < lastLedgerCreationFailureTimestamp + WaitTimeAfterLedgerCreationFailureMs) { - // Deny the write request, since we haven't waited enough time since last attempt to create a new ledger - pendingAddEntries.remove(addOperation); - addOperation.failed(new ManagedLedgerException("Waiting for new ledger creation to complete")); - return; - } - // No ledger and no pending operations. Create a new ledger if (log.isDebugEnabled()) { log.debug("[{}] Creating a new ledger", name); @@ -623,6 +616,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { } } + @Override + public void readyToCreateNewLedger() { + // only set transition state to ClosedLedger if current state is WriteFailed + if (STATE_UPDATER.compareAndSet(this, State.WriteFailed, State.ClosedLedger)){ + log.info("[{}] Managed ledger is now ready to accept writes again", name); + } + } + @Override public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException { return openCursor(cursorName, InitialPosition.Latest); @@ -1196,10 +1197,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc)); ManagedLedgerException status = createManagedLedgerException(rc); + // no pending entries means that creating this new ledger is NOT caused by write failure + if (pendingAddEntries.isEmpty()) { + STATE_UPDATER.set(this, State.ClosedLedger); + } else { + STATE_UPDATER.set(this, State.WriteFailed); + } + // Empty the list of pending requests and make all of them fail clearPendingAddEntries(status); lastLedgerCreationFailureTimestamp = clock.millis(); - STATE_UPDATER.set(this, State.ClosedLedger); } else { log.info("[{}] Created new ledger {}", name, lh.getId()); ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build()); @@ -1227,10 +1234,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { if (e instanceof BadVersionException) { synchronized (ManagedLedgerImpl.this) { log.error( - "[{}] Failed to udpate ledger list. z-node version mismatch. Closing managed ledger", + "[{}] Failed to update ledger list. z-node version mismatch. Closing managed ledger", name); STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced); - clearPendingAddEntries(e); + // Return ManagedLedgerFencedException to addFailed callback + // to indicate that the ledger is now fenced and topic needs to be closed + clearPendingAddEntries(new ManagedLedgerFencedException(e)); + // Do not need to unlock ledgersListMutex here because we are going to close to topic anyways return; } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index e741d44f903d4a476e8aa37322d15a634a1cf0e8..e8824581d14d55780ef173c7da5aa072654d86ce 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -101,8 +101,8 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { public void failed(ManagedLedgerException e) { AddEntryCallback cb = callbackUpdater.getAndSet(this, null); - data.release(); if (cb != null) { + data.release(); cb.addFailed(e, ctx); ml.mbean.recordAddEntryError(); } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java index d9ad5c59b056c38bd7975e660d2b9ba4ea556a15..3d47a6f5697d7e742eef94560a3f35e0f25c6557 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerErrorsTest.java @@ -329,40 +329,6 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase { latch.await(); } - @Test - public void recoverAfterWriteError() throws Exception { - ManagedLedger ledger = factory.open("my_test_ledger"); - ManagedCursor cursor = ledger.openCursor("c1"); - - bkc.failNow(BKException.Code.BookieHandleNotAvailableException); - - // With one single error, the write should succeed - ledger.addEntry("entry".getBytes()); - - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); - - bkc.failNow(BKException.Code.BookieHandleNotAvailableException); - zkc.failNow(Code.CONNECTIONLOSS); - try { - ledger.addEntry("entry".getBytes()); - fail("should fail"); - } catch (ManagedLedgerException e) { - // ok - } - - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); - - // Next add will fail as well - try { - ledger.addEntry("entry".getBytes()); - fail("should fail"); - } catch (ManagedLedgerException e) { - // ok - } - - assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); - } - @Test public void recoverAfterZnodeVersionError() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); @@ -376,7 +342,8 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase { // This write will try to create a ledger and it will fail at it ledger.addEntry("entry".getBytes()); fail("should fail"); - } catch (BadVersionException e) { + } catch (ManagedLedgerFencedException e) { + assertEquals(e.getCause().getCause().getClass(), org.apache.zookeeper.KeeperException.BadVersionException.class); // ok } @@ -390,7 +357,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase { } @Test - public void recoverLongTimeAfterWriteError() throws Exception { + public void recoverAfterWriteError() throws Exception { ManagedLedger ledger = factory.open("my_test_ledger"); ManagedCursor cursor = ledger.openCursor("c1"); @@ -410,7 +377,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase { // ok } - Thread.sleep(ManagedLedgerImpl.WaitTimeAfterLedgerCreationFailureMs / 2); + bkc.failNow(BKException.Code.NotEnoughBookiesException); try { ledger.addEntry("entry-3".getBytes()); fail("should fail"); @@ -418,8 +385,10 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase { // ok } - // After some time, the managed ledger will be available for writes again - Thread.sleep(ManagedLedgerImpl.WaitTimeAfterLedgerCreationFailureMs / 2 + 10); + assertEquals(cursor.getNumberOfEntriesInBacklog(), 1); + + // Signal that ManagedLedger has recovered from write error and will be availbe for writes again + ledger.readyToCreateNewLedger(); // Next add should succeed, and the previous write should not appear ledger.addEntry("entry-4".getBytes()); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 9f6d96a10e8112e0d9ffa0d0e4a417a3bd9d8fd8..d94099bc68069f2637c6a96777615942b2dfbedc 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -1418,6 +1418,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase { assertEquals(ledger.getLedgersInfoAsList().size(), 0); // Next write should fail as well + bkc.failNow(BKException.Code.NoBookieAvailableException); try { ledger.addEntry("entry".getBytes()); fail("Should have received exception"); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 1d3bf43d32b85f049df03b8220764e9871c5e55c..2e6b103a0f3afb506ae753a91234118760138a6e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -321,6 +321,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal if (isFenced) { messageDeduplication.resetHighestSequenceIdPushed(); log.info("[{}] Un-fencing topic...", topic); + // signal to managed ledger that we are ready to resume by creating a new ledger + ledger.readyToCreateNewLedger(); + isFenced = false; } @@ -342,15 +345,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal @Override public synchronized void addFailed(ManagedLedgerException exception, Object ctx) { - - // fence topic when failed to write a message to BK - isFenced = true; - if (exception instanceof ManagedLedgerFencedException) { // If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen close(); } else { + // fence topic when failed to write a message to BK + isFenced = true; // close all producers List> futures = Lists.newArrayList(); producers.forEach(producer -> futures.add(producer.disconnect())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java index 55020c636ba186504b51e4ae85d5836a4b4c324e..4c57690b7c0d7eb7f51bd620c4abb68763c8898f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ClientDeduplicationFailureTest.java @@ -31,9 +31,9 @@ import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.BatchMessageIdImpl; import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; import org.apache.pulsar.common.policies.data.TopicStats; -import org.apache.pulsar.io.PulsarFunctionE2ETest; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,10 +46,13 @@ import java.net.URL; import java.util.LinkedList; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import static org.mockito.Mockito.spy; @@ -73,7 +76,7 @@ public class ClientDeduplicationFailureTest { private final int brokerWebServicePort = PortManager.nextFreePort(); private final int brokerServicePort = PortManager.nextFreePort(); - private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class); + private static final Logger log = LoggerFactory.getLogger(ClientDeduplicationFailureTest.class); @BeforeMethod(timeOut = 300000) void setup(Method method) throws Exception { @@ -94,6 +97,9 @@ public class ClientDeduplicationFailureTest { config.setTlsAllowInsecureConnection(true); config.setAdvertisedAddress("localhost"); config.setLoadBalancerSheddingEnabled(false); + config.setLoadBalancerAutoBundleSplitEnabled(false); + config.setLoadBalancerEnabled(false); + config.setLoadBalancerAutoUnloadSplitBundlesEnabled(false); config.setAllowAutoTopicCreationType("non-partitioned"); @@ -127,7 +133,141 @@ public class ClientDeduplicationFailureTest { bkEnsemble.stop(); } - @Test + private static class ProducerThread implements Runnable { + + private volatile boolean isRunning = false; + private Thread thread; + private Producer producer; + private long i = 1; + private AtomicLong atomicLong = new AtomicLong(0); + private CompletableFuture lastMessageFuture; + + public ProducerThread(Producer producer) { + this.thread = new Thread(this); + this.producer = producer; + } + + @Override + public void run() { + while(isRunning) { + lastMessageFuture = producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync(); + lastMessageFuture.thenAccept(messageId -> { + atomicLong.incrementAndGet(); + + }).exceptionally(ex -> { + log.info("publish exception:", ex); + return null; + }); + i++; + } + log.info("done Producing! Last send: {}", i); + } + + public void start() { + this.isRunning = true; + this.thread.start(); + } + + public void stop() { + this.isRunning = false; + try { + log.info("Waiting for last message to complete"); + try { + this.lastMessageFuture.get(60, TimeUnit.SECONDS); + } catch (TimeoutException e) { + throw new RuntimeException("Last message hasn't completed within timeout!"); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + log.info("Producer Thread stopped!"); + } + + public long getLastSeqId() { + return this.atomicLong.get(); + } + } + + @Test(timeOut = 300000) + public void testClientDeduplicationCorrectnessWithFailure() throws Exception { + final String namespacePortion = "dedup"; + final String replNamespace = tenant + "/" + namespacePortion; + final String sourceTopic = "persistent://" + replNamespace + "/my-topic1"; + admin.namespaces().createNamespace(replNamespace); + Set clusters = Sets.newHashSet(Lists.newArrayList("use")); + admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters); + admin.namespaces().setDeduplicationStatus(replNamespace, true); + admin.namespaces().setRetention(replNamespace, new RetentionPolicies(-1, -1)); + Producer producer = pulsarClient.newProducer(Schema.STRING) + .blockIfQueueFull(true).sendTimeout(0, TimeUnit.SECONDS) + .topic(sourceTopic) + .producerName("test-producer-1") + .create(); + + + ProducerThread producerThread = new ProducerThread(producer); + producerThread.start(); + + retryStrategically((test) -> { + try { + TopicStats topicStats = admin.topics().getStats(sourceTopic); + return topicStats.publishers.size() == 1 && topicStats.publishers.get(0).getProducerName().equals("test-producer-1") && topicStats.storageSize > 0; + } catch (PulsarAdminException e) { + return false; + } + }, 5, 200); + + TopicStats topicStats = admin.topics().getStats(sourceTopic); + assertEquals(topicStats.publishers.size(), 1); + assertEquals(topicStats.publishers.get(0).getProducerName(), "test-producer-1"); + assertTrue(topicStats.storageSize > 0); + + for (int i = 0; i < 5; i++) { + log.info("Stopping BK..."); + bkEnsemble.stopBK(); + + Thread.sleep(1000 + new Random().nextInt(500)); + + log.info("Starting BK..."); + bkEnsemble.startBK(); + } + + producerThread.stop(); + + // send last message + producer.newMessage().sequenceId(producerThread.getLastSeqId() + 1).value("end").send(); + producer.close(); + + Reader reader = pulsarClient.newReader(Schema.STRING).startMessageId(MessageId.earliest).topic(sourceTopic).create(); + Message prevMessage = null; + Message message = null; + int count = 0; + while(true) { + message = reader.readNext(5, TimeUnit.SECONDS); + if (message == null) { + break; + } + + if (message.getValue().equals("end")) { + log.info("Last seq Id received: {}", prevMessage.getSequenceId()); + break; + } + if (prevMessage == null) { + assertEquals(message.getSequenceId(), 1); + } else { + assertEquals(message.getSequenceId(), prevMessage.getSequenceId() + 1); + } + prevMessage = message; + count++; + } + + log.info("# of messages read: {}", count); + + assertTrue(prevMessage != null); + assertEquals(prevMessage.getSequenceId(), producerThread.getLastSeqId()); + } + + @Test(timeOut = 300000) public void testClientDeduplicationWithBkFailure() throws Exception { final String namespacePortion = "dedup"; final String replNamespace = tenant + "/" + namespacePortion;