提交 85442005 编写于 作者: J jai1 提交者: Rajan

Fix for Infinite loop in PersistentReplicator.startProducer() (#275)

* Fix for Infinite loop in PersistentReplicator.startProducer()

* Fix for Infinite loop in PersistentReplicator.startProducer()
上级 5c783389
...@@ -617,15 +617,20 @@ public class PersistentReplicator implements ReadEntriesCallback, DeleteCallback ...@@ -617,15 +617,20 @@ public class PersistentReplicator implements ReadEntriesCallback, DeleteCallback
return disconnectFuture; return disconnectFuture;
} }
if (STATE_UPDATER.get(this) == State.Stopping) {
// Do nothing since the all "STATE_UPDATER.set(this, Stopping)" instructions are followed by closeProducerAsync()
// which will at some point change the state to stopped
return CompletableFuture.completedFuture(null);
}
if (producer != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) if (producer != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping)
|| STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) { || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) {
log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster, log.info("[{}][{} -> {}] Disconnect replicator at position {} with backlog {}", topicName, localCluster,
remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog()); remoteCluster, cursor.getMarkDeletedPosition(), cursor.getNumberOfEntriesInBacklog());
return closeProducerAsync(); return closeProducerAsync();
} else {
// If there's already a reconnection happening, signal to close it whenever it's ready
STATE_UPDATER.set(this, State.Stopping);
} }
STATE_UPDATER.set(this, State.Stopped);
return CompletableFuture.completedFuture(null); return CompletableFuture.completedFuture(null);
} }
......
...@@ -33,6 +33,7 @@ import static org.testng.Assert.assertNull; ...@@ -33,6 +33,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail; import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URL; import java.net.URL;
...@@ -67,8 +68,11 @@ import org.mockito.ArgumentCaptor; ...@@ -67,8 +68,11 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Mockito; import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.powermock.api.mockito.PowerMockito;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
...@@ -83,7 +87,9 @@ import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherSingleActi ...@@ -83,7 +87,9 @@ import com.yahoo.pulsar.broker.service.persistent.PersistentDispatcherSingleActi
import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator; import com.yahoo.pulsar.broker.service.persistent.PersistentReplicator;
import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription; import com.yahoo.pulsar.broker.service.persistent.PersistentSubscription;
import com.yahoo.pulsar.broker.service.persistent.PersistentTopic; import com.yahoo.pulsar.broker.service.persistent.PersistentTopic;
import com.yahoo.pulsar.client.api.ProducerConfiguration;
import com.yahoo.pulsar.client.api.PulsarClient; import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; import com.yahoo.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata; import com.yahoo.pulsar.common.api.proto.PulsarApi.MessageMetadata;
...@@ -863,7 +869,7 @@ public class PersistentTopicTest { ...@@ -863,7 +869,7 @@ public class PersistentTopicTest {
PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService); PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster; String remoteReplicatorName = topic.replicatorPrefix + "." + remoteCluster;
ConcurrentOpenHashMap<String, PersistentReplicator> replicatorMap = topic.getReplicators(); ConcurrentOpenHashMap<String, PersistentReplicator> replicatorMap = topic.getReplicators();
;
final URL brokerUrl = new URL( final URL brokerUrl = new URL(
"http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort()); "http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
PulsarClient client = PulsarClient.create(brokerUrl.toString()); PulsarClient client = PulsarClient.create(brokerUrl.toString());
...@@ -895,4 +901,42 @@ public class PersistentTopicTest { ...@@ -895,4 +901,42 @@ public class PersistentTopicTest {
DeleteCursorCallback callback = captor.getValue(); DeleteCursorCallback callback = captor.getValue();
callback.deleteCursorComplete(null); callback.deleteCursorComplete(null);
} }
@Test
public void testClosingReplicationProducerTwice() throws Exception {
final String globalTopicName = "persistent://prop/global/ns/testClosingReplicationProducerTwice";
String localCluster = "local";
String remoteCluster = "remote";
final ManagedLedger ledgerMock = mock(ManagedLedger.class);
doNothing().when(ledgerMock).asyncDeleteCursor(anyObject(), anyObject(), anyObject());
doReturn(new ArrayList<Object>()).when(ledgerMock).getCursors();
PersistentTopic topic = new PersistentTopic(globalTopicName, ledgerMock, brokerService);
String remoteReplicatorName = topic.replicatorPrefix + "." + localCluster;
final URL brokerUrl = new URL(
"http://" + pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getBrokerServicePort());
PulsarClient client = spy( PulsarClient.create(brokerUrl.toString()) );
PulsarClientImpl clientImpl = (PulsarClientImpl) client;
Field conf = PersistentReplicator.class.getDeclaredField("producerConfiguration");
conf.setAccessible(true);
ManagedCursor cursor = mock(ManagedCursorImpl.class);
doReturn(remoteCluster).when(cursor).getName();
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);
doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
replicator.startProducer();
verify(clientImpl).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
replicator.disconnect(false);
replicator.disconnect(false);
replicator.startProducer();
verify(clientImpl, Mockito.times(2)).createProducerAsync(globalTopicName, (ProducerConfiguration) conf.get(replicator), remoteReplicatorName);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册