未验证 提交 07bb8453 编写于 作者: M Matteo Merli 提交者: GitHub

Allow for topic deletions with regex consumers (#5230)

* Allow for topic deletions with regex consumers

* Fixed test compilation

* One more compile fix

* Fixed BrokerServiceAutoTopicCreationTest
上级 23c76241
......@@ -116,6 +116,12 @@ public class BrokerServiceException extends Exception {
}
}
public static class TopicNotFoundException extends BrokerServiceException {
public TopicNotFoundException(String msg) {
super(msg);
}
}
public static class SubscriptionBusyException extends BrokerServiceException {
public SubscriptionBusyException(String msg) {
super(msg);
......@@ -180,6 +186,8 @@ public class BrokerServiceException extends Exception {
} else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
|| t instanceof SubscriptionFencedException) {
return PulsarApi.ServerError.ServiceNotReady;
} else if (t instanceof TopicNotFoundException) {
return PulsarApi.ServerError.TopicNotFound;
} else if (t instanceof IncompatibleSchemaException
|| t instanceof InvalidSchemaDataException) {
// for backward compatible with old clients, invalid schema data
......
......@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.web.RestException;
......@@ -611,6 +612,7 @@ public class ServerCnx extends PulsarHandler {
final InitialPosition initialPosition = subscribe.getInitialPosition();
final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
final boolean forceTopicCreation = subscribe.getForceTopicCreation();
CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
......@@ -676,8 +678,18 @@ public class ServerCnx extends PulsarHandler {
}
}
service.getOrCreateTopic(topicName.toString())
.thenCompose(topic -> {
boolean createTopicIfDoesNotExist = forceTopicCreation
&& service.pulsar().getConfig().isAllowAutoTopicCreation();
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
if (!optTopic.isPresent()) {
return FutureUtil
.failedFuture(new TopicNotFoundException("Topic does not exist"));
}
Topic topic = optTopic.get();
if (schema != null) {
return topic.addSchemaIfIdleOrCheckCompatible(schema)
.thenCompose(isCompatible -> {
......@@ -728,6 +740,9 @@ public class ServerCnx extends PulsarHandler {
remoteAddress, topicName, subscriptionName,
exception.getCause().getMessage());
}
} else if (exception.getCause() instanceof BrokerServiceException) {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage());
} else {
log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
subscriptionName, exception.getCause().getMessage(), exception);
......
......@@ -112,7 +112,8 @@ public class RawReaderImpl implements RawReader {
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
Schema.BYTES, null
Schema.BYTES, null,
true
);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
......
......@@ -22,6 +22,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
......@@ -77,6 +79,7 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
final String subscriptionName = "test-topic-sub";
try {
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
fail("Subscribe operation should have failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
......
......@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
......@@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.TenantInfo;
......@@ -760,4 +762,51 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
producer2.close();
producer3.close();
}
@Test()
public void testTopicDeletion() throws Exception {
String baseTopicName = "persistent://my-property/my-ns/pattern-topic-" + System.currentTimeMillis();
Pattern pattern = Pattern.compile(baseTopicName + ".*");
// Create 2 topics
Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-1")
.create();
Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
.topic(baseTopicName + "-2")
.create();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topicsPattern(pattern)
.patternAutoDiscoveryPeriod(1)
.subscriptionName("sub")
.subscribe();
assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;
// 4. verify consumer get methods
assertSame(consumerImpl.getPattern(), pattern);
assertEquals(consumerImpl.getTopics().size(), 2);
producer1.send("msg-1");
producer1.close();
Message<String> message = consumer.receive();
assertEquals(message.getValue(), "msg-1");
consumer.acknowledge(message);
// Force delete the topic while the regex consumer is connected
admin.topics().delete(baseTopicName + "-1", true);
producer2.send("msg-2");
message = consumer.receive();
assertEquals(message.getValue(), "msg-2");
consumer.acknowledge(message);
assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty());
assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent());
}
}
......@@ -528,7 +528,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 2);
// 8. re-subscribe topic3
CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3, true);
subFuture.get();
// 9. producer publish messages
......
......@@ -179,6 +179,22 @@ public class PulsarClientException extends IOException {
}
}
/**
* Topic does not exist and cannot be created.
*/
public static class TopicDoesNotExistException extends PulsarClientException {
/**
* Constructs an {@code TopicDoesNotExistException} with the specified detail message.
*
* @param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*/
public TopicDoesNotExistException(String msg) {
super(msg);
}
}
/**
* Lookup exception thrown by Pulsar client.
*/
......
......@@ -18,6 +18,14 @@
*/
package org.apache.pulsar.client.cli;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
......@@ -29,6 +37,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.commons.io.HexDump;
import org.apache.commons.lang3.StringUtils;
......@@ -36,6 +45,7 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
......@@ -54,14 +64,6 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.Parameters;
import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
/**
* pulsar-client consume command implementation.
*
......@@ -91,7 +93,10 @@ public class CmdConsume {
@Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at which to consume, "
+ "value 0 means to consume messages as fast as possible.")
private double consumeRate = 0;
@Parameter(names = { "--regex" }, description = "Indicate thetopic name is a regex pattern")
private boolean isRegex = false;
private ClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
......@@ -144,7 +149,7 @@ public class CmdConsume {
throw (new ParameterException("Number of messages should be zero or positive."));
String topic = this.mainOptions.get(0);
if(this.serviceURL.startsWith("ws")) {
return consumeFromWebSocket(topic);
}else {
......@@ -158,8 +163,17 @@ public class CmdConsume {
try {
PulsarClient client = clientBuilder.build();
Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType).subscribe();
ConsumerBuilder<byte[]> builder = client.newConsumer()
.subscriptionName(this.subscriptionName)
.subscriptionType(subscriptionType);
if (isRegex) {
builder.topicsPattern(Pattern.compile(topic));
} else {
builder.topic(topic);
}
Consumer<byte[]> consumer = builder.subscribe();
RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
......@@ -197,13 +211,13 @@ public class CmdConsume {
int returnCode = 0;
TopicName topicName = TopicName.get(topic);
String wsTopic = String.format(
"%s/%s/" + (StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster() + "/")
+ "%s/%s/%s?subscriptionType=%s",
topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName(),
subscriptionName, subscriptionType.toString());
String consumerBaseUri = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/consumer/" + wsTopic;
URI consumerUri = URI.create(consumerBaseUri);
......@@ -252,7 +266,7 @@ public class CmdConsume {
LOG.debug("No message to consume after waiting for 5 seconds.");
} else {
try {
System.out.println(Base64.getDecoder().decode(msg));
System.out.println(Base64.getDecoder().decode(msg));
}catch(Exception e) {
System.out.println(msg);
}
......
......@@ -911,6 +911,8 @@ public class ClientCnx extends PulsarHandler {
return new PulsarClientException.TopicTerminatedException(errorMsg);
case IncompatibleSchema:
return new PulsarClientException.IncompatibleSchemaException(errorMsg);
case TopicNotFound:
return new PulsarClientException.TopicDoesNotExistException(errorMsg);
case UnknownError:
default:
return new PulsarClientException(errorMsg);
......
......@@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.protocol.Commands;
......@@ -141,6 +142,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
protected volatile boolean paused;
private final boolean createTopicIfDoesNotExist;
enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
......@@ -151,20 +154,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
}
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors);
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors,
createTopicIfDoesNotExist);
} else {
return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors);
return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors, createTopicIfDoesNotExist);
}
}
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
......@@ -179,6 +187,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
this.resetIncludeHead = conf.isResetIncludeHead();
this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
......@@ -510,7 +519,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
si);
si, createTopicIfDoesNotExist);
if (startMessageIdData != null) {
startMessageIdData.recycle();
}
......@@ -548,18 +557,27 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
return null;
}
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress());
if (e.getCause() instanceof PulsarClientException && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
if (e.getCause() instanceof PulsarClientException
&& getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
&& System.currentTimeMillis() < subscribeTimeout) {
reconnectLater(e.getCause());
return null;
}
if (!subscribeFuture.isDone()) {
} else if (!subscribeFuture.isDone()) {
// unable to create new consumer, fail operation
setState(State.Failed);
closeConsumerTasks();
subscribeFuture.completeExceptionally(e);
client.cleanupConsumer(this);
} else if (e.getCause() instanceof TopicDoesNotExistException) {
// The topic was deleted after the consumer was created, and we're
// not allowed to recreate the topic. This can happen in few cases:
// * Regex consumer getting error after topic gets deleted
// * Regular consumer after topic is manually delete and with
// auto-topic-creation set to false
// No more retries are needed in this case.
setState(State.Failed);
client.cleanupConsumer(this);
log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", topic, subscription, cnx.channel().remoteAddress());
} else {
// consumer was subscribed and connected but we got some error, keep trying
reconnectLater(e.getCause());
......
......@@ -45,6 +45,8 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
......@@ -101,14 +103,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors) {
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor,
subscribeFuture, schema, interceptors);
subscribeFuture, schema, interceptors, createTopicIfDoesNotExist);
}
MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors) {
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
schema, interceptors);
......@@ -152,7 +154,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
this.namespaceName = conf.getTopicNames().stream().findFirst()
.flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(this::subscribeAsync)
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
.collect(Collectors.toList());
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
......@@ -661,7 +663,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
// subscribe one more given topic
public CompletableFuture<Void> subscribeAsync(String topicName) {
public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
if (!topicNameValid(topicName)) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topic name not valid"));
......@@ -675,12 +677,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
client.getPartitionedTopicMetadata(topicName)
.thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions))
.exceptionally(ex1 -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
subscribeResult.completeExceptionally(ex1);
return null;
});
.thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions,
createTopicIfDoesNotExist))
.exceptionally(ex1 -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
subscribeResult.completeExceptionally(ex1);
return null;
});
return subscribeResult;
}
......@@ -702,7 +705,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer> future = new CompletableFuture<>();
MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, listenerExecutor,
future, schema, interceptors);
future, schema, interceptors, true /* createTopicIfDoesNotExist */);
future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
.thenRun(()-> subscribeFuture.complete(consumer))
......@@ -728,22 +731,24 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
}
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
subscribeTopicPartitions(subscribeResult, topicName, numberPartitions);
subscribeTopicPartitions(subscribeResult, topicName, numberPartitions, true /* createTopicIfDoesNotExist */);
return subscribeResult;
}
private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
boolean createIfDoesNotExist) {
client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> {
if (null == cause) {
doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions);
doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions, createIfDoesNotExist);
} else {
subscribeResult.completeExceptionally(cause);
}
});
}
private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
boolean createIfDoesNotExist) {
if (log.isDebugEnabled()) {
log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
}
......@@ -767,8 +772,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture,
SubscriptionMode.Durable, null, schema, interceptors
);
SubscriptionMode.Durable, null, schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
......@@ -780,8 +785,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
schema, interceptors
);
schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
futureList = Collections.singletonList(subFuture);
......@@ -913,6 +918,59 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
return unsubscribeFuture;
}
// Remove a consumer for a topic
public CompletableFuture<Void> removeConsumerAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
}
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
String topicPartName = TopicName.get(topicName).getPartitionedTopicName();
List<ConsumerImpl<T>> consumersToClose = consumers.values().stream()
.filter(consumer -> {
String consumerTopicName = consumer.getTopic();
if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
return true;
} else {
return false;
}
}).collect(Collectors.toList());
List<CompletableFuture<Void>> futureList = consumersToClose.stream()
.map(ConsumerImpl::closeAsync).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
.whenComplete((r, ex) -> {
if (ex == null) {
consumersToClose.forEach(consumer1 -> {
consumers.remove(consumer1.getTopic());
pausedConsumers.remove(consumer1);
allTopicPartitionsNumber.decrementAndGet();
});
topics.remove(topicName);
((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
unsubscribeFuture.complete(null);
log.info("[{}] [{}] [{}] Removed Topics Consumer, allTopicPartitionsNumber: {}",
topicName, subscription, consumerName, allTopicPartitionsNumber);
} else {
unsubscribeFuture.completeExceptionally(ex);
setState(State.Failed);
log.error("[{}] [{}] [{}] Could not remove Topics Consumer",
topicName, subscription, consumerName, ex.getCause());
}
});
return unsubscribeFuture;
}
// get topics name
public List<String> getTopics() {
return topics.keySet().stream().collect(Collectors.toList());
......@@ -997,8 +1055,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors
);
partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for partitionName: {}",
......
......@@ -54,7 +54,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
ExecutorService listenerExecutor,
CompletableFuture<Consumer<T>> subscribeFuture,
Schema<T> schema, Mode subscriptionMode, ConsumerInterceptors<T> interceptors) {
super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors);
super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors,
false /* createTopicIfDoesNotExist */);
this.topicsPattern = topicsPattern;
this.subscriptionMode = subscriptionMode;
......@@ -129,7 +130,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
}
List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
removedTopics.stream().forEach(topic -> futures.add(unsubscribeAsync(topic)));
removedTopics.stream().forEach(topic -> futures.add(removeConsumerAsync(topic)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> removeFuture.complete(null))
.exceptionally(ex -> {
......@@ -150,7 +151,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
}
List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic)));
addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> addFuture.complete(null))
.exceptionally(ex -> {
......
......@@ -350,8 +350,8 @@ public class PulsarClientImpl implements PulsarClient {
} else {
int partitionIndex = TopicName.getPartitionIndex(topic);
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors
);
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}
synchronized (consumers) {
......@@ -370,7 +370,8 @@ public class PulsarClientImpl implements PulsarClient {
CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors);
externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors,
true /* createTopicIfDoesNotExist */);
synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
......
......@@ -83,10 +83,8 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null
);
partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
true /* createTopicIfDoesNotExist */);
}
@Override
......
......@@ -49,17 +49,10 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors) {
this(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
}
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
schema, interceptors);
schema, interceptors, createTopicIfDoesNotExist);
}
@Override
......
......@@ -60,8 +60,8 @@ public class ConsumerImplTest {
consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null
);
executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
true);
}
@Test(invocationTimeOut = 1000)
......
......@@ -57,7 +57,7 @@ public class MultiTopicsConsumerImplTest {
MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
clientImpl, consumerConfData,
listenerExecutor, null, null, null);
listenerExecutor, null, null, null, true);
impl.getStats();
}
......
......@@ -8804,6 +8804,10 @@ public final class PulsarApi {
// optional bool replicate_subscription_state = 14;
boolean hasReplicateSubscriptionState();
boolean getReplicateSubscriptionState();
// optional bool force_topic_creation = 15 [default = true];
boolean hasForceTopicCreation();
boolean getForceTopicCreation();
}
public static final class CommandSubscribe extends
org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
......@@ -9145,6 +9149,16 @@ public final class PulsarApi {
return replicateSubscriptionState_;
}
// optional bool force_topic_creation = 15 [default = true];
public static final int FORCE_TOPIC_CREATION_FIELD_NUMBER = 15;
private boolean forceTopicCreation_;
public boolean hasForceTopicCreation() {
return ((bitField0_ & 0x00002000) == 0x00002000);
}
public boolean getForceTopicCreation() {
return forceTopicCreation_;
}
private void initFields() {
topic_ = "";
subscription_ = "";
......@@ -9160,6 +9174,7 @@ public final class PulsarApi {
schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
replicateSubscriptionState_ = false;
forceTopicCreation_ = true;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
......@@ -9258,6 +9273,9 @@ public final class PulsarApi {
if (((bitField0_ & 0x00001000) == 0x00001000)) {
output.writeBool(14, replicateSubscriptionState_);
}
if (((bitField0_ & 0x00002000) == 0x00002000)) {
output.writeBool(15, forceTopicCreation_);
}
}
private int memoizedSerializedSize = -1;
......@@ -9322,6 +9340,10 @@ public final class PulsarApi {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBoolSize(14, replicateSubscriptionState_);
}
if (((bitField0_ & 0x00002000) == 0x00002000)) {
size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
.computeBoolSize(15, forceTopicCreation_);
}
memoizedSerializedSize = size;
return size;
}
......@@ -9463,6 +9485,8 @@ public final class PulsarApi {
bitField0_ = (bitField0_ & ~0x00001000);
replicateSubscriptionState_ = false;
bitField0_ = (bitField0_ & ~0x00002000);
forceTopicCreation_ = true;
bitField0_ = (bitField0_ & ~0x00004000);
return this;
}
......@@ -9553,6 +9577,10 @@ public final class PulsarApi {
to_bitField0_ |= 0x00001000;
}
result.replicateSubscriptionState_ = replicateSubscriptionState_;
if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
to_bitField0_ |= 0x00002000;
}
result.forceTopicCreation_ = forceTopicCreation_;
result.bitField0_ = to_bitField0_;
return result;
}
......@@ -9608,6 +9636,9 @@ public final class PulsarApi {
if (other.hasReplicateSubscriptionState()) {
setReplicateSubscriptionState(other.getReplicateSubscriptionState());
}
if (other.hasForceTopicCreation()) {
setForceTopicCreation(other.getForceTopicCreation());
}
return this;
}
......@@ -9764,6 +9795,11 @@ public final class PulsarApi {
replicateSubscriptionState_ = input.readBool();
break;
}
case 120: {
bitField0_ |= 0x00004000;
forceTopicCreation_ = input.readBool();
break;
}
}
}
}
......@@ -10227,6 +10263,27 @@ public final class PulsarApi {
return this;
}
// optional bool force_topic_creation = 15 [default = true];
private boolean forceTopicCreation_ = true;
public boolean hasForceTopicCreation() {
return ((bitField0_ & 0x00004000) == 0x00004000);
}
public boolean getForceTopicCreation() {
return forceTopicCreation_;
}
public Builder setForceTopicCreation(boolean value) {
bitField0_ |= 0x00004000;
forceTopicCreation_ = value;
return this;
}
public Builder clearForceTopicCreation() {
bitField0_ = (bitField0_ & ~0x00004000);
forceTopicCreation_ = true;
return this;
}
// @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
}
......@@ -467,13 +467,15 @@ public class Commands {
SubType subType, int priorityLevel, String consumerName) {
return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false,
false /* isReplicated */, InitialPosition.Earliest, null);
false /* isReplicated */, InitialPosition.Earliest, null,
true /* createTopicIfDoesNotExist */);
}
public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo,
boolean createTopicIfDoesNotExist) {
CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
subscribeBuilder.setTopic(topic);
subscribeBuilder.setSubscription(subscription);
......@@ -486,6 +488,8 @@ public class Commands {
subscribeBuilder.setReadCompacted(readCompacted);
subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
subscribeBuilder.setReplicateSubscriptionState(isReplicated);
subscribeBuilder.setForceTopicCreation(createTopicIfDoesNotExist);
if (startMessageId != null) {
subscribeBuilder.setStartMessageId(startMessageId);
}
......
......@@ -268,7 +268,7 @@ message CommandSubscribe {
// Signal wether the subscription should be backed by a
// durable cursor or not
optional bool durable = 8 [default = true];
// If specified, the subscription will position the cursor
// markd-delete position on the particular message id and
// will send messages from that point
......@@ -292,6 +292,13 @@ message CommandSubscribe {
// to periodically sync the state of replicated subscriptions
// across different clusters (when using geo-replication).
optional bool replicate_subscription_state = 14;
// If true, the subscribe operation will cause a topic to be
// created if it does not exist already (and if topic auto-creation
// is allowed by broker.
// If false, the subscribe operation will fail if the topic
// does not exist.
optional bool force_topic_creation = 15 [default = true];
}
message CommandPartitionedTopicMetadata {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册