提交 a4a8eeb3 编写于 作者: B Boyang Jerry Peng 提交者: xiaolong.ran

Fix bk write failure part 2 (#5322)

* Bug in Message Deduplication that may cause incorrect behavior

* add tests

* fix error message

* fix client backoff

* fix tests

* cleaning up

* Fix handling of BK write failures for message dedup

* tests and clean up

* refactoring code

* fixing bugs

* addressing comments

* add missing license header

* Improve error handling of BK write failures

* fixing tests

* fixing bugs

* cleaning up

* addressing comments

* fixing tests

(cherry picked from commit 60abcaa9)
上级 a358009d
......@@ -444,4 +444,9 @@ public interface ManagedLedger {
* @return the last confirmed entry id
*/
Position getLastConfirmedEntry();
/**
* Signals the managed ledger that it can resume after a BK write failure.
*/
void readyToCreateNewLedger();
}
......@@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.BoundType;
......@@ -59,7 +58,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
......@@ -186,9 +184,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
// Time period in which new write requests will not be accepted, after we fail in creating a new ledger.
final static long WaitTimeAfterLedgerCreationFailureMs = 10000;
volatile PositionImpl lastConfirmedEntry;
protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
......@@ -208,6 +203,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// the new instance will take over
Terminated, // Managed ledger was terminated and no more entries
// are allowed to be added. Reads are allowed
WriteFailed // The state that is transitioned to when a BK write failure happens
// After handling the BK write failure, managed ledger will get signalled to create a new ledger
}
// define boundaries for position based seeks and searches
......@@ -570,6 +567,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
} else if (state == State.Closed) {
addOperation.failed(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
return;
} else if (state == State.WriteFailed) {
pendingAddEntries.remove(addOperation);
addOperation.failed(new ManagedLedgerAlreadyClosedException("Waiting to recover from failure"));
return;
}
if (state == State.ClosingLedger || state == State.CreatingLedger) {
......@@ -579,14 +580,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.debug("[{}] Queue addEntry request", name);
}
} else if (state == State.ClosedLedger) {
long now = clock.millis();
if (now < lastLedgerCreationFailureTimestamp + WaitTimeAfterLedgerCreationFailureMs) {
// Deny the write request, since we haven't waited enough time since last attempt to create a new ledger
pendingAddEntries.remove(addOperation);
addOperation.failed(new ManagedLedgerException("Waiting for new ledger creation to complete"));
return;
}
// No ledger and no pending operations. Create a new ledger
if (log.isDebugEnabled()) {
log.debug("[{}] Creating a new ledger", name);
......@@ -623,6 +616,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
/**
 * Re-opens the managed ledger for writes after a write failure has been handled.
 *
 * <p>The state is moved from {@code WriteFailed} to {@code ClosedLedger} with a compare-and-set, so a
 * managed ledger that is in any other state is left untouched.
 */
@Override
public void readyToCreateNewLedger() {
    boolean resumed = STATE_UPDATER.compareAndSet(this, State.WriteFailed, State.ClosedLedger);
    if (!resumed) {
        // Not in WriteFailed: nothing to recover from.
        return;
    }
    log.info("[{}] Managed ledger is now ready to accept writes again", name);
}
@Override
public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
return openCursor(cursorName, InitialPosition.Latest);
......@@ -1196,10 +1197,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
ManagedLedgerException status = createManagedLedgerException(rc);
// no pending entries means that creating this new ledger is NOT caused by write failure
if (pendingAddEntries.isEmpty()) {
STATE_UPDATER.set(this, State.ClosedLedger);
} else {
STATE_UPDATER.set(this, State.WriteFailed);
}
// Empty the list of pending requests and make all of them fail
clearPendingAddEntries(status);
lastLedgerCreationFailureTimestamp = clock.millis();
STATE_UPDATER.set(this, State.ClosedLedger);
} else {
log.info("[{}] Created new ledger {}", name, lh.getId());
ledgers.put(lh.getId(), LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build());
......@@ -1227,10 +1234,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (e instanceof BadVersionException) {
synchronized (ManagedLedgerImpl.this) {
log.error(
"[{}] Failed to udpate ledger list. z-node version mismatch. Closing managed ledger",
"[{}] Failed to update ledger list. z-node version mismatch. Closing managed ledger",
name);
STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced);
clearPendingAddEntries(e);
// Return ManagedLedgerFencedException to addFailed callback
// to indicate that the ledger is now fenced and topic needs to be closed
clearPendingAddEntries(new ManagedLedgerFencedException(e));
// No need to unlock ledgersListMutex here because we are going to close the topic anyway
return;
}
}
......
......@@ -101,8 +101,8 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
public void failed(ManagedLedgerException e) {
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
data.release();
if (cb != null) {
data.release();
cb.addFailed(e, ctx);
ml.mbean.recordAddEntryError();
}
......
......@@ -329,40 +329,6 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
latch.await();
}
/**
 * Checks how writes behave after injected storage errors: a single BookKeeper failure is tolerated,
 * while a BookKeeper failure paired with a ZooKeeper connection loss makes that add and the following
 * add fail, leaving the cursor backlog at one entry.
 */
@Test
public void recoverAfterWriteError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
ManagedCursor cursor = ledger.openCursor("c1");
// Inject one BookKeeper error before the first write.
bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
// With one single error, the write should succeed
ledger.addEntry("entry".getBytes());
assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
// Inject a BookKeeper error together with a ZooKeeper connection loss before the next write.
bkc.failNow(BKException.Code.BookieHandleNotAvailableException);
zkc.failNow(Code.CONNECTIONLOSS);
try {
ledger.addEntry("entry".getBytes());
fail("should fail");
} catch (ManagedLedgerException e) {
// ok
}
// The failed write must not have been added to the backlog.
assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
// Next add will fail as well
try {
ledger.addEntry("entry".getBytes());
fail("should fail");
} catch (ManagedLedgerException e) {
// ok
}
// Backlog is still just the first, successful entry.
assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
}
@Test
public void recoverAfterZnodeVersionError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
......@@ -376,7 +342,8 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
// This write will try to create a ledger and it will fail at it
ledger.addEntry("entry".getBytes());
fail("should fail");
} catch (BadVersionException e) {
} catch (ManagedLedgerFencedException e) {
assertEquals(e.getCause().getCause().getClass(), org.apache.zookeeper.KeeperException.BadVersionException.class);
// ok
}
......@@ -390,7 +357,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
}
@Test
public void recoverLongTimeAfterWriteError() throws Exception {
public void recoverAfterWriteError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
ManagedCursor cursor = ledger.openCursor("c1");
......@@ -410,7 +377,7 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
// ok
}
Thread.sleep(ManagedLedgerImpl.WaitTimeAfterLedgerCreationFailureMs / 2);
bkc.failNow(BKException.Code.NotEnoughBookiesException);
try {
ledger.addEntry("entry-3".getBytes());
fail("should fail");
......@@ -418,8 +385,10 @@ public class ManagedLedgerErrorsTest extends MockedBookKeeperTestCase {
// ok
}
// After some time, the managed ledger will be available for writes again
Thread.sleep(ManagedLedgerImpl.WaitTimeAfterLedgerCreationFailureMs / 2 + 10);
assertEquals(cursor.getNumberOfEntriesInBacklog(), 1);
// Signal that ManagedLedger has recovered from write error and will be available for writes again
ledger.readyToCreateNewLedger();
// Next add should succeed, and the previous write should not appear
ledger.addEntry("entry-4".getBytes());
......
......@@ -1418,6 +1418,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
assertEquals(ledger.getLedgersInfoAsList().size(), 0);
// Next write should fail as well
bkc.failNow(BKException.Code.NoBookieAvailableException);
try {
ledger.addEntry("entry".getBytes());
fail("Should have received exception");
......
......@@ -321,6 +321,9 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
if (isFenced) {
messageDeduplication.resetHighestSequenceIdPushed();
log.info("[{}] Un-fencing topic...", topic);
// signal to managed ledger that we are ready to resume by creating a new ledger
ledger.readyToCreateNewLedger();
isFenced = false;
}
......@@ -342,15 +345,13 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
@Override
public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
// fence topic when failed to write a message to BK
isFenced = true;
if (exception instanceof ManagedLedgerFencedException) {
// If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
close();
} else {
// fence topic when failed to write a message to BK
isFenced = true;
// close all producers
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.forEach(producer -> futures.add(producer.disconnect()));
......
......@@ -31,9 +31,9 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.io.PulsarFunctionE2ETest;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -46,10 +46,13 @@ import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import static org.mockito.Mockito.spy;
......@@ -73,7 +76,7 @@ public class ClientDeduplicationFailureTest {
private final int brokerWebServicePort = PortManager.nextFreePort();
private final int brokerServicePort = PortManager.nextFreePort();
private static final Logger log = LoggerFactory.getLogger(PulsarFunctionE2ETest.class);
private static final Logger log = LoggerFactory.getLogger(ClientDeduplicationFailureTest.class);
@BeforeMethod(timeOut = 300000)
void setup(Method method) throws Exception {
......@@ -94,6 +97,9 @@ public class ClientDeduplicationFailureTest {
config.setTlsAllowInsecureConnection(true);
config.setAdvertisedAddress("localhost");
config.setLoadBalancerSheddingEnabled(false);
config.setLoadBalancerAutoBundleSplitEnabled(false);
config.setLoadBalancerEnabled(false);
config.setLoadBalancerAutoUnloadSplitBundlesEnabled(false);
config.setAllowAutoTopicCreationType("non-partitioned");
......@@ -127,7 +133,141 @@ public class ClientDeduplicationFailureTest {
bkEnsemble.stop();
}
@Test
/**
 * Test helper that publishes messages with increasing sequence ids (starting at 1) on a background
 * thread until {@link #stop()} is called.
 *
 * <p>{@link #start()} and {@link #stop()} are meant to be called from the test thread; the publish loop
 * runs on the internal thread.
 */
private static class ProducerThread implements Runnable {
    private volatile boolean isRunning = false;
    private final Thread thread;
    private final Producer<String> producer;
    // Next sequence id to publish; only touched by the publishing thread.
    private long i = 1;
    // Number of sends that completed successfully.
    private final AtomicLong atomicLong = new AtomicLong(0);
    // Stage that completes after the ack counter has been updated for the most recent send.
    // Volatile because it is written by the publishing thread and read by stop().
    private volatile CompletableFuture<Void> lastMessageFuture;

    public ProducerThread(Producer<String> producer) {
        this.thread = new Thread(this);
        this.producer = producer;
    }

    @Override
    public void run() {
        while (isRunning) {
            CompletableFuture<MessageId> sendFuture =
                    producer.newMessage().sequenceId(i).value("foo-" + i).sendAsync();
            // Keep the stage that runs AFTER the counter increment, so that waiting on it in stop()
            // guarantees getLastSeqId() already reflects this message.
            lastMessageFuture = sendFuture.thenAccept(messageId -> {
                atomicLong.incrementAndGet();
            });
            lastMessageFuture.exceptionally(ex -> {
                log.info("publish exception:", ex);
                return null;
            });
            i++;
        }
        log.info("done Producing! Last send: {}", i);
    }

    public void start() {
        this.isRunning = true;
        this.thread.start();
    }

    /**
     * Stops the publish loop and waits for the last issued send to complete.
     *
     * @throws RuntimeException if the loop or the last send does not finish within 60 seconds each,
     *                          if the last send failed, or if the calling thread is interrupted
     */
    public void stop() {
        this.isRunning = false;
        try {
            // Wait for the loop to exit first; otherwise it could issue one more send after we have
            // read lastMessageFuture and we would wait on a stale future.
            this.thread.join(TimeUnit.SECONDS.toMillis(60));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (this.thread.isAlive()) {
            throw new RuntimeException("Producer thread hasn't stopped within timeout!");
        }

        log.info("Waiting for last message to complete");
        CompletableFuture<Void> last = this.lastMessageFuture;
        // Null when stop() is called before anything was published.
        if (last != null) {
            try {
                last.get(60, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                throw new RuntimeException("Last message hasn't completed within timeout!", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        log.info("Producer Thread stopped!");
    }

    /**
     * Returns the number of successfully completed sends. Because sequence ids start at 1 and grow by
     * one per send, this presumably equals the last acknowledged sequence id when no send has failed.
     */
    public long getLastSeqId() {
        return this.atomicLong.get();
    }
}
/**
 * Publishes continuously with deduplication enabled while BookKeeper is stopped and restarted five
 * times, then reads the topic from the earliest message and checks that sequence ids start at 1 and
 * increase by exactly one up to the last id reported by the producer thread.
 */
@Test(timeOut = 300000)
public void testClientDeduplicationCorrectnessWithFailure() throws Exception {
    final String nsLocalName = "dedup";
    final String dedupNamespace = tenant + "/" + nsLocalName;
    final String topicUnderTest = "persistent://" + dedupNamespace + "/my-topic1";

    // Namespace setup: replication cluster, deduplication on, infinite retention.
    admin.namespaces().createNamespace(dedupNamespace);
    Set<String> replicationClusters = Sets.newHashSet(Lists.newArrayList("use"));
    admin.namespaces().setNamespaceReplicationClusters(dedupNamespace, replicationClusters);
    admin.namespaces().setDeduplicationStatus(dedupNamespace, true);
    admin.namespaces().setRetention(dedupNamespace, new RetentionPolicies(-1, -1));

    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .blockIfQueueFull(true)
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(topicUnderTest)
            .producerName("test-producer-1")
            .create();

    ProducerThread publisher = new ProducerThread(producer);
    publisher.start();

    // Poll until the producer is visible in the stats and some data has been stored.
    retryStrategically((ignored) -> {
        try {
            TopicStats polled = admin.topics().getStats(topicUnderTest);
            if (polled.publishers.size() != 1) {
                return false;
            }
            if (!polled.publishers.get(0).getProducerName().equals("test-producer-1")) {
                return false;
            }
            return polled.storageSize > 0;
        } catch (PulsarAdminException e) {
            return false;
        }
    }, 5, 200);

    TopicStats stats = admin.topics().getStats(topicUnderTest);
    assertEquals(stats.publishers.size(), 1);
    assertEquals(stats.publishers.get(0).getProducerName(), "test-producer-1");
    assertTrue(stats.storageSize > 0);

    // Bounce BookKeeper a few times while the publisher keeps sending.
    int bounce = 0;
    while (bounce < 5) {
        log.info("Stopping BK...");
        bkEnsemble.stopBK();
        int downtimeMs = 1000 + new Random().nextInt(500);
        Thread.sleep(downtimeMs);
        log.info("Starting BK...");
        bkEnsemble.startBK();
        bounce++;
    }

    publisher.stop();

    // send last message
    long endSequenceId = publisher.getLastSeqId() + 1;
    producer.newMessage().sequenceId(endSequenceId).value("end").send();
    producer.close();

    Reader<String> reader = pulsarClient.newReader(Schema.STRING)
            .startMessageId(MessageId.earliest)
            .topic(topicUnderTest)
            .create();

    Message<String> previous = null;
    int readCount = 0;
    // Stop on a read timeout (null) or on the "end" marker.
    for (Message<String> current = reader.readNext(5, TimeUnit.SECONDS);
            current != null;
            current = reader.readNext(5, TimeUnit.SECONDS)) {
        if (current.getValue().equals("end")) {
            log.info("Last seq Id received: {}", previous.getSequenceId());
            break;
        }
        // First message must carry sequence id 1; every later one must follow its predecessor.
        long expectedSequenceId = (previous == null) ? 1 : previous.getSequenceId() + 1;
        assertEquals(current.getSequenceId(), expectedSequenceId);
        previous = current;
        readCount++;
    }

    log.info("# of messages read: {}", readCount);

    assertTrue(previous != null);
    assertEquals(previous.getSequenceId(), publisher.getLastSeqId());
}
@Test(timeOut = 300000)
public void testClientDeduplicationWithBkFailure() throws Exception {
final String namespacePortion = "dedup";
final String replNamespace = tenant + "/" + namespacePortion;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册