未验证 提交 c54a47e2 编写于 作者: S Szymon Andrzejczak 提交者: GitHub

Ensure parallel invocations of MultiTopicsConsumerImpl::subscribeAsync with...

Ensure parallel invocations of MultiTopicsConsumerImpl::subscribeAsync with same topic name do not hang. Fixes  #7556. (#7691)

Fixes #7556

### Motivation

We use PatternMultiTopicsConsumerImpl in our solution, but we need to subscribe to new topics instantly after their creation. Default behavior of PatternMultiTopicsConsumerImpl is too slow and too costly for our use case.
This is why we've created an external mechanism which notifies us about new topics. We then subscribe to them using MultiTopicsConsumerImpl#subscribeAsync.
However, if this method is invoked multiple times with same topicName simultaneously, an error may occur and topic's consumers may get closed. It happens, because multiple invocations can pass the initial check in topicNameValid(String topicName), as the `topics` map does not contain any entry for the topic yet. More detailed description is available at issue's page: https://github.com/apache/pulsar/issues/7556

### Modifications

Code in MultiTopicsConsumerImpl#doSubscribeTopicPartitions now checks if `topics` map already contains an entry for a given topic. It does so by checking the return value of putIfAbsent method. If it does already contain an entry, then subscribeResult future is completed exceptionally and the method returns. It prevents subsequent `checkState(currentAllTopicsPartitionsNumber == numTopics, "...")` invocation from throwing an exception which would cause topic's consumers to get closed.

### Verifying this change

Added new unit test to MultiTopicsConsumerImplTest class.
上级 69cbd26e
......@@ -820,7 +820,16 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
List<CompletableFuture<Consumer<T>>> futureList;
if (numPartitions > 0) {
this.topics.putIfAbsent(topicName, numPartitions);
// Below condition is true if subscribeAsync() has been invoked second time with same
// topicName before the first invocation had reached this point.
boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, numPartitions) != null;
if (isTopicBeingSubscribedForInOtherThread) {
String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. "
+ "Topic is already being subscribed for in other thread.", topic, topicName);
log.warn(errorMessage);
subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
return;
}
allTopicPartitionsNumber.addAndGet(numPartitions);
int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
......@@ -844,7 +853,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
})
.collect(Collectors.toList());
} else {
this.topics.putIfAbsent(topicName, 1);
boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, 1) != null;
if (isTopicBeingSubscribedForInOtherThread) {
String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. "
+ "Topic is already being subscribed for in other thread.", topic, topicName);
log.warn(errorMessage);
subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
return;
}
allTopicPartitionsNumber.incrementAndGet();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
......@@ -863,8 +879,9 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
}
int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum();
checkState(allTopicPartitionsNumber.get() == numTopics,
"allTopicPartitionsNumber " + allTopicPartitionsNumber.get()
int currentAllTopicsPartitionsNumber = allTopicPartitionsNumber.get();
checkState(currentAllTopicsPartitionsNumber == numTopics,
"allTopicPartitionsNumber " + currentAllTopicsPartitionsNumber
+ " not equals expected: " + numTopics);
// We have successfully created new consumers, so we can start receiving messages for them
......
......@@ -19,18 +19,35 @@
package org.apache.pulsar.client.impl;
import com.google.common.collect.Sets;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.testng.annotations.Test;
import java.net.SocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.expectThrows;
/**
* Unit Tests of {@link MultiTopicsConsumerImpl}.
......@@ -61,4 +78,71 @@ public class MultiTopicsConsumerImplTest {
impl.getStats();
}
// Test uses a mocked PulsarClientImpl which will complete the getPartitionedTopicMetadata() internal async call
// after a delay longer than the interval between the two subscribeAsync() calls in the test method body.
//
// Code under tests is using CompletableFutures. Theses may hang indefinitely if code is broken.
// That's why a test timeout is defined.
@Test(timeOut = 5000)
public void testParallelSubscribeAsync() throws Exception {
String topicName = "parallel-subscribe-async-topic";
String subscriptionName = "parallel-subscribe-async-subscription";
String serviceUrl = "pulsar://localhost:6650";
Schema<byte[]> schema = Schema.BYTES;
ExecutorService listenerExecutor = mock(ExecutorService.class);
ClientConfigurationData conf = new ClientConfigurationData();
conf.setServiceUrl(serviceUrl);
ConsumerConfigurationData<byte[]> consumerConfData = new ConsumerConfigurationData<>();
consumerConfData.setSubscriptionName(subscriptionName);
int completionDelayMillis = 100;
PulsarClientImpl client = setUpPulsarClientMock(schema, completionDelayMillis);
MultiTopicsConsumerImpl<byte[]> impl = new MultiTopicsConsumerImpl<>(client, consumerConfData, listenerExecutor,
new CompletableFuture<>(), schema, null, true);
CompletableFuture<Void> firstInvocation = impl.subscribeAsync(topicName, true);
Thread.sleep(5); // less than completionDelayMillis
CompletableFuture<Void> secondInvocation = impl.subscribeAsync(topicName, true);
firstInvocation.get(); // does not throw
Throwable t = expectThrows(ExecutionException.class, secondInvocation::get);
Throwable cause = t.getCause();
assertEquals(cause.getClass(), PulsarClientException.class);
assertTrue(cause.getMessage().endsWith(
"Failed to subscribe for topic [parallel-subscribe-async-topic] in topics consumer. "
+ "Topic is already being subscribed for in other thread."));
}
private <T> PulsarClientImpl setUpPulsarClientMock(Schema<T> schema, int completionDelayMillis) {
PulsarClientImpl clientMock = mock(PulsarClientImpl.class, Mockito.RETURNS_DEEP_STUBS);
when(clientMock.getConfiguration()).thenReturn(mock(ClientConfigurationData.class));
when(clientMock.timer()).thenReturn(mock(Timer.class));
when(clientMock.getPartitionedTopicMetadata(any())).thenAnswer(invocation -> createDelayedCompletedFuture(
new PartitionedTopicMetadata(), completionDelayMillis));
when(clientMock.<T>preProcessSchemaBeforeSubscribe(any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(schema));
when(clientMock.externalExecutorProvider()).thenReturn(mock(ExecutorProvider.class));
when(clientMock.eventLoopGroup().next()).thenReturn(mock(EventLoop.class));
ClientCnx clientCnxMock = mock(ClientCnx.class, Mockito.RETURNS_DEEP_STUBS);
when(clientCnxMock.ctx()).thenReturn(mock(ChannelHandlerContext.class));
when(clientCnxMock.sendRequestWithId(any(), anyLong()))
.thenReturn(CompletableFuture.completedFuture(mock(ProducerResponse.class)));
when(clientCnxMock.channel().remoteAddress()).thenReturn(mock(SocketAddress.class));
when(clientMock.getConnection(any())).thenReturn(CompletableFuture.completedFuture(clientCnxMock));
return clientMock;
}
private <T> CompletableFuture<T> createDelayedCompletedFuture(T result, int delayMillis) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(delayMillis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return result;
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册