未验证 提交 2ccea556 编写于 作者: J Jia Zhai 提交者: guangning

client: make SubscriptionMode a member of ConsumerConfigurationData (#6337)

Currently, SubscriptionMode is a parameter to create ConsumerImpl, but it is not exported out, and user could not set this value for consumer.  This change tries to make SubscriptionMode a member of ConsumerConfigurationData, so user could set this parameter when create consumer.  

(cherry picked from commit 208af7cd)
上级 ea2cfad5
......@@ -117,7 +117,6 @@ public class RawReaderImpl implements RawReader {
TopicName.getPartitionIndex(conf.getSingleTopic()),
false,
consumerFuture,
SubscriptionMode.Durable,
MessageId.earliest,
0 /* startMessageRollbackDurationInSec */,
Schema.BYTES, null,
......
......@@ -228,6 +228,21 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> subscriptionType(SubscriptionType subscriptionType);
/**
* Select the subscription mode to be used when subscribing to the topic.
*
* <p>Options are:
* <ul>
* <li>{@link SubscriptionMode#Durable} (Default)</li>
* <li>{@link SubscriptionMode#NonDurable}</li>
* </ul>
*
* @param subscriptionMode
* the subscription mode value
* @return the consumer builder instance
*/
ConsumerBuilder<T> subscriptionMode(SubscriptionMode subscriptionMode);
/**
* Sets a {@link MessageListener} for the consumer
*
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
/**
* Types of subscription mode supported by Pulsar.
*/
public enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
Durable,
// Lightweight subscription mode that doesn't have a durable cursor associated
NonDurable
}
......@@ -47,6 +47,7 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
......@@ -191,6 +192,13 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return this;
}
@Override
public ConsumerBuilder<T> subscriptionMode(@NonNull SubscriptionMode subscriptionMode) {
conf.setSubscriptionMode(subscriptionMode);
return this;
}
@Override
public ConsumerBuilder<T> messageListener(@NonNull MessageListener<T> messageListener) {
conf.setMessageListener(messageListener);
......
......@@ -63,6 +63,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
......@@ -150,39 +151,38 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
private final boolean createTopicIfDoesNotExist;
enum SubscriptionMode {
// Make the subscription to be backed by a durable cursor that will retain messages and persist the current
// position
Durable,
// Lightweight subscription mode that doesn't have a durable cursor associated
NonDurable
}
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor,
int partitionIndex,
boolean hasParentConsumer,
CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
subscribeFuture,
subscriptionMode, startMessageId, schema, interceptors,
startMessageId, schema, interceptors,
createTopicIfDoesNotExist);
} else {
return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
subscribeFuture, subscriptionMode, startMessageId, 0 /* rollback time in sec to start msgId */,
subscribeFuture, startMessageId, 0 /* rollback time in sec to start msgId */,
schema, interceptors, createTopicIfDoesNotExist);
}
}
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer,
CompletableFuture<Consumer<T>> subscribeFuture, SubscriptionMode subscriptionMode, MessageId startMessageId,
CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
this.subscriptionMode = conf.getSubscriptionMode();
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
this.lastDequeuedMessage = startMessageId == null ? MessageId.earliest : startMessageId;
this.initialStartMessageId = this.startMessageId;
......
......@@ -59,7 +59,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
......@@ -833,10 +832,10 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture,
SubscriptionMode.Durable, null, schema, interceptors,
createIfDoesNotExist);
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture,
null, schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
......@@ -847,7 +846,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, null,
schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
......@@ -1118,7 +1117,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
partitionIndex, true, subFuture, null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (log.isDebugEnabled()) {
......
......@@ -64,7 +64,6 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
......@@ -354,7 +353,7 @@ public class PulsarClientImpl implements PulsarClient {
} else {
int partitionIndex = TopicName.getPartitionIndex(topic);
consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
consumerSubscribedFuture,null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
}
......
......@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
......@@ -47,6 +46,7 @@ public class ReaderImpl<T> implements Reader<T> {
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
......@@ -83,7 +83,7 @@ public class ReaderImpl<T> implements Reader<T> {
final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration,
listenerExecutor, partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable,
listenerExecutor, partitionIdx, false, consumerFuture,
readerConfiguration.getStartMessageId(), readerConfiguration.getStartMessageFromRollbackDurationInSec(),
schema, null, true /* createTopicIfDoesNotExist */);
}
......
......@@ -49,11 +49,11 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
MessageId startMessageId, Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
subscriptionMode, startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors,
startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors,
createTopicIfDoesNotExist);
}
......
......@@ -43,6 +43,7 @@ import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
@Data
......@@ -59,6 +60,8 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
@JsonIgnore
private MessageListener<T> messageListener;
......
......@@ -18,25 +18,32 @@
*/
package org.apache.pulsar.client.impl;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.mockito.Mockito.*;
public class ConsumerImplTest {
......@@ -62,7 +69,7 @@ public class ConsumerImplTest {
consumerConf.setSubscriptionName("test-sub");
consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
executorService, -1, false, subscribeFuture, null, null, null,
true);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册