diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java index 8c92323434d33ea0023b28c71221ea562aad7c15..f608a90e35d8d4af6d8541883c4c1bf9efe92fb9 100644 --- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/service/persistent/PersistentReplicator.java @@ -617,15 +617,20 @@ public class PersistentReplicator implements ReadEntriesCallback, DeleteCallback return disconnectFuture; } + if (STATE_UPDATER.get(this) == State.Stopping) { + // Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by closeProducerAsync() + // which will at some point change the state to stopped + return CompletableFuture.completedFuture(null); + } + if (producer != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) { log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster, remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog()); return closeProducerAsync(); - } else { - // If there's already a reconnection happening, signal to close it whenever it's ready - STATE_UPDATER.set(this, State.Stopping); } + + STATE_UPDATER.set(this, State.Stopped); return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java index 1d85a7b91cd3f9cc7e00a2486a61f6f73b47f7ad..7bdfd1c1a7ab2aeb95808898e9a68f4680f0e24c 100644 --- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/service/PersistentTopicTest.java @@ -33,6 +33,7 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; +import java.lang.reflect.Field; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.URL; @@ -67,8 +68,11 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import org.mockito.verification.VerificationMode; +import org.powermock.api.mockito.PowerMockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -83,7 +87,9 @@ import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherSingleActi import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator; import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; +import com.yahoo.pulsar.client.api.ProducerConfiguration; import com.yahoo.pulsar.client.api.PulsarClient; +import com.yahoo.pulsar.client.impl.PulsarClientImpl; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; @@ -863,7 +869,7 @@ public class PersistentTopicTest { PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster; ConcurrentOpenHashMap replicatorMap = topic.getReplicators(); - ; + final URL brokerUrl = new URL( "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort()); PulsarClient client = PulsarClient.create(brokerUrl.toString()); @@ -895,4 +901,42 @@ public class PersistentTopicTest { DeleteCursorCallback callback = captor.getValue(); callback.deleteCursorComplete(null); } + + @Test + public void testClosingReplicationProducerTwice() throws Exception { + final String globalTopicName = "persistent://prop/global/ns/testClosingReplicationProducerTwice"; + String localCluster = "local"; + String remoteCluster = "remote"; + final ManagedLedger ledgerMock = mock(ManagedLedger.class); + doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject()); + doReturn(new ArrayList()).when(ledgerMock).getCursors(); + + PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); + String remoteReplicatorName = topic.replicatorPrefix + "." + localCluster; + + final URL brokerUrl = new URL( + "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort()); + PulsarClient client = spy( PulsarClient.create(brokerUrl.toString()) ); + PulsarClientImpl clientImpl = (PulsarClientImpl) client; + Field conf = PersistentReplicator.class.getDeclaredField("producerConfiguration"); + conf.setAccessible(true); + + ManagedCursor cursor = mock(ManagedCursorImpl.class); + doReturn(remoteCluster).when(cursor).getName(); + brokerService.getReplicationClients().put(remoteCluster, client); + PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService); + + doReturn(new CompletableFuture()).when(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName); + + replicator.startProducer(); + verify(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName); + + replicator.disconnect(false); + replicator.disconnect(false); + + replicator.startProducer(); + + verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName); + } + }