properties = new HashMap<>();
-
- private boolean readCompacted = false;
+ private final ConsumerConfigurationData conf = new ConsumerConfigurationData();
/**
* @return the configured timeout in milliseconds for unacked messages.
*/
public long getAckTimeoutMillis() {
- return ackTimeoutMillis;
+ return conf.getAckTimeoutMillis();
}
/**
@@ -91,7 +67,7 @@ public class ConsumerConfiguration implements Serializable {
long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
"Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms");
- this.ackTimeoutMillis = ackTimeoutMillis;
+ conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
return this;
}
@@ -99,7 +75,7 @@ public class ConsumerConfiguration implements Serializable {
* @return the configured subscription type
*/
public SubscriptionType getSubscriptionType() {
- return this.subscriptionType;
+ return conf.getSubscriptionType();
}
/**
@@ -112,7 +88,7 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) {
checkNotNull(subscriptionType);
- this.subscriptionType = subscriptionType;
+ conf.setSubscriptionType(subscriptionType);
return this;
}
@@ -120,7 +96,7 @@ public class ConsumerConfiguration implements Serializable {
* @return the configured {@link MessageListener} for the consumer
*/
public MessageListener getMessageListener() {
- return this.messageListener;
+ return conf.getMessageListener();
}
/**
@@ -134,7 +110,7 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setMessageListener(MessageListener messageListener) {
checkNotNull(messageListener);
- this.messageListener = messageListener;
+ conf.setMessageListener(messageListener);
return this;
}
@@ -144,24 +120,27 @@ public class ConsumerConfiguration implements Serializable {
* @since 2.0
*/
public ConsumerEventListener getConsumerEventListener() {
- return this.consumerEventListener;
+ return conf.getConsumerEventListener();
}
/**
* Sets a {@link ConsumerEventListener} for the consumer.
*
- * The consumer group listener is used for receiving consumer state change in a consumer group for failover
+ *
+ * The consumer group listener is used for receiving consumer state change in a consumer group for failover
* subscription. Application can then react to the consumer state changes.
*
- *
This change is experimental. It is subject to changes coming in release 2.0.
+ *
+ * This change is experimental. It is subject to changes coming in release 2.0.
*
- * @param listener the consumer group listener object
+ * @param listener
+ * the consumer group listener object
* @return consumer configuration
* @since 2.0
*/
public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) {
checkNotNull(listener);
- this.consumerEventListener = listener;
+ conf.setConsumerEventListener(listener);
return this;
}
@@ -169,15 +148,14 @@ public class ConsumerConfiguration implements Serializable {
* @return the configure receiver queue size value
*/
public int getReceiverQueueSize() {
- return this.receiverQueueSize;
+ return conf.getReceiverQueueSize();
}
-
/**
* @return the configured max total receiver queue size across partitions
*/
public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
- return maxTotalReceiverQueueSizeAcrossPartitions;
+ return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
}
/**
@@ -189,15 +167,15 @@ public class ConsumerConfiguration implements Serializable {
* @param maxTotalReceiverQueueSizeAcrossPartitions
*/
public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
- checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= receiverQueueSize);
- this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
+ checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize());
+ conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
}
/**
* @return the CryptoKeyReader
*/
public CryptoKeyReader getCryptoKeyReader() {
- return this.cryptoKeyReader;
+ return conf.getCryptoKeyReader();
}
/**
@@ -208,24 +186,25 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
checkNotNull(cryptoKeyReader);
- this.cryptoKeyReader = cryptoKeyReader;
+ conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
/**
* Sets the ConsumerCryptoFailureAction to the value specified
*
- * @param The consumer action
+ * @param The
+ * consumer action
*/
public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
- cryptoFailureAction = action;
+ conf.setCryptoFailureAction(action);
}
/**
* @return The ConsumerCryptoFailureAction
*/
public ConsumerCryptoFailureAction getCryptoFailureAction() {
- return this.cryptoFailureAction;
+ return conf.getCryptoFailureAction();
}
/**
@@ -256,7 +235,7 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
- this.receiverQueueSize = receiverQueueSize;
+ conf.setReceiverQueueSize(receiverQueueSize);
return this;
}
@@ -264,7 +243,7 @@ public class ConsumerConfiguration implements Serializable {
* @return the consumer name
*/
public String getConsumerName() {
- return consumerName;
+ return conf.getConsumerName();
}
/**
@@ -274,12 +253,12 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setConsumerName(String consumerName) {
checkArgument(consumerName != null && !consumerName.equals(""));
- this.consumerName = consumerName;
+ conf.setConsumerName(consumerName);
return this;
}
public int getPriorityLevel() {
- return priorityLevel;
+ return conf.getPriorityLevel();
}
/**
@@ -303,32 +282,34 @@ public class ConsumerConfiguration implements Serializable {
* @param priorityLevel
*/
public void setPriorityLevel(int priorityLevel) {
- this.priorityLevel = priorityLevel;
+ conf.setPriorityLevel(priorityLevel);
}
public boolean getReadCompacted() {
- return readCompacted;
+ return conf.isReadCompacted();
}
/**
- * If enabled, the consumer will read messages from the compacted topic rather than reading the full message
- * backlog of the topic. This means that, if the topic has been compacted, the consumer will only see the latest
- * value for each key in the topic, up until the point in the topic message backlog that has been compacted.
- * Beyond that point, the messages will be sent as normal.
+ * If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
+ * of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
+ * each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
+ * point, the messages will be sent as normal.
*
- * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer
- * (i.e. failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent
- * topics or on a shared subscription, will lead to the subscription call throwing a PulsarClientException.
+ * readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
+ * failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
+ * shared subscription, will lead to the subscription call throwing a PulsarClientException.
*
- * @param readCompacted whether to read from the compacted topic
+ * @param readCompacted
+ * whether to read from the compacted topic
*/
public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
- this.readCompacted = readCompacted;
+ conf.setReadCompacted(readCompacted);
return this;
}
/**
* Set a name/value property with this consumer.
+ *
* @param key
* @param value
* @return
@@ -336,23 +317,26 @@ public class ConsumerConfiguration implements Serializable {
public ConsumerConfiguration setProperty(String key, String value) {
checkArgument(key != null);
checkArgument(value != null);
- properties.put(key, value);
+ conf.getProperties().put(key, value);
return this;
}
/**
* Add all the properties in the provided map
+ *
* @param properties
* @return
*/
public ConsumerConfiguration setProperties(Map properties) {
- if (properties != null) {
- this.properties.putAll(properties);
- }
+ conf.getProperties().putAll(properties);
return this;
}
public Map getProperties() {
- return properties;
+ return conf.getProperties();
+ }
+
+ public ConsumerConfigurationData getConfigurationData() {
+ return conf;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/HashingScheme.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/HashingScheme.java
new file mode 100644
index 0000000000000000000000000000000000000000..a451c7e93b7044a1146754355ce975c44345afba
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/HashingScheme.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Standard hashing functions available when choosing the partition to use for a particular message.
+ */
+public enum HashingScheme {
+
+ /**
+ * Use regural String.hashCode()
+ */
+ JavaStringHash,
+
+ /**
+ * Use Murmur3 hashing function.
+ * https://en.wikipedia.org/wiki/MurmurHash
+ */
+ Murmur3_32Hash
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
index 1d45489b91e6bfa85275ede6fbe77d093c1aaf5a..97231296edc7dbc8bca7258e3e1518a68a0380d2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageRoutingMode.java
@@ -18,6 +18,25 @@
*/
package org.apache.pulsar.client.api;
+/**
+ * Default routing mode for messages to partition.
+ *
+ * This logic is applied when the application is not setting a key {@link MessageBuilder#setKey(String)} on a particular
+ * message.
+ */
public enum MessageRoutingMode {
- SinglePartition, RoundRobinPartition, CustomPartition
+ /**
+ * The producer will chose one single partition and publish all the messages into that partition.
+ */
+ SinglePartition,
+
+ /**
+ * Publish messages across all partitions in round-robin.
+ */
+ RoundRobinPartition,
+
+ /**
+ * Use custom message router implemenation that will be called to determine the partition for a particular message.
+ */
+ CustomPartition
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 5ab1215dc123f1061f49dbe31a51440a1c166744..3961f8c64d1a8eaf853fc2e47da55c379a6c034d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -144,12 +144,34 @@ public interface ProducerBuilder extends Serializable, Cloneable {
ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull);
/**
- * Set the message routing mode for the partitioned producer
+ * Set the message routing mode for the partitioned producer.
*
- * @param mode
- * @return
+ * Default routing mode for messages to partition.
+ *
+ * This logic is applied when the application is not setting a key {@link MessageBuilder#setKey(String)} on a
+ * particular message.
+ *
+ * @param messageRoutingMode
+ * the message routing mode
+ */
+ ProducerBuilder messageRoutingMode(MessageRoutingMode messageRoutingMode);
+
+ /**
+ * Change the {@link HashingScheme} used to chose the partition on where to publish a particular message.
+ *
+ * Standard hashing functions available are:
+ *
+ *
+ * Default is JavaStringHash
.
+ *
+ * @param hashingScheme
+ * the chosen {@link HashingScheme}
*/
- ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode);
+ ProducerBuilder hashingScheme(HashingScheme hashingScheme);
/**
* Set the compression type for the producer.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
index fb098cf92c42f6ac30da6ebeb8da49c3534c8e1e..edd312139865af8356f2f1ee64d881b47b32e4cd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ProducerConfiguration.java
@@ -21,17 +21,16 @@ package org.apache.pulsar.client.api;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
-import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
-import com.google.common.base.Objects;
+import lombok.EqualsAndHashCode;
/**
* Producer's configuration
@@ -39,51 +38,27 @@ import com.google.common.base.Objects;
* @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance
*/
@Deprecated
+@EqualsAndHashCode
public class ProducerConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
- private String producerName = null;
- private long sendTimeoutMs = 30000;
- private boolean blockIfQueueFull = false;
- private int maxPendingMessages = 1000;
- private int maxPendingMessagesAcrossPartitions = 50000;
- private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition;
- private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
- @JsonIgnore
- private MessageRouter customMessageRouter = null;
- private long batchingMaxPublishDelayMs = 10;
- private int batchingMaxMessages = 1000;
- private boolean batchingEnabled = false; // disabled by default
-
- @JsonIgnore
- private CryptoKeyReader cryptoKeyReader;
- @JsonIgnore
- private ConcurrentOpenHashSet encryptionKeys;
-
- private CompressionType compressionType = CompressionType.NONE;
-
- // Cannot use Optional since it's not serializable
- private Long initialSequenceId = null;
-
- private final Map properties = new HashMap<>();
+
+ private final ProducerConfigurationData conf = new ProducerConfigurationData();
public enum MessageRoutingMode {
SinglePartition, RoundRobinPartition, CustomPartition
}
public enum HashingScheme {
- JavaStringHash,
- Murmur3_32Hash
+ JavaStringHash, Murmur3_32Hash
}
- private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
-
/**
* @return the configured custom producer name or null if no custom name was specified
* @since 1.20.0
*/
public String getProducerName() {
- return producerName;
+ return conf.getProducerName();
}
/**
@@ -103,14 +78,14 @@ public class ProducerConfiguration implements Serializable {
* @since 1.20.0
*/
public void setProducerName(String producerName) {
- this.producerName = producerName;
+ conf.setProducerName(producerName);
}
/**
* @return the message send timeout in ms
*/
public long getSendTimeoutMs() {
- return sendTimeoutMs;
+ return conf.getSendTimeoutMs();
}
/**
@@ -125,7 +100,7 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
checkArgument(sendTimeout >= 0);
- this.sendTimeoutMs = unit.toMillis(sendTimeout);
+ conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
return this;
}
@@ -133,7 +108,7 @@ public class ProducerConfiguration implements Serializable {
* @return the maximum number of messages allowed in the outstanding messages queue for the producer
*/
public int getMaxPendingMessages() {
- return maxPendingMessages;
+ return conf.getMaxPendingMessages();
}
/**
@@ -147,16 +122,16 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
checkArgument(maxPendingMessages > 0);
- this.maxPendingMessages = maxPendingMessages;
+ conf.setMaxPendingMessages(maxPendingMessages);
return this;
}
public HashingScheme getHashingScheme() {
- return hashingScheme;
+ return HashingScheme.valueOf(conf.getHashingScheme().toString());
}
public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) {
- this.hashingScheme = hashingScheme;
+ conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString()));
return this;
}
@@ -165,7 +140,7 @@ public class ProducerConfiguration implements Serializable {
* @return the maximum number of pending messages allowed across all the partitions
*/
public int getMaxPendingMessagesAcrossPartitions() {
- return maxPendingMessagesAcrossPartitions;
+ return conf.getMaxPendingMessagesAcrossPartitions();
}
/**
@@ -177,8 +152,8 @@ public class ProducerConfiguration implements Serializable {
* @param maxPendingMessagesAcrossPartitions
*/
public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
- checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
- this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
+ checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
+ conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
}
/**
@@ -187,7 +162,7 @@ public class ProducerConfiguration implements Serializable {
* pending queue is full
*/
public boolean getBlockIfQueueFull() {
- return blockIfQueueFull;
+ return conf.isBlockIfQueueFull();
}
/**
@@ -202,7 +177,7 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) {
- this.blockIfQueueFull = blockIfQueueFull;
+ conf.setBlockIfQueueFull(blockIfQueueFull);
return this;
}
@@ -214,7 +189,8 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) {
checkNotNull(messageRouteMode);
- this.messageRouteMode = messageRouteMode;
+ conf.setMessageRoutingMode(
+ org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString()));
return this;
}
@@ -224,7 +200,7 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public MessageRoutingMode getMessageRoutingMode() {
- return messageRouteMode;
+ return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
}
/**
@@ -244,7 +220,7 @@ public class ProducerConfiguration implements Serializable {
* compress messages.
*/
public ProducerConfiguration setCompressionType(CompressionType compressionType) {
- this.compressionType = compressionType;
+ conf.setCompressionType(compressionType);
return this;
}
@@ -252,7 +228,7 @@ public class ProducerConfiguration implements Serializable {
* @return the configured compression type for this producer
*/
public CompressionType getCompressionType() {
- return compressionType;
+ return conf.getCompressionType();
}
/**
@@ -264,7 +240,7 @@ public class ProducerConfiguration implements Serializable {
public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
checkNotNull(messageRouter);
setMessageRoutingMode(MessageRoutingMode.CustomPartition);
- customMessageRouter = messageRouter;
+ conf.setCustomMessageRouter(messageRouter);
return this;
}
@@ -278,7 +254,7 @@ public class ProducerConfiguration implements Serializable {
*/
@Deprecated
public MessageRouter getMessageRouter(int numPartitions) {
- return customMessageRouter;
+ return conf.getCustomMessageRouter();
}
/**
@@ -287,7 +263,7 @@ public class ProducerConfiguration implements Serializable {
* @return message router set by {@link #setMessageRouter(MessageRouter)}.
*/
public MessageRouter getMessageRouter() {
- return customMessageRouter;
+ return conf.getCustomMessageRouter();
}
/**
@@ -295,7 +271,7 @@ public class ProducerConfiguration implements Serializable {
*/
public boolean getBatchingEnabled() {
- return batchingEnabled;
+ return conf.isBatchingEnabled();
}
/**
@@ -315,7 +291,7 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) {
- this.batchingEnabled = batchMessagesEnabled;
+ conf.setBatchingEnabled(batchMessagesEnabled);
return this;
}
@@ -323,7 +299,7 @@ public class ProducerConfiguration implements Serializable {
* @return the CryptoKeyReader
*/
public CryptoKeyReader getCryptoKeyReader() {
- return this.cryptoKeyReader;
+ return conf.getCryptoKeyReader();
}
/**
@@ -334,7 +310,7 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
checkNotNull(cryptoKeyReader);
- this.cryptoKeyReader = cryptoKeyReader;
+ conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
@@ -343,8 +319,8 @@ public class ProducerConfiguration implements Serializable {
* @return encryptionKeys
*
*/
- public ConcurrentOpenHashSet getEncryptionKeys() {
- return this.encryptionKeys;
+ public Set getEncryptionKeys() {
+ return conf.getEncryptionKeys();
}
/**
@@ -353,7 +329,7 @@ public class ProducerConfiguration implements Serializable {
*
*/
public boolean isEncryptionEnabled() {
- return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty() && (this.cryptoKeyReader != null);
+ return conf.isEncryptionEnabled();
}
/**
@@ -366,16 +342,11 @@ public class ProducerConfiguration implements Serializable {
*
*/
public void addEncryptionKey(String key) {
- if (this.encryptionKeys == null) {
- this.encryptionKeys = new ConcurrentOpenHashSet(16, 1);
- }
- this.encryptionKeys.add(key);
+ conf.getEncryptionKeys().add(key);
}
public void removeEncryptionKey(String key) {
- if (this.encryptionKeys != null) {
- this.encryptionKeys.remove(key);
- }
+ conf.getEncryptionKeys().remove(key);
}
/**
@@ -385,14 +356,14 @@ public class ProducerConfiguration implements Serializable {
* The producer action
*/
public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
- cryptoFailureAction = action;
+ conf.setCryptoFailureAction(action);
}
/**
* @return The ProducerCryptoFailureAction
*/
public ProducerCryptoFailureAction getCryptoFailureAction() {
- return this.cryptoFailureAction;
+ return conf.getCryptoFailureAction();
}
/**
@@ -401,7 +372,7 @@ public class ProducerConfiguration implements Serializable {
* @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
*/
public long getBatchingMaxPublishDelayMs() {
- return batchingMaxPublishDelayMs;
+ return TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros());
}
/**
@@ -423,7 +394,7 @@ public class ProducerConfiguration implements Serializable {
public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
long delayInMs = timeUnit.toMillis(batchDelay);
checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
- this.batchingMaxPublishDelayMs = delayInMs;
+ conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
return this;
}
@@ -432,7 +403,7 @@ public class ProducerConfiguration implements Serializable {
* @return the maximum number of messages permitted in a batch.
*/
public int getBatchingMaxMessages() {
- return batchingMaxMessages;
+ return conf.getBatchingMaxMessages();
}
/**
@@ -448,12 +419,12 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
checkArgument(batchMessagesMaxMessagesPerBatch > 0);
- this.batchingMaxMessages = batchMessagesMaxMessagesPerBatch;
+ conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
return this;
}
public Optional getInitialSequenceId() {
- return initialSequenceId != null ? Optional.of(initialSequenceId) : Optional.empty();
+ return Optional.ofNullable(conf.getInitialSequenceId());
}
/**
@@ -466,7 +437,7 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
- this.initialSequenceId = initialSequenceId;
+ conf.setInitialSequenceId(initialSequenceId);
return this;
}
@@ -480,7 +451,7 @@ public class ProducerConfiguration implements Serializable {
public ProducerConfiguration setProperty(String key, String value) {
checkArgument(key != null);
checkArgument(value != null);
- properties.put(key, value);
+ conf.getProperties().put(key, value);
return this;
}
@@ -491,26 +462,15 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setProperties(Map properties) {
- if (properties != null) {
- this.properties.putAll(properties);
- }
+ conf.getProperties().putAll(properties);
return this;
}
public Map getProperties() {
- return properties;
+ return conf.getProperties();
}
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof ProducerConfiguration) {
- ProducerConfiguration other = (ProducerConfiguration) obj;
- return Objects.equal(this.sendTimeoutMs, other.sendTimeoutMs)
- && Objects.equal(maxPendingMessages, other.maxPendingMessages)
- && Objects.equal(this.messageRouteMode, other.messageRouteMode)
- && Objects.equal(this.hashingScheme, other.hashingScheme);
- }
-
- return false;
+ public ProducerConfigurationData getProducerConfigurationData() {
+ return conf;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
index 6f3816c45597b06d550e2d5190f332e639209322..cdd51a5147cd8dc2f56455d4314e908d3b4b25be 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ReaderConfiguration.java
@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
+
/**
*
* @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance
@@ -30,20 +32,13 @@ import java.io.Serializable;
@Deprecated
public class ReaderConfiguration implements Serializable {
- private int receiverQueueSize = 1000;
-
- private ReaderListener readerListener;
-
- private String readerName = null;
-
- private CryptoKeyReader cryptoKeyReader = null;
- private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
+ private final ReaderConfigurationData conf = new ReaderConfigurationData();
/**
* @return the configured {@link ReaderListener} for the reader
*/
public ReaderListener getReaderListener() {
- return this.readerListener;
+ return conf.getReaderListener();
}
/**
@@ -57,7 +52,7 @@ public class ReaderConfiguration implements Serializable {
*/
public ReaderConfiguration setReaderListener(ReaderListener readerListener) {
checkNotNull(readerListener);
- this.readerListener = readerListener;
+ conf.setReaderListener(readerListener);
return this;
}
@@ -65,14 +60,14 @@ public class ReaderConfiguration implements Serializable {
* @return the configure receiver queue size value
*/
public int getReceiverQueueSize() {
- return this.receiverQueueSize;
+ return conf.getReceiverQueueSize();
}
/**
* @return the CryptoKeyReader
*/
public CryptoKeyReader getCryptoKeyReader() {
- return this.cryptoKeyReader;
+ return conf.getCryptoKeyReader();
}
/**
@@ -83,7 +78,7 @@ public class ReaderConfiguration implements Serializable {
*/
public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
checkNotNull(cryptoKeyReader);
- this.cryptoKeyReader = cryptoKeyReader;
+ conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
@@ -94,14 +89,14 @@ public class ReaderConfiguration implements Serializable {
* The action to take when the decoding fails
*/
public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
- cryptoFailureAction = action;
+ conf.setCryptoFailureAction(action);
}
/**
* @return The ConsumerCryptoFailureAction
*/
public ConsumerCryptoFailureAction getCryptoFailureAction() {
- return this.cryptoFailureAction;
+ return conf.getCryptoFailureAction();
}
/**
@@ -118,7 +113,7 @@ public class ReaderConfiguration implements Serializable {
*/
public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
- this.receiverQueueSize = receiverQueueSize;
+ conf.setReceiverQueueSize(receiverQueueSize);
return this;
}
@@ -126,7 +121,7 @@ public class ReaderConfiguration implements Serializable {
* @return the consumer name
*/
public String getReaderName() {
- return readerName;
+ return conf.getReaderName();
}
/**
@@ -136,9 +131,13 @@ public class ReaderConfiguration implements Serializable {
*/
public ReaderConfiguration setReaderName(String readerName) {
checkArgument(readerName != null && !readerName.equals(""));
- this.readerName = readerName;
+ conf.setReaderName(readerName);
return this;
}
+ public ReaderConfigurationData getReaderConfigurationData() {
+ return conf;
+ }
+
private static final long serialVersionUID = 1L;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 8ad14ccb4c837ba2e4a234efca623b24e97d2427..6c87f5645b764a92d57b76e81516c477f62ed44c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -36,7 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
-import io.netty.resolver.InetSocketAddressResolver;
public class BinaryProtoLookupService implements LookupService {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index 2be5318e08a4b0f6d26920b4ae2ffd2c86d3f5aa..4cffb7fa8a247ea923863e9d8959c9cfcbf459e5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -22,41 +22,44 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
-@SuppressWarnings("deprecation")
public class ClientBuilderImpl implements ClientBuilder {
private static final long serialVersionUID = 1L;
- String serviceUrl;
- final ClientConfiguration conf = new ClientConfiguration();
+ final ClientConfigurationData conf;
+
+ public ClientBuilderImpl() {
+ this(new ClientConfigurationData());
+ }
+
+ private ClientBuilderImpl(ClientConfigurationData conf) {
+ this.conf = conf;
+ }
@Override
public PulsarClient build() throws PulsarClientException {
- if (serviceUrl == null) {
+ if (conf.getServiceUrl() == null) {
throw new IllegalArgumentException("service URL needs to be specified on the ClientBuilder object");
}
- return new PulsarClientImpl(serviceUrl, conf);
+ return new PulsarClientImpl(conf);
}
@Override
public ClientBuilder clone() {
- try {
- return (ClientBuilder) super.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException("Failed to clone ClientBuilderImpl");
- }
+ return new ClientBuilderImpl(conf.clone());
}
@Override
public ClientBuilder serviceUrl(String serviceUrl) {
- this.serviceUrl = serviceUrl;
+ conf.setServiceUrl(serviceUrl);
return this;
}
@@ -69,32 +72,32 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
- conf.setAuthentication(authPluginClassName, authParamsString);
+ conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
return this;
}
@Override
public ClientBuilder authentication(String authPluginClassName, Map authParams)
throws UnsupportedAuthenticationException {
- conf.setAuthentication(authPluginClassName, authParams);
+ conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
return this;
}
@Override
public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
- conf.setOperationTimeout(operationTimeout, unit);
+ conf.setOperationTimeoutMs(unit.toMillis(operationTimeout));
return this;
}
@Override
public ClientBuilder ioThreads(int numIoThreads) {
- conf.setIoThreads(numIoThreads);
+ conf.setNumIoThreads(numIoThreads);
return this;
}
@Override
public ClientBuilder listenerThreads(int numListenerThreads) {
- conf.setListenerThreads(numListenerThreads);
+ conf.setNumListenerThreads(numListenerThreads);
return this;
}
@@ -136,7 +139,7 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) {
- conf.setStatsInterval(statsInterval, unit);
+ conf.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 43c49b2a9f6307c08dcf495614605a5643416821..7505addb021270b1353be09d00ad017ff6f3b5d6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -33,11 +33,12 @@ import javax.net.ssl.SSLSession;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
@@ -57,7 +58,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
-import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,7 +106,7 @@ public class ClientCnx extends PulsarHandler {
None, SentConnectFrame, Ready, Failed
}
- public ClientCnx(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
+ public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
super(30, TimeUnit.SECONDS);
this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
this.authentication = conf.getAuthentication();
@@ -196,14 +196,14 @@ public class ClientCnx extends PulsarHandler {
@Override
protected void handleConnected(CommandConnected connected) {
-
+
if (isTlsHostnameVerificationEnable && remoteHostName != null && !verifyTlsHostName(remoteHostName, ctx)) {
// close the connection if host-verification failed with the broker
log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
ctx.close();
return;
}
-
+
checkArgument(state == State.SentConnectFrame);
if (log.isDebugEnabled()) {
@@ -593,9 +593,9 @@ public class ClientCnx extends PulsarHandler {
/**
* verifies host name provided in x509 Certificate in tls session
- *
+ *
* it matches hostname with below scenarios
- *
+ *
*
* 1. Supports IPV4 and IPV6 host matching
* 2. Supports wild card matching for DNS-name
@@ -605,7 +605,7 @@ public class ClientCnx extends PulsarHandler {
* 2. localhost local* PASS
* 3. pulsar1-broker.com pulsar*.com PASS
*
- *
+ *
* @param ctx
* @return true if hostname is verified else return false
*/
@@ -648,7 +648,7 @@ public class ClientCnx extends PulsarHandler {
void setRemoteHostName(String remoteHostName) {
this.remoteHostName = remoteHostName;
}
-
+
private PulsarClientException getPulsarClientException(ServerError error, String errorMsg) {
switch (error) {
case AuthenticationError:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index c36fcd979739bf4cf14dcb9b7557f635f6c07c95..84edec123e9fccb99c5f135ba0fb78ff002e2316 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -31,8 +31,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
@@ -68,7 +68,7 @@ public class ConnectionPool implements Closeable {
private static final int MaxMessageSize = 5 * 1024 * 1024;
public static final String TLS_HANDLER = "tls";
- public ConnectionPool(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
+ public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index ef7ab8b82c2b50c94f69a8c122920261c4a0cae7..842fbfb47c281626e827bff062dbb56f2a24a930 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
-import com.google.common.collect.Queues;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
@@ -28,20 +27,23 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
+import com.google.common.collect.Queues;
+
public abstract class ConsumerBase extends HandlerBase implements Consumer {
enum ConsumerType {
@@ -49,7 +51,7 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
}
protected final String subscription;
- protected final ConsumerConfiguration conf;
+ protected final ConsumerConfigurationData conf;
protected final String consumerName;
protected final CompletableFuture subscribeFuture;
protected final MessageListener listener;
@@ -59,11 +61,11 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
protected final ConcurrentLinkedQueue> pendingReceives;
protected int maxReceiverQueueSize;
- protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
- int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture subscribeFuture) {
- super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0 , TimeUnit.MILLISECONDS));
+ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, int receiverQueueSize,
+ ExecutorService listenerExecutor, CompletableFuture subscribeFuture) {
+ super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS));
this.maxReceiverQueueSize = receiverQueueSize;
- this.subscription = subscription;
+ this.subscription = conf.getSubscriptionName();
this.conf = conf;
this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
this.subscribeFuture = subscribeFuture;
@@ -93,9 +95,13 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
case Closing:
case Closed:
throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
+ case Terminated:
+ throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
case Failed:
case Uninitialized:
throw new PulsarClientException.NotConnectedException();
+ default:
+ break;
}
return internalReceive();
@@ -116,6 +122,8 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
case Closing:
case Closed:
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer already closed"));
+ case Terminated:
+ return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Topic was terminated"));
case Failed:
case Uninitialized:
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
@@ -146,6 +154,8 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
case Closing:
case Closed:
throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
+ case Terminated:
+ throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
case Failed:
case Uninitialized:
throw new PulsarClientException.NotConnectedException();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 66cd2912ca63a1284d7975338c6e9b45a788fe04..6cac9b17741a07ab88de01c37fb58151574cfdc1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -20,48 +20,45 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
+import com.google.common.collect.Lists;
-@SuppressWarnings("deprecation")
public class ConsumerBuilderImpl implements ConsumerBuilder {
private static final long serialVersionUID = 1L;
private final PulsarClientImpl client;
- private String subscriptionName;
- private final ConsumerConfiguration conf;
- private Set topicNames;
+ private final ConsumerConfigurationData conf;
+
+ private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
ConsumerBuilderImpl(PulsarClientImpl client) {
+ this(client, new ConsumerConfigurationData());
+ }
+
+ private ConsumerBuilderImpl(PulsarClientImpl client, ConsumerConfigurationData conf) {
this.client = client;
- this.conf = new ConsumerConfiguration();
+ this.conf = conf;
}
@Override
public ConsumerBuilder clone() {
- try {
- return (ConsumerBuilder) super.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException("Failed to clone ConsumerBuilderImpl");
- }
+ return new ConsumerBuilderImpl(client, conf.clone());
}
@Override
@@ -83,55 +80,45 @@ public class ConsumerBuilderImpl implements ConsumerBuilder {
@Override
public CompletableFuture subscribeAsync() {
- if (topicNames == null || topicNames.isEmpty()) {
+ if (conf.getTopicNames().isEmpty()) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder"));
}
- if (subscriptionName == null) {
+ if (conf.getSubscriptionName() == null) {
return FutureUtil.failedFuture(
new IllegalArgumentException("Subscription name must be set on the consumer builder"));
}
- if (topicNames.size() == 1) {
- return client.subscribeAsync(topicNames.stream().findFirst().orElse(""), subscriptionName, conf);
- } else {
- return client.subscribeAsync(topicNames, subscriptionName, conf);
- }
+ return client.subscribeAsync(conf);
}
@Override
public ConsumerBuilder topic(String... topicNames) {
checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty.");
- if (this.topicNames == null) {
- this.topicNames = Sets.newHashSet(topicNames);
- } else {
- this.topicNames.addAll(Lists.newArrayList(topicNames));
- }
+ conf.getTopicNames().addAll(Lists.newArrayList(topicNames));
return this;
}
@Override
public ConsumerBuilder topics(List topicNames) {
- checkArgument(topicNames != null && !topicNames.isEmpty(),
- "Passed in topicNames list should not be empty.");
- if (this.topicNames == null) {
- this.topicNames = Sets.newHashSet();
- }
- this.topicNames.addAll(topicNames);
+ checkArgument(topicNames != null && !topicNames.isEmpty(), "Passed in topicNames list should not be empty.");
+ conf.getTopicNames().addAll(topicNames);
return this;
}
@Override
public ConsumerBuilder subscriptionName(String subscriptionName) {
- this.subscriptionName = subscriptionName;
+ conf.setSubscriptionName(subscriptionName);
return this;
}
@Override
public ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit) {
- conf.setAckTimeout(ackTimeout, timeUnit);
+ checkArgument(timeUnit.toMillis(ackTimeout) >= MIN_ACK_TIMEOUT_MILLIS,
+ "Ack timeout should be should be greater than " + MIN_ACK_TIMEOUT_MILLIS + " ms");
+ conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
return this;
}
@@ -179,13 +166,13 @@ public class ConsumerBuilderImpl implements ConsumerBuilder {
@Override
public ConsumerBuilder property(String key, String value) {
- conf.setProperty(key, value);
+ conf.getProperties().put(key, value);
return this;
}
@Override
public ConsumerBuilder properties(Map properties) {
- conf.setProperties(properties);
+ conf.getProperties().putAll(properties);
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 93d0d8715d40479e1623ca2030a639fcc1b80ba5..e9c781764bb8646f781f79605373d158866f1723 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,11 +25,6 @@ import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;
-import com.google.common.collect.Iterables;
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Timeout;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
@@ -52,13 +47,14 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
@@ -74,6 +70,13 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
public class ConsumerImpl extends ConsumerBase {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
@@ -83,6 +86,7 @@ public class ConsumerImpl extends ConsumerBase {
// broker to notify that we are ready to get (and store in the incoming messages queue) more messages
private static final AtomicIntegerFieldUpdater AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(ConsumerImpl.class, "availablePermits");
+ @SuppressWarnings("unused")
private volatile int availablePermits = 0;
private MessageId lastDequeuedMessage = MessageId.earliest;
@@ -125,16 +129,15 @@ public class ConsumerImpl extends ConsumerBase {
NonDurable
}
- ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
+ ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture subscribeFuture) {
- this(client, topic, subscription, conf, listenerExecutor, partitionIndex, subscribeFuture,
- SubscriptionMode.Durable, null);
+ this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null);
}
- ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
+ ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId) {
- super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
+ super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
@@ -145,7 +148,7 @@ public class ConsumerImpl extends ConsumerBase {
this.codecProvider = new CompressionCodecProvider();
this.priorityLevel = conf.getPriorityLevel();
this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
- this.readCompacted = conf.getReadCompacted();
+ this.readCompacted = conf.isReadCompacted();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStats(client, conf, this);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
index efa8dbb7c4aa8118047c785d18d7ed16e1d4051b..3ef26c831d1dd1635488d52517d1e2156e8c65af 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStats.java
@@ -24,8 +24,8 @@ import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +74,7 @@ public class ConsumerStats implements Serializable {
throughputFormat = null;
}
- public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfiguration conf, ConsumerImpl consumer) {
+ public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData conf, ConsumerImpl consumer) {
this.pulsarClient = pulsarClient;
this.consumer = consumer;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
@@ -92,7 +92,7 @@ public class ConsumerStats implements Serializable {
init(conf);
}
- private void init(ConsumerConfiguration conf) {
+ private void init(ConsumerConfigurationData conf) {
ObjectMapper m = new ObjectMapper();
m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 7ad6e1a9ddd1c8160d58d297b8afaff894922d87..eb672624dc30b04bb54b1e179cc78d1202739d2a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -28,25 +28,31 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
-import io.netty.channel.EventLoopGroup;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.ssl.SslContext;
-
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
-import org.asynchttpclient.*;
+import org.asynchttpclient.AsyncCompletionHandler;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
+import org.asynchttpclient.BoundRequestBuilder;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.asynchttpclient.ListenableFuture;
+import org.asynchttpclient.Request;
+import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.MoreExecutors;
+import io.netty.channel.EventLoopGroup;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.ssl.SslContext;
+
public class HttpClient implements Closeable {
protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
@@ -131,7 +137,7 @@ public class HttpClient implements Closeable {
builder.setHeader(header.getKey(), header.getValue());
}
}
-
+
final ListenableFuture responseFuture = builder.setHeader("Accept", "application/json")
.execute(new AsyncCompletionHandler() {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 208a9b119b8d6781727ed0b25bfdafeb7915b3e2..14723795f254d55688283f6d055da542d1923b72 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -23,8 +23,8 @@ import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -40,9 +40,9 @@ class HttpLookupService implements LookupService {
private final boolean useTls;
private static final String BasePath = "lookup/v2/destination/";
- public HttpLookupService(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
+ public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
- this.httpClient = new HttpClient(serviceUrl, conf.getAuthentication(),
+ this.httpClient = new HttpClient(conf.getServiceUrl(), conf.getAuthentication(),
eventLoopGroup, conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
this.useTls = conf.isUseTls();
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
index 38d6a58506b939d7163cef42bfedb9d54bb6cd45..e7115fc9af4d939ba31ec259ded0214acbcc0c29 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageCrypto.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -57,7 +58,6 @@ import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.bouncycastle.asn1.ASN1ObjectIdentifier;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
@@ -283,23 +283,21 @@ public class MessageCrypto {
* Encrypt data key using the public key(s) in the argument. If more than one key name is specified, data key is
* encrypted using each of those keys. If the public key is expired or changed, application is responsible to remove
* the old key and add the new key
- *
+ *
* @param keyNames List of public keys to encrypt data key
- *
+ *
* @param keyReader Implementation to read the key values
- *
+ *
*/
- public synchronized void addPublicKeyCipher(ConcurrentOpenHashSet keyNames, CryptoKeyReader keyReader)
+ public synchronized void addPublicKeyCipher(Set keyNames, CryptoKeyReader keyReader)
throws CryptoException {
// Generate data key
dataKey = keyGenerator.generateKey();
- List keyNameList = keyNames.values();
- for (int i = 0; i < keyNameList.size(); i++) {
- addPublicKeyCipher(keyNameList.get(i), keyReader);
+ for (String key : keyNames) {
+ addPublicKeyCipher(key, keyReader);
}
-
}
private void addPublicKeyCipher(String keyName, CryptoKeyReader keyReader) throws CryptoException {
@@ -350,9 +348,9 @@ public class MessageCrypto {
/*
* Remove a key Remove the key identified by the keyName from the list of keys.
- *
+ *
* @param keyName Unique name to identify the key
- *
+ *
* @return true if succeeded, false otherwise
*/
/*
@@ -368,16 +366,16 @@ public class MessageCrypto {
/*
* Encrypt the payload using the data key and update message metadata with the keyname & encrypted data key
- *
+ *
* @param encKeys One or more public keys to encrypt data key
- *
+ *
* @param msgMetadata Message Metadata
- *
+ *
* @param payload Message which needs to be encrypted
- *
+ *
* @return encryptedData if success
*/
- public synchronized ByteBuf encrypt(ConcurrentOpenHashSet encKeys, CryptoKeyReader keyReader,
+ public synchronized ByteBuf encrypt(Set encKeys, CryptoKeyReader keyReader,
MessageMetadata.Builder msgMetadata, ByteBuf payload) throws PulsarClientException {
if (encKeys.isEmpty()) {
@@ -385,9 +383,7 @@ public class MessageCrypto {
}
// Update message metadata with encrypted data key
- List keyNameList = encKeys.values();
- for (int i = 0; i < keyNameList.size(); i++) {
- String keyName = keyNameList.get(i);
+ for (String keyName : encKeys) {
if (encryptedDataKeyMap.get(keyName) == null) {
// Attempt to load the key. This will allow us to load keys as soon as
// a new key is added to producer config
@@ -569,13 +565,13 @@ public class MessageCrypto {
/*
* Decrypt the payload using the data key. Keys used to encrypt data key can be retrieved from msgMetadata
- *
+ *
* @param msgMetadata Message Metadata
- *
+ *
* @param payload Message which needs to be decrypted
- *
+ *
* @param keyReader KeyReader implementation to retrieve key value
- *
+ *
* @return decryptedData if success, null otherwise
*/
public ByteBuf decrypt(MessageMetadata msgMetadata, ByteBuf payload, CryptoKeyReader keyReader) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
index 4825dabbbe5d362d4e219f566b077c7a17a3662b..1e4c036fdc77ad1fb47b6cb3140b2cc1d013c99a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRouterBase.java
@@ -18,20 +18,22 @@
*/
package org.apache.pulsar.client.impl;
+import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRouter;
-import org.apache.pulsar.client.api.ProducerConfiguration;
public abstract class MessageRouterBase implements MessageRouter {
+ private static final long serialVersionUID = 1L;
+
protected final Hash hash;
- MessageRouterBase(ProducerConfiguration.HashingScheme hashingScheme) {
+ MessageRouterBase(HashingScheme hashingScheme) {
switch (hashingScheme) {
- case JavaStringHash:
- this.hash = JavaStringHash.getInstance();
- break;
- case Murmur3_32Hash:
- default:
- this.hash = Murmur3_32Hash.getInstance();
+ case JavaStringHash:
+ this.hash = JavaStringHash.getInstance();
+ break;
+ case Murmur3_32Hash:
+ default:
+ this.hash = Murmur3_32Hash.getInstance();
}
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 332dd5626f98f996245c96d7e772f812f397d95d..dff123387005dc0b739402b047d6d5643afba04e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -35,11 +35,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.FutureUtil;
@@ -65,10 +65,10 @@ public class PartitionedConsumerImpl extends ConsumerBase {
private final ConsumerStats stats;
private final UnAckedMessageTracker unAckedMessageTracker;
- PartitionedConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
- int numPartitions, ExecutorService listenerExecutor, CompletableFuture subscribeFuture) {
- super(client, topic, subscription, conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()), listenerExecutor,
- subscribeFuture);
+ PartitionedConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, int numPartitions,
+ ExecutorService listenerExecutor, CompletableFuture subscribeFuture) {
+ super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()),
+ listenerExecutor, subscribeFuture);
this.consumers = Lists.newArrayListWithCapacity(numPartitions);
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
@@ -89,10 +89,10 @@ public class PartitionedConsumerImpl extends ConsumerBase {
private void start() {
AtomicReference subscribeFail = new AtomicReference();
AtomicInteger completed = new AtomicInteger();
- ConsumerConfiguration internalConfig = getInternalConsumerConfig();
+ ConsumerConfigurationData internalConfig = getInternalConsumerConfig();
for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
String partitionName = DestinationName.get(topic).getPartition(partitionIndex).toString();
- ConsumerImpl consumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
+ ConsumerImpl consumer = new ConsumerImpl(client, partitionName, internalConfig,
client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture());
consumers.add(consumer);
consumer.subscribeFuture().handle((cons, subscribeException) -> {
@@ -434,9 +434,10 @@ public class PartitionedConsumerImpl extends ConsumerBase {
return subscription;
}
- private ConsumerConfiguration getInternalConsumerConfig() {
- ConsumerConfiguration internalConsumerConfig = new ConsumerConfiguration();
+ private ConsumerConfigurationData getInternalConsumerConfig() {
+ ConsumerConfigurationData internalConsumerConfig = new ConsumerConfigurationData();
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
+ internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName());
internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
internalConsumerConfig.setConsumerName(consumerName);
if (null != conf.getConsumerEventListener()) {
@@ -451,7 +452,7 @@ public class PartitionedConsumerImpl extends ConsumerBase {
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
}
if (conf.getAckTimeoutMillis() != 0) {
- internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
+ internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
}
return internalConsumerConfig;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 68fda50994cd7c41b789fba85a083bcc3e03f3c4..cf2aab16e11cf2dad5b4bd6bb7c92d76c72f60c1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -30,11 +30,11 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TopicMetadata;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
@@ -49,7 +49,7 @@ public class PartitionedProducerImpl extends ProducerBase {
private final ProducerStats stats;
private final TopicMetadata topicMetadata;
- public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, int numPartitions,
+ public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
CompletableFuture producerCreatedFuture) {
super(client, topic, conf, producerCreatedFuture);
this.producers = Lists.newArrayListWithCapacity(numPartitions);
@@ -67,7 +67,7 @@ public class PartitionedProducerImpl extends ProducerBase {
MessageRouter messageRouter;
MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();
- MessageRouter customMessageRouter = conf.getMessageRouter();
+ MessageRouter customMessageRouter = conf.getCustomMessageRouter();
switch (messageRouteMode) {
case CustomPartition:
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 03680a1bc4f30980577359c12597e15d80b97bec..0876db1acc61937021f8470733fcc69edb926a4d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -26,17 +26,18 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
public abstract class ProducerBase extends HandlerBase implements Producer {
protected final CompletableFuture producerCreatedFuture;
- protected final ProducerConfiguration conf;
+ protected final ProducerConfigurationData conf;
- protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfiguration conf,
+ protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture producerCreatedFuture) {
- super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS));
+ super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS,
+ Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS));
this.producerCreatedFuture = producerCreatedFuture;
this.conf = conf;
}
@@ -98,7 +99,7 @@ public abstract class ProducerBase extends HandlerBase implements Producer {
return topic;
}
- public ProducerConfiguration getConfiguration() {
+ public ProducerConfigurationData getConfiguration() {
return conf;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
index 6bb9a9b595c87d3e1142816050d53a6a33f410e4..230d9c94425fb066d08fe8cacde0cc4eef358263 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBuilderImpl.java
@@ -25,36 +25,35 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
-@SuppressWarnings("deprecation")
public class ProducerBuilderImpl implements ProducerBuilder {
private static final long serialVersionUID = 1L;
private final PulsarClientImpl client;
- private String topicName;
- private final ProducerConfiguration conf;
+ private final ProducerConfigurationData conf;
ProducerBuilderImpl(PulsarClientImpl client) {
+ this(client, new ProducerConfigurationData());
+ }
+
+ private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData conf) {
this.client = client;
- this.conf = new ProducerConfiguration();
+ this.conf = conf;
}
@Override
public ProducerBuilder clone() {
- try {
- return (ProducerBuilder) super.clone();
- } catch (CloneNotSupportedException e) {
- throw new RuntimeException("Failed to clone ProducerBuilderImpl");
- }
+ return new ProducerBuilderImpl(client, conf.clone());
}
@Override
@@ -76,17 +75,17 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public CompletableFuture createAsync() {
- if (topicName == null) {
+ if (conf.getTopicName() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
}
- return client.createProducerAsync(topicName, conf);
+ return client.createProducerAsync(conf);
}
@Override
public ProducerBuilder topic(String topicName) {
- this.topicName = topicName;
+ conf.setTopicName(topicName);
return this;
}
@@ -98,7 +97,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit) {
- conf.setSendTimeout(sendTimeout, unit);
+ conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
return this;
}
@@ -122,7 +121,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode) {
- conf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.valueOf(messageRouteMode.toString()));
+ conf.setMessageRoutingMode(messageRouteMode);
return this;
}
@@ -132,9 +131,15 @@ public class ProducerBuilderImpl implements ProducerBuilder {
return this;
}
+ @Override
+ public ProducerBuilder hashingScheme(HashingScheme hashingScheme) {
+ conf.setHashingScheme(hashingScheme);
+ return this;
+ }
+
@Override
public ProducerBuilder messageRouter(MessageRouter messageRouter) {
- conf.setMessageRouter(messageRouter);
+ conf.setCustomMessageRouter(messageRouter);
return this;
}
@@ -152,7 +157,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder addEncryptionKey(String key) {
- conf.addEncryptionKey(key);
+ conf.getEncryptionKeys().add(key);
return this;
}
@@ -164,7 +169,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
- conf.setBatchingMaxPublishDelay(batchDelay, timeUnit);
+ conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
return this;
}
@@ -182,13 +187,13 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder property(String key, String value) {
- conf.setProperty(key, value);
+ conf.getProperties().put(key, value);
return this;
}
@Override
public ProducerBuilder properties(Map properties) {
- conf.setProperties(properties);
+ conf.getProperties().putAll(properties);
return this;
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 32d61a5325b6199866da4ed91202c000f9426db8..923a7e87e191cdf6e64302f9e2ccce504df9119b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
@@ -42,13 +41,13 @@ import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType;
-import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -107,7 +106,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
private static final AtomicLongFieldUpdater msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
- public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf,
+ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture producerCreatedFuture, int partitionIndex) {
super(client, topic, conf, producerCreatedFuture);
this.producerId = client.newProducerId();
@@ -119,8 +118,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
this.compressor = CompressionCodecProvider
.getCompressionCodec(convertCompressionType(conf.getCompressionType()));
- if (conf.getInitialSequenceId().isPresent()) {
- long initialSequenceId = conf.getInitialSequenceId().get();
+ if (conf.getInitialSequenceId() != null) {
+ long initialSequenceId = conf.getInitialSequenceId();
this.lastSequenceIdPublished = initialSequenceId;
this.msgIdGenerator = initialSequenceId + 1;
} else {
@@ -151,7 +150,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
}
this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
- if (conf.getBatchingEnabled()) {
+ if (conf.isBatchingEnabled()) {
this.maxNumMessagesInBatch = conf.getBatchingMaxMessages();
this.batchMessageContainer = new BatchMessageContainer(maxNumMessagesInBatch,
convertCompressionType(conf.getCompressionType()), topic, producerName);
@@ -175,7 +174,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
}
private boolean isBatchMessagingEnabled() {
- return conf.getBatchingEnabled();
+ return conf.isBatchingEnabled();
}
@Override
@@ -421,7 +420,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
private boolean canEnqueueRequest(SendCallback callback) {
try {
- if (conf.getBlockIfQueueFull()) {
+ if (conf.isBlockIfQueueFull()) {
semaphore.acquire();
} else {
if (!semaphore.tryAcquire()) {
@@ -843,15 +842,15 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
this.producerName = producerName;
}
- if (this.lastSequenceIdPublished == -1 && !conf.getInitialSequenceId().isPresent()) {
+ if (this.lastSequenceIdPublished == -1 && conf.getInitialSequenceId() == null) {
this.lastSequenceIdPublished = lastSequenceId;
this.msgIdGenerator = lastSequenceId + 1;
}
if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
// schedule the first batch message task
- client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMs(),
- TimeUnit.MILLISECONDS);
+ client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMicros(),
+ TimeUnit.MICROSECONDS);
}
resendMessages(cnx);
}
@@ -1139,7 +1138,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
batchMessageAndSend();
}
// schedule the next batch message task
- client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMs(), TimeUnit.MILLISECONDS);
+ client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
};
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
index 682a357281b692d8f8ac60b51423dce55365dfef..6a5e321fffdf9ceb455947cc9e6c740054376edd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStats.java
@@ -24,7 +24,7 @@ import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
-import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,7 +74,7 @@ public class ProducerStats implements Serializable {
ds = null;
}
- public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfiguration conf, ProducerImpl producer) {
+ public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl producer) {
this.pulsarClient = pulsarClient;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
this.producer = producer;
@@ -92,7 +92,7 @@ public class ProducerStats implements Serializable {
init(conf);
}
- private void init(ProducerConfiguration conf) {
+ private void init(ProducerConfigurationData conf) {
ObjectMapper m = new ObjectMapper();
m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c065532a16f3081fe726ed2e5a642f099145c636..d4c40d4780196f59a8ba1ef6bdfffa1fe1c2d797 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static org.apache.commons.lang3.StringUtils.isBlank;
-import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -45,6 +44,10 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
@@ -67,7 +70,7 @@ public class PulsarClientImpl implements PulsarClient {
private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
- private final ClientConfiguration conf;
+ private final ClientConfigurationData conf;
private final LookupService lookup;
private final ConnectionPool cnxPool;
private final Timer timer;
@@ -87,37 +90,53 @@ public class PulsarClientImpl implements PulsarClient {
private final EventLoopGroup eventLoopGroup;
+ @Deprecated
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
- this(serviceUrl, conf, getEventLoopGroup(conf));
+ this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
}
+ @Deprecated
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
- this(serviceUrl, conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
+ this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone(), eventLoopGroup);
}
+ @Deprecated
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup,
ConnectionPool cnxPool) throws PulsarClientException {
- if (isBlank(serviceUrl) || conf == null || eventLoopGroup == null) {
+ this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone(), eventLoopGroup, cnxPool);
+ }
+
+ public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
+ this(conf, getEventLoopGroup(conf));
+ }
+
+ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
+ this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
+ }
+
+ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
+ throws PulsarClientException {
+ if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
this.eventLoopGroup = eventLoopGroup;
this.conf = conf;
conf.getAuthentication().start();
this.cnxPool = cnxPool;
- if (serviceUrl.startsWith("http")) {
- lookup = new HttpLookupService(serviceUrl, conf, eventLoopGroup);
+ if (conf.getServiceUrl().startsWith("http")) {
+ lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
- lookup = new BinaryProtoLookupService(this, serviceUrl, conf.isUseTls());
+ lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls());
}
timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
- externalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-external-listener");
+ externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
producers = Maps.newIdentityHashMap();
consumers = Maps.newIdentityHashMap();
state.set(State.Open);
}
- public ClientConfiguration getConfiguration() {
+ public ClientConfigurationData getConfiguration() {
return conf;
}
@@ -137,9 +156,11 @@ public class PulsarClientImpl implements PulsarClient {
}
@Override
- public Producer createProducer(String destination) throws PulsarClientException {
+ public Producer createProducer(String topic) throws PulsarClientException {
try {
- return createProducerAsync(destination, new ProducerConfiguration()).get();
+ ProducerConfigurationData conf = new ProducerConfigurationData();
+ conf.setTopicName(topic);
+ return createProducerAsync(conf).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
@@ -154,10 +175,15 @@ public class PulsarClientImpl implements PulsarClient {
}
@Override
- public Producer createProducer(final String destination, final ProducerConfiguration conf)
- throws PulsarClientException {
+ public Producer createProducer(final String topic, final ProducerConfiguration conf) throws PulsarClientException {
+ if (conf == null) {
+ throw new PulsarClientException.InvalidConfigurationException("Invalid null configuration object");
+ }
+
try {
- return createProducerAsync(destination, conf).get();
+ ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
+ confData.setTopicName(topic);
+ return createProducerAsync(confData).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
@@ -173,22 +199,33 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture createProducerAsync(String topic) {
- return createProducerAsync(topic, new ProducerConfiguration());
+ ProducerConfigurationData conf = new ProducerConfigurationData();
+ conf.setTopicName(topic);
+ return createProducerAsync(conf);
}
@Override
public CompletableFuture createProducerAsync(final String topic, final ProducerConfiguration conf) {
+ ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
+ confData.setTopicName(topic);
+ return createProducerAsync(confData);
+ }
+
+ public CompletableFuture createProducerAsync(ProducerConfigurationData conf) {
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
+ }
+
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
+ String topic = conf.getTopicName();
+
if (!DestinationName.isValid(topic)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
}
- if (conf == null) {
- return FutureUtil.failedFuture(
- new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
- }
CompletableFuture producerCreatedFuture = new CompletableFuture<>();
@@ -242,44 +279,70 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture subscribeAsync(String topic, String subscription) {
- return subscribeAsync(topic, subscription, new ConsumerConfiguration());
+ ConsumerConfigurationData conf = new ConsumerConfigurationData();
+ conf.getTopicNames().add(topic);
+ conf.setSubscriptionName(subscription);
+ return subscribeAsync(conf);
}
@Override
public CompletableFuture subscribeAsync(final String topic, final String subscription,
final ConsumerConfiguration conf) {
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Invalid null configuration"));
+ }
+
+ ConsumerConfigurationData confData = conf.getConfigurationData().clone();
+ confData.getTopicNames().add(topic);
+ confData.setSubscriptionName(subscription);
+ return subscribeAsync(confData);
+ }
+
+ public CompletableFuture subscribeAsync(ConsumerConfigurationData conf) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
- if (!DestinationName.isValid(topic)) {
+
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
+ }
+
+ if (!conf.getTopicNames().stream().allMatch(topic -> DestinationName.isValid(topic))) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
}
- if (isBlank(subscription)) {
+
+ if (isBlank(conf.getSubscriptionName())) {
return FutureUtil
.failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
}
- if (conf == null) {
- return FutureUtil.failedFuture(
- new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
- }
- if (conf.getReadCompacted()
- && (!DestinationName.get(topic).getDomain().equals(DestinationDomain.persistent)
- || (conf.getSubscriptionType() != SubscriptionType.Exclusive
+
+ if (conf.isReadCompacted() && (!conf.getTopicNames().stream()
+ .allMatch(topic -> DestinationName.get(topic).getDomain() == DestinationDomain.persistent)
+ || (conf.getSubscriptionType() != SubscriptionType.Exclusive
&& conf.getSubscriptionType() != SubscriptionType.Failover))) {
- return FutureUtil.failedFuture(
- new PulsarClientException.InvalidConfigurationException(
- "Read compacted can only be used with exclusive of failover persistent subscriptions"));
+ return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
+ "Read compacted can only be used with exclusive of failover persistent subscriptions"));
}
- if (conf.getConsumerEventListener() != null
- && conf.getSubscriptionType() != SubscriptionType.Failover) {
- return FutureUtil.failedFuture(
- new PulsarClientException.InvalidConfigurationException(
- "Active consumer listener is only supported for failover subscription"));
+ if (conf.getConsumerEventListener() != null && conf.getSubscriptionType() != SubscriptionType.Failover) {
+ return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
+ "Active consumer listener is only supported for failover subscription"));
}
+ if (conf.getTopicNames().size() == 1) {
+ return singleTopicSubscribeAsysnc(conf);
+ } else {
+ return multiTopicSubscribeAsync(conf);
+ }
+ }
+
+ private CompletableFuture singleTopicSubscribeAsysnc(ConsumerConfigurationData conf) {
CompletableFuture consumerSubscribedFuture = new CompletableFuture<>();
+ String topic = conf.getSingleTopic();
+
getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
@@ -289,10 +352,10 @@ public class PulsarClientImpl implements PulsarClient {
// gets the next single threaded executor from the list of executors
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
if (metadata.partitions > 1) {
- consumer = new PartitionedConsumerImpl(PulsarClientImpl.this, topic, subscription, conf,
- metadata.partitions, listenerThread, consumerSubscribedFuture);
+ consumer = new PartitionedConsumerImpl(PulsarClientImpl.this, conf, metadata.partitions, listenerThread,
+ consumerSubscribedFuture);
} else {
- consumer = new ConsumerImpl(PulsarClientImpl.this, topic, subscription, conf, listenerThread, -1,
+ consumer = new ConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
consumerSubscribedFuture);
}
@@ -308,31 +371,12 @@ public class PulsarClientImpl implements PulsarClient {
return consumerSubscribedFuture;
}
- public CompletableFuture subscribeAsync(Collection topics,
- String subscription,
- ConsumerConfiguration conf) {
- if (topics == null || topics.isEmpty()) {
- return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Empty topics name"));
- }
-
- if (state.get() != State.Open) {
- return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
- }
-
- if (isBlank(subscription)) {
- return FutureUtil
- .failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
- }
- if (conf == null) {
- return FutureUtil.failedFuture(
- new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
- }
-
+ private CompletableFuture multiTopicSubscribeAsync(ConsumerConfigurationData conf) {
CompletableFuture consumerSubscribedFuture = new CompletableFuture<>();
- ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this, topics, subscription,
- conf, externalExecutorProvider.getExecutor(),
- consumerSubscribedFuture);
+ ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this, conf,
+ externalExecutorProvider.getExecutor(), consumerSubscribedFuture);
+
synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
}
@@ -361,20 +405,32 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture createReaderAsync(String topic, MessageId startMessageId,
ReaderConfiguration conf) {
+ ReaderConfigurationData confData = conf.getReaderConfigurationData().clone();
+ confData.setTopicName(topic);
+ confData.setStartMessageId(startMessageId);
+ return createReaderAsync(confData);
+ }
+
+ public CompletableFuture createReaderAsync(ReaderConfigurationData conf) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
+
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
+ }
+
+ String topic = conf.getTopicName();
+
if (!DestinationName.isValid(topic)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
}
- if (startMessageId == null) {
+
+ if (conf.getStartMessageId() == null) {
return FutureUtil
.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
}
- if (conf == null) {
- return FutureUtil.failedFuture(
- new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
- }
CompletableFuture readerFuture = new CompletableFuture<>();
@@ -392,8 +448,7 @@ public class PulsarClientImpl implements PulsarClient {
CompletableFuture consumerSubscribedFuture = new CompletableFuture<>();
// gets the next single threaded executor from the list of executors
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
- ReaderImpl reader = new ReaderImpl(PulsarClientImpl.this, topic, startMessageId, conf, listenerThread,
- consumerSubscribedFuture);
+ ReaderImpl reader = new ReaderImpl(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture);
synchronized (consumers) {
consumers.put(reader.getConsumer(), Boolean.TRUE);
@@ -535,10 +590,9 @@ public class PulsarClientImpl implements PulsarClient {
return metadataFuture;
}
- private static EventLoopGroup getEventLoopGroup(ClientConfiguration conf) {
- int numThreads = conf.getIoThreads();
+ private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io");
- return EventLoopUtil.newEventLoopGroup(numThreads, threadFactory);
+ return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
}
void cleanupProducer(ProducerBase producer) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
index f3751340f6b2c1cbb78aad5ea66f4cf5e1c5b7ac..eb1aeb8c9761413a628e72bf0b5be03296936635 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java
@@ -27,24 +27,25 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.ReaderListener;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
-@SuppressWarnings("deprecation")
public class ReaderBuilderImpl implements ReaderBuilder {
private static final long serialVersionUID = 1L;
private final PulsarClientImpl client;
- private final ReaderConfiguration conf;
- private String topicName;
- private MessageId startMessageId;
+ private final ReaderConfigurationData conf;
ReaderBuilderImpl(PulsarClientImpl client) {
+ this(client, new ReaderConfigurationData());
+ }
+
+ private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData conf) {
this.client = client;
- this.conf = new ReaderConfiguration();
+ this.conf = conf;
}
@Override
@@ -75,28 +76,28 @@ public class ReaderBuilderImpl implements ReaderBuilder {
@Override
public CompletableFuture createAsync() {
- if (topicName == null) {
+ if (conf.getTopicName() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
}
- if (startMessageId == null) {
+ if (conf.getStartMessageId() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder"));
}
- return client.createReaderAsync(topicName, startMessageId, conf);
+ return client.createReaderAsync(conf);
}
@Override
public ReaderBuilder topic(String topicName) {
- this.topicName = topicName;
+ conf.setTopicName(topicName);
return this;
}
@Override
public ReaderBuilder startMessageId(MessageId startMessageId) {
- this.startMessageId = startMessageId;
+ conf.setStartMessageId(startMessageId);
return this;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index bf0dc2fdc18acb2ba0f685388d44c33fad2950c2..aaa3902b88aaea70ff5649b01af87fbb9d989138 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -23,30 +23,31 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
public class ReaderImpl implements Reader {
private final ConsumerImpl consumer;
- public ReaderImpl(PulsarClientImpl client, String topic, MessageId startMessageId,
- ReaderConfiguration readerConfiguration, ExecutorService listenerExecutor,
- CompletableFuture consumerFuture) {
+ public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData readerConfiguration,
+ ExecutorService listenerExecutor, CompletableFuture consumerFuture) {
String subscription = "reader-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
- ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
+ ConsumerConfigurationData consumerConfiguration = new ConsumerConfigurationData();
+ consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
+ consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
if (readerConfiguration.getReaderName() != null) {
@@ -76,8 +77,8 @@ public class ReaderImpl implements Reader {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}
- consumer = new ConsumerImpl(client, topic, subscription, consumerConfiguration, listenerExecutor, -1,
- consumerFuture, SubscriptionMode.NonDurable, startMessageId);
+ consumer = new ConsumerImpl(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
+ -1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId());
}
@Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
index a7c25c163f8a1b6a2d96ec3145d8779436196167..6b3f937942b505f91097bb621ea197ffc0e2e06e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImpl.java
@@ -20,17 +20,20 @@ package org.apache.pulsar.client.impl;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.TopicMetadata;
public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
+ private static final long serialVersionUID = 1L;
+
private static final AtomicIntegerFieldUpdater PARTITION_INDEX_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex");
+ @SuppressWarnings("unused")
private volatile int partitionIndex = 0;
- public RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme hashingScheme) {
+ public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme) {
super(hashingScheme);
PARTITION_INDEX_UPDATER.set(this, 0);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
index 3a0cd544ba0ce0cf2b517056788b3a6eb94b5dc0..f9b94bc0e37bc36828f29a993f60a5be329c8fab 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImpl.java
@@ -18,15 +18,17 @@
*/
package org.apache.pulsar.client.impl;
+import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.TopicMetadata;
public class SinglePartitionMessageRouterImpl extends MessageRouterBase {
+ private static final long serialVersionUID = 1L;
+
private final int partitionIndex;
- public SinglePartitionMessageRouterImpl(int partitionIndex, ProducerConfiguration.HashingScheme hashingScheme) {
+ public SinglePartitionMessageRouterImpl(int partitionIndex, HashingScheme hashingScheme) {
super(hashingScheme);
this.partitionIndex = partitionIndex;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index 852c5d25ead5049d814c5cff4a781811f2e63c77..3abc9a10d9081b679a5c74220b28d6358101a5c6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
-import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
@@ -37,15 +36,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.naming.DestinationName;
@@ -54,6 +53,8 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Lists;
+
public class TopicsConsumerImpl extends ConsumerBase {
// All topics should be in same namespace
@@ -79,14 +80,12 @@ public class TopicsConsumerImpl extends ConsumerBase {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ConsumerStats stats;
private final UnAckedMessageTracker unAckedMessageTracker;
- private final ConsumerConfiguration internalConfig;
+ private final ConsumerConfigurationData internalConfig;
- TopicsConsumerImpl(PulsarClientImpl client, Collection topics, String subscription,
- ConsumerConfiguration conf, ExecutorService listenerExecutor,
- CompletableFuture subscribeFuture) {
- super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
- conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
- subscribeFuture);
+ TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorService listenerExecutor,
+ CompletableFuture subscribeFuture) {
+ super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf,
+ Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture);
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
@@ -106,23 +105,19 @@ public class TopicsConsumerImpl extends ConsumerBase {
this.internalConfig = getInternalConsumerConfig();
this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
- if (topics.isEmpty()) {
+ if (conf.getTopicNames().isEmpty()) {
this.namespaceName = null;
setState(State.Ready);
subscribeFuture().complete(TopicsConsumerImpl.this);
return;
}
- checkArgument(topics.isEmpty() || topicNamesValid(topics), "Topics should have same namespace.");
- this.namespaceName = topics.stream().findFirst().flatMap(
- new Function>() {
- @Override
- public Optional apply(String s) {
- return Optional.of(DestinationName.get(s).getNamespaceObject());
- }
- }).get();
+ checkArgument(conf.getTopicNames().isEmpty() || topicNamesValid(conf.getTopicNames()), "Topics should have same namespace.");
+ this.namespaceName = conf.getTopicNames().stream().findFirst()
+ .flatMap(s -> Optional.of(DestinationName.get(s).getNamespaceObject())).get();
- List> futures = topics.stream().map(t -> subscribeAsync(t)).collect(Collectors.toList());
+ List> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t))
+ .collect(Collectors.toList());
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
try {
@@ -490,8 +485,9 @@ public class TopicsConsumerImpl extends ConsumerBase {
return subscription;
}
- private ConsumerConfiguration getInternalConsumerConfig() {
- ConsumerConfiguration internalConsumerConfig = new ConsumerConfiguration();
+ private ConsumerConfigurationData getInternalConsumerConfig() {
+ ConsumerConfigurationData internalConsumerConfig = new ConsumerConfigurationData();
+ internalConsumerConfig.setSubscriptionName(subscription);
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
internalConsumerConfig.setConsumerName(consumerName);
@@ -500,7 +496,7 @@ public class TopicsConsumerImpl extends ConsumerBase {
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
}
if (conf.getAckTimeoutMillis() != 0) {
- internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
+ internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
}
return internalConsumerConfig;
@@ -658,9 +654,8 @@ public class TopicsConsumerImpl extends ConsumerBase {
partitionIndex -> {
String partitionName = DestinationName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture subFuture = new CompletableFuture();
- ConsumerImpl newConsumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
- client.externalExecutorProvider().getExecutor(), partitionIndex,
- subFuture);
+ ConsumerImpl newConsumer = new ConsumerImpl(client, partitionName, internalConfig,
+ client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
@@ -671,9 +666,8 @@ public class TopicsConsumerImpl extends ConsumerBase {
partitionNumber.incrementAndGet();
CompletableFuture subFuture = new CompletableFuture();
- ConsumerImpl newConsumer = new ConsumerImpl(client, topicName, subscription, internalConfig,
- client.externalExecutorProvider().getExecutor(), 0,
- subFuture);
+ ConsumerImpl newConsumer = new ConsumerImpl(client, topicName, internalConfig,
+ client.externalExecutorProvider().getExecutor(), 0, subFuture);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
futureList = Lists.newArrayList(subFuture);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
new file mode 100644
index 0000000000000000000000000000000000000000..4fe7569a3bc805de74be9a71aeedffa2a13ec8fd
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import lombok.Data;
+
+/**
+ * This is a simple holder of the client configuration values.
+ */
+@Data
+public class ClientConfigurationData implements Serializable, Cloneable {
+ private static final long serialVersionUID = 1L;
+
+ private String serviceUrl;
+
+ @JsonIgnore
+ private Authentication authentication = new AuthenticationDisabled();
+ private long operationTimeoutMs = 30000;
+ private long statsIntervalSeconds = 60;
+
+ private int numIoThreads = 1;
+ private int numListenerThreads = 1;
+ private int connectionsPerBroker = 1;
+
+ private boolean useTcpNoDelay = true;
+
+ private boolean useTls = false;
+ private String tlsTrustCertsFilePath = "";
+ private boolean tlsAllowInsecureConnection = false;
+ private boolean tlsHostnameVerificationEnable = false;
+ private int concurrentLookupRequest = 50000;
+ private int maxNumberOfRejectedRequestPerConnection = 50;
+
+ public ClientConfigurationData clone() {
+ try {
+ return (ClientConfigurationData) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Failed to clone ClientConfigurationData");
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
new file mode 100644
index 0000000000000000000000000000000000000000..89ab5d71a60c983cbc5d59177c385b40ff3efbda
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.ConsumerEventListener;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Data;
+
+@Data
+public class ConsumerConfigurationData implements Serializable, Cloneable {
+ private static final long serialVersionUID = 1L;
+
+ private Set topicNames = Sets.newTreeSet();
+
+ private String subscriptionName;
+
+ private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+
+ @JsonIgnore
+ private MessageListener messageListener;
+
+ @JsonIgnore
+ private ConsumerEventListener consumerEventListener;
+
+ private int receiverQueueSize = 1000;
+
+ private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
+
+ private String consumerName = null;
+
+ private long ackTimeoutMillis = 0;
+
+ private int priorityLevel = 0;
+
+ @JsonIgnore
+ private CryptoKeyReader cryptoKeyReader = null;
+
+ private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
+
+ private SortedMap properties = new TreeMap<>();
+
+ private boolean readCompacted = false;
+
+ @JsonIgnore
+ public String getSingleTopic() {
+ checkArgument(topicNames.size() == 1);
+ return topicNames.iterator().next();
+ }
+
+ public ConsumerConfigurationData clone() {
+ try {
+ ConsumerConfigurationData c = (ConsumerConfigurationData) super.clone();
+ c.topicNames = Sets.newTreeSet(this.topicNames);
+ c.properties = Maps.newTreeMap(this.properties);
+ return c;
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Failed to clone ConsumerConfigurationData");
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
new file mode 100644
index 0000000000000000000000000000000000000000..1449a454f8ff7a52f2b56f9e56c549b112e48599
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.pulsar.client.api.CompressionType;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.HashingScheme;
+import org.apache.pulsar.client.api.MessageRouter;
+import org.apache.pulsar.client.api.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import lombok.Data;
+
+@Data
+public class ProducerConfigurationData implements Serializable, Cloneable {
+
+ private static final long serialVersionUID = 1L;
+
+ private String topicName = null;
+
+ private String producerName = null;
+ private long sendTimeoutMs = 30000;
+ private boolean blockIfQueueFull = false;
+ private int maxPendingMessages = 1000;
+ private int maxPendingMessagesAcrossPartitions = 50000;
+ private MessageRoutingMode messageRoutingMode = MessageRoutingMode.SinglePartition;
+ private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
+
+ private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
+
+ @JsonIgnore
+ private MessageRouter customMessageRouter = null;
+
+ private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(10);
+ private int batchingMaxMessages = 1000;
+ private boolean batchingEnabled = false; // disabled by default
+
+ @JsonIgnore
+ private CryptoKeyReader cryptoKeyReader;
+
+ @JsonIgnore
+ private Set encryptionKeys = new TreeSet<>();
+
+ private CompressionType compressionType = CompressionType.NONE;
+
+ // Cannot use Optional since it's not serializable
+ private Long initialSequenceId = null;
+
+ private SortedMap properties = new TreeMap<>();
+
+ /**
+ *
+ * Returns true if encryption keys are added
+ *
+ */
+ public boolean isEncryptionEnabled() {
+ return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty() && (this.cryptoKeyReader != null);
+ }
+
+ public ProducerConfigurationData clone() {
+ try {
+ ProducerConfigurationData c = (ProducerConfigurationData) super.clone();
+ c.encryptionKeys = Sets.newTreeSet(this.encryptionKeys);
+ c.properties = Maps.newTreeMap(this.properties);
+ return c;
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Failed to clone ProducerConfigurationData", e);
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
new file mode 100644
index 0000000000000000000000000000000000000000..d1323ee9e55a34de21049b330e84cd0181969023
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.conf;
+
+import java.io.Serializable;
+
+import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.CryptoKeyReader;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ReaderListener;
+
+import lombok.Data;
+
+@Data
+public class ReaderConfigurationData implements Serializable, Cloneable {
+
+ private String topicName;
+ private MessageId startMessageId;
+
+ private int receiverQueueSize = 1000;
+
+ private ReaderListener readerListener;
+
+ private String readerName = null;
+
+ private CryptoKeyReader cryptoKeyReader = null;
+ private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
+
+ public ReaderConfigurationData clone() {
+ try {
+ return (ReaderConfigurationData) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new RuntimeException("Failed to clone ReaderConfigurationData");
+ }
+ }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/api/ConsumerConfigurationTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ConsumerConfigurationTest.java
index 2e7fd8006b4ed9b011fbdc1ef58c807f0c00df82..8622542d064badb509c66df014047b8e91db4987 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/api/ConsumerConfigurationTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/api/ConsumerConfigurationTest.java
@@ -21,13 +21,15 @@ package org.apache.pulsar.client.api;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertFalse;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.databind.SerializationFeature;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
/**
* Unit test of {@link ConsumerConfiguration}.
*/
@@ -38,20 +40,22 @@ public class ConsumerConfigurationTest {
@Test
public void testJsonIgnore() throws Exception {
- ConsumerConfiguration conf = new ConsumerConfiguration()
- .setConsumerEventListener(new ConsumerEventListener() {
+ ConsumerConfigurationData conf = new ConsumerConfigurationData();
+ conf.setConsumerEventListener(new ConsumerEventListener() {
+
+ @Override
+ public void becameActive(Consumer consumer, int partitionId) {
+ }
+
+ @Override
+ public void becameInactive(Consumer consumer, int partitionId) {
+ }
+ });
- @Override
- public void becameActive(Consumer consumer, int partitionId) {
- }
+ conf.setMessageListener((MessageListener) (consumer, msg) -> {
+ });
- @Override
- public void becameInactive(Consumer consumer, int partitionId) {
- }
- })
- .setMessageListener((MessageListener) (consumer, msg) -> {
- })
- .setCryptoKeyReader(mock(CryptoKeyReader.class));
+ conf.setCryptoKeyReader(mock(CryptoKeyReader.class));
ObjectMapper m = new ObjectMapper();
m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
index f4b89d5d4e70d3288cfbe4f0b1deb2a7ba4bc67e..a9ab705a58941903ad0f35f88c0ee87632cf780a 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BuildersTest.java
@@ -24,7 +24,6 @@ import static org.testng.Assert.assertTrue;
import org.apache.pulsar.client.api.PulsarClient;
import org.testng.annotations.Test;
-@SuppressWarnings("deprecation")
public class BuildersTest {
@Test
@@ -33,15 +32,15 @@ public class BuildersTest {
.maxNumberOfRejectedRequestPerConnection(200).serviceUrl("pulsar://service:6650");
assertEquals(clientBuilder.conf.isUseTls(), true);
- assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
+ assertEquals(clientBuilder.conf.getServiceUrl(), "pulsar://service:6650");
ClientBuilderImpl b2 = (ClientBuilderImpl) clientBuilder.clone();
assertTrue(b2 != clientBuilder);
b2.serviceUrl("pulsar://other-broker:6650");
- assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
- assertEquals(b2.serviceUrl, "pulsar://other-broker:6650");
+ assertEquals(clientBuilder.conf.getServiceUrl(), "pulsar://service:6650");
+ assertEquals(b2.conf.getServiceUrl(), "pulsar://other-broker:6650");
}
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
index a73342b4fc22c6b3cc9c1741b50e34a3fde89622..0584b78870bd258391357b7eb7e0496ab162f306 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientCnxTest.java
@@ -24,10 +24,9 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
-import java.util.concurrent.TimeUnit;
-import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.Test;
@@ -42,8 +41,8 @@ public class ClientCnxTest {
@Test
public void testClientCnxTimeout() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
- ClientConfiguration conf = new ClientConfiguration();
- conf.setOperationTimeout(10, TimeUnit.MILLISECONDS);
+ ClientConfigurationData conf = new ClientConfigurationData();
+ conf.setOperationTimeoutMs(10);
ClientCnx cnx = new ClientCnx(conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
index d0e4d510b25d797b6cdebd3c58cf15e12e600d93..bf41ca9f3f4255d635a41d1ddce8c051283c4e22 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/RoundRobinPartitionMessageRouterImplTest.java
@@ -22,8 +22,8 @@ import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConfiguration;
import org.testng.annotations.Test;
/**
@@ -36,7 +36,7 @@ public class RoundRobinPartitionMessageRouterImplTest {
Message msg = mock(Message.class);
when(msg.getKey()).thenReturn(null);
- RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash);
+ RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash);
for (int i = 0; i < 10; i++) {
assertEquals(i % 5, router.choosePartition(msg, new TopicMetadataImpl(5)));
}
@@ -53,7 +53,7 @@ public class RoundRobinPartitionMessageRouterImplTest {
when(msg2.hasKey()).thenReturn(true);
when(msg2.getKey()).thenReturn(key2);
- RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash);
+ RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash);
TopicMetadataImpl metadata = new TopicMetadataImpl(100);
assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata));
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
index c8ea0b011056b11fb1ae1c085aae68d601c7b497..3cdf392a344108b8012a0d3735a092f0183e0c23 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/SinglePartitionMessageRouterImplTest.java
@@ -22,8 +22,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.ProducerConfiguration;
import org.testng.annotations.Test;
/**
@@ -36,7 +36,7 @@ public class SinglePartitionMessageRouterImplTest {
Message msg = mock(Message.class);
when(msg.getKey()).thenReturn(null);
- SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash);
+ SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, HashingScheme.JavaStringHash);
assertEquals(1234, router.choosePartition(msg, new TopicMetadataImpl(2468)));
}
@@ -51,7 +51,7 @@ public class SinglePartitionMessageRouterImplTest {
when(msg2.hasKey()).thenReturn(true);
when(msg2.getKey()).thenReturn(key2);
- SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash);
+ SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, HashingScheme.JavaStringHash);
TopicMetadataImpl metadata = new TopicMetadataImpl(100);
assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata));