未验证 提交 1a1a6e3b 编写于 作者: M Matteo Merli 提交者: GitHub

Refactored ClientConfuguration to use ClientConfigurationData shared with ClientBuilderImpl (#1276)

* Refactored ClientConfuguration to use ClientConfigurationData shared with ClientBuilderImpl

* Fixed unit tests

* Fixed cloning issue after refactoring

* Fixed another test

* Fixed cloning issues

* Fixes for mock tests

* Fixed refactoring problem in TopicsConsumerImpl
上级 a72c9129
......@@ -150,7 +150,7 @@ flexible messaging model and an intuitive client API.</description>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
......@@ -405,7 +405,7 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>jersey-container-servlet</artifactId>
<version>2.23.2</version>
</dependency>
<dependency>
<groupId>javax.ws.rs</groupId>
<artifactId>javax.ws.rs-api</artifactId>
......@@ -629,6 +629,13 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>powermock-module-testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.20</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
......
......@@ -22,7 +22,6 @@ import java.io.IOException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
......
......@@ -22,49 +22,39 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
public class RawReaderImpl implements RawReader {
final static int DEFAULT_RECEIVER_QUEUE_SIZE = 1000;
private final PulsarClientImpl client;
private final String topic;
private final String subscription;
private final ConsumerConfiguration consumerConfiguration;
private final ConsumerConfigurationData consumerConfiguration;
private RawConsumerImpl consumer;
public RawReaderImpl(PulsarClientImpl client, String topic, String subscription,
CompletableFuture<Consumer> consumerFuture) {
this.client = client;
this.subscription = subscription;
this.topic = topic;
consumerConfiguration = new ConsumerConfiguration();
consumerConfiguration = new ConsumerConfigurationData();
consumerConfiguration.getTopicNames().add(topic);
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setReceiverQueueSize(DEFAULT_RECEIVER_QUEUE_SIZE);
consumer = new RawConsumerImpl(client, topic, subscription, consumerConfiguration,
consumer = new RawConsumerImpl(client, consumerConfiguration,
consumerFuture);
}
......@@ -92,11 +82,10 @@ public class RawReaderImpl implements RawReader {
final BlockingQueue<RawMessageAndCnx> incomingRawMessages;
final Queue<CompletableFuture<RawMessage>> pendingRawReceives;
RawConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
CompletableFuture<Consumer> consumerFuture) {
super(client, topic, subscription, conf,
client.externalExecutorProvider().getExecutor(), -1, consumerFuture,
SubscriptionMode.Durable, MessageId.earliest);
RawConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf,
CompletableFuture<Consumer> consumerFuture) {
super(client, conf.getSingleTopic(), conf, client.externalExecutorProvider().getExecutor(), -1,
consumerFuture, SubscriptionMode.Durable, MessageId.earliest);
incomingRawMessages = new GrowableArrayBlockingQueue<>();
pendingRawReceives = new ConcurrentLinkedQueue<>();
}
......
......@@ -88,6 +88,7 @@ import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
......@@ -1151,17 +1152,18 @@ public class PersistentTopicTest {
brokerService.getReplicationClients().put(remoteCluster, client);
PersistentReplicator replicator = new PersistentReplicator(topic, cursor, localCluster, remoteCluster, brokerService);
doReturn(new CompletableFuture<Producer>()).when(clientImpl).createProducerAsync(matches(globalTopicName), any());
doReturn(new CompletableFuture<Producer>()).when(clientImpl)
.createProducerAsync(any(ProducerConfigurationData.class));
replicator.startProducer();
verify(clientImpl).createProducerAsync(matches(globalTopicName), any());
verify(clientImpl).createProducerAsync(any(ProducerConfigurationData.class));
replicator.disconnect(false);
replicator.disconnect(false);
replicator.startProducer();
verify(clientImpl, Mockito.times(2)).createProducerAsync(matches(globalTopicName), any());
verify(clientImpl, Mockito.times(2)).createProducerAsync(any(ProducerConfigurationData.class));
}
@Test
......
......@@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.RawMessage;
import org.apache.pulsar.client.api.RawReader;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
......@@ -242,7 +243,7 @@ public class ReplicatorTest extends ReplicatorTestBase {
}
Thread.sleep(3000);
Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.anyString(), Mockito.anyObject());
Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class));
}
......
......@@ -23,7 +23,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
......@@ -53,10 +53,11 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
@Test
public void testSingleIpAddress() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool);
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
List<InetAddress> result = Lists.newArrayList();
result.add(InetAddress.getByName("127.0.0.1"));
......@@ -71,10 +72,11 @@ public class ConnectionPoolTest extends MockedPulsarServiceBaseTest {
public void testDoubleIpAddress() throws Exception {
String serviceUrl = "pulsar://non-existing-dns-name:" + BROKER_PORT;
ClientConfiguration conf = new ClientConfiguration();
ClientConfigurationData conf = new ClientConfigurationData();
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("test"));
ConnectionPool pool = Mockito.spy(new ConnectionPool(conf, eventLoop));
PulsarClientImpl client = new PulsarClientImpl(serviceUrl, conf, eventLoop, pool);
conf.setServiceUrl(serviceUrl);
PulsarClientImpl client = new PulsarClientImpl(conf, eventLoop, pool);
List<InetAddress> result = Lists.newArrayList();
......
......@@ -51,9 +51,10 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata.Builder;
import org.slf4j.Logger;
......@@ -295,7 +296,7 @@ public class MessageIdTest extends BrokerTestBase {
((PulsarClientImpl) pulsarClient).timer().stop();
ClientCnx mockClientCnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
prod.setClientCnx(mockClientCnx);
......@@ -360,7 +361,7 @@ public class MessageIdTest extends BrokerTestBase {
// set clientCnx mock to get non-checksum supported version
ClientCnx mockClientCnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
doReturn(producer.brokerChecksumSupportedVersion() - 1).when(mockClientCnx).getRemoteEndpointProtocolVersion();
prod.setClientCnx(mockClientCnx);
......@@ -489,7 +490,7 @@ public class MessageIdTest extends BrokerTestBase {
MessageImpl msg1 = (MessageImpl) MessageBuilder.create().setContent("message-1".getBytes()).build();
future = producer.sendAsync(msg1);
ClientCnx cnx = spy(
new ClientCnx(new ClientConfiguration(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
new ClientCnx(new ClientConfigurationData(), ((PulsarClientImpl) pulsarClient).eventLoopGroup()));
String exc = "broker is already stopped";
// when client-try to recover checksum by resending to broker: throw exception as broker is stopped
doThrow(new IllegalStateException(exc)).when(cnx).ctx();
......
......@@ -20,13 +20,12 @@ package org.apache.pulsar.client.api;
import static com.google.common.base.Preconditions.checkArgument;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
/**
* Class used to specify client side configuration like authentication, etc..
......@@ -38,29 +37,13 @@ public class ClientConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
@JsonIgnore
private Authentication authentication = new AuthenticationDisabled();
private long operationTimeoutMs = 30000;
private long statsIntervalSeconds = 60;
private int numIoThreads = 1;
private int numListenerThreads = 1;
private int connectionsPerBroker = 1;
private boolean useTcpNoDelay = true;
private boolean useTls = false;
private String tlsTrustCertsFilePath = "";
private boolean tlsAllowInsecureConnection = false;
private boolean tlsHostnameVerificationEnable = false;
private int concurrentLookupRequest = 50000;
private int maxNumberOfRejectedRequestPerConnection = 50;
private final ClientConfigurationData confData = new ClientConfigurationData();
/**
* @return the authentication provider to be used
*/
public Authentication getAuthentication() {
return authentication;
return confData.getAuthentication();
}
/**
......@@ -71,12 +54,12 @@ public class ClientConfiguration implements Serializable {
*
* <pre>
* <code>
* ClientConfiguration conf = new ClientConfiguration();
* ClientConfiguration confData = new ClientConfiguration();
* String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
* String authParamsString = "key1:val1,key2:val2";
* Authentication auth = AuthenticationFactory.create(authPluginClassName, authParamsString);
* conf.setAuthentication(auth);
* PulsarClient client = PulsarClient.create(serviceUrl, conf);
* confData.setAuthentication(auth);
* PulsarClient client = PulsarClient.create(serviceUrl, confData);
* ....
* </code>
* </pre>
......@@ -84,7 +67,7 @@ public class ClientConfiguration implements Serializable {
* @param authentication
*/
public void setAuthentication(Authentication authentication) {
this.authentication = authentication;
confData.setAuthentication(authentication);
}
/**
......@@ -95,11 +78,11 @@ public class ClientConfiguration implements Serializable {
*
* <pre>
* <code>
* ClientConfiguration conf = new ClientConfiguration();
* ClientConfiguration confData = new ClientConfiguration();
* String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
* String authParamsString = "key1:val1,key2:val2";
* conf.setAuthentication(authPluginClassName, authParamsString);
* PulsarClient client = PulsarClient.create(serviceUrl, conf);
* confData.setAuthentication(authPluginClassName, authParamsString);
* PulsarClient client = PulsarClient.create(serviceUrl, confData);
* ....
* </code>
* </pre>
......@@ -113,7 +96,7 @@ public class ClientConfiguration implements Serializable {
*/
public void setAuthentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
this.authentication = AuthenticationFactory.create(authPluginClassName, authParamsString);
confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
}
/**
......@@ -124,12 +107,12 @@ public class ClientConfiguration implements Serializable {
*
* <pre>
* <code>
* ClientConfiguration conf = new ClientConfiguration();
* ClientConfiguration confData = new ClientConfiguration();
* String authPluginClassName = "org.apache.pulsar.client.impl.auth.MyAuthentication";
* Map<String, String> authParams = new HashMap<String, String>();
* authParams.put("key1", "val1");
* conf.setAuthentication(authPluginClassName, authParams);
* PulsarClient client = PulsarClient.create(serviceUrl, conf);
* confData.setAuthentication(authPluginClassName, authParams);
* PulsarClient client = PulsarClient.create(serviceUrl, confData);
* ....
* </code>
* </pre>
......@@ -143,15 +126,14 @@ public class ClientConfiguration implements Serializable {
*/
public void setAuthentication(String authPluginClassName, Map<String, String> authParams)
throws UnsupportedAuthenticationException {
this.authentication = AuthenticationFactory.create(authPluginClassName, authParams);
confData.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
}
/**
* @return the operation timeout in ms
*/
public long getOperationTimeoutMs() {
return operationTimeoutMs;
return confData.getOperationTimeoutMs();
}
/**
......@@ -167,14 +149,14 @@ public class ClientConfiguration implements Serializable {
*/
public void setOperationTimeout(int operationTimeout, TimeUnit unit) {
checkArgument(operationTimeout >= 0);
this.operationTimeoutMs = unit.toMillis(operationTimeout);
confData.setOperationTimeoutMs(unit.toMillis(operationTimeout));
}
/**
* @return the number of threads to use for handling connections
*/
public int getIoThreads() {
return numIoThreads;
return confData.getNumIoThreads();
}
/**
......@@ -184,14 +166,14 @@ public class ClientConfiguration implements Serializable {
*/
public void setIoThreads(int numIoThreads) {
checkArgument(numIoThreads > 0);
this.numIoThreads = numIoThreads;
confData.setNumIoThreads(numIoThreads);
}
/**
* @return the number of threads to use for message listeners
*/
public int getListenerThreads() {
return numListenerThreads;
return confData.getNumListenerThreads();
}
/**
......@@ -201,14 +183,14 @@ public class ClientConfiguration implements Serializable {
*/
public void setListenerThreads(int numListenerThreads) {
checkArgument(numListenerThreads > 0);
this.numListenerThreads = numListenerThreads;
confData.setNumListenerThreads(numListenerThreads);
}
/**
* @return the max number of connections per single broker
*/
public int getConnectionsPerBroker() {
return connectionsPerBroker;
return confData.getConnectionsPerBroker();
}
/**
......@@ -223,14 +205,14 @@ public class ClientConfiguration implements Serializable {
*/
public void setConnectionsPerBroker(int connectionsPerBroker) {
checkArgument(connectionsPerBroker > 0, "Connections per broker need to be greater than 0");
this.connectionsPerBroker = connectionsPerBroker;
confData.setConnectionsPerBroker(connectionsPerBroker);
}
/**
* @return whether TCP no-delay should be set on the connections
*/
public boolean isUseTcpNoDelay() {
return useTcpNoDelay;
return confData.isUseTcpNoDelay();
}
/**
......@@ -245,14 +227,14 @@ public class ClientConfiguration implements Serializable {
* @param useTcpNoDelay
*/
public void setUseTcpNoDelay(boolean useTcpNoDelay) {
this.useTcpNoDelay = useTcpNoDelay;
confData.setUseTcpNoDelay(useTcpNoDelay);
}
/**
* @return whether TLS encryption is used on the connection
*/
public boolean isUseTls() {
return useTls;
return confData.isUseTls();
}
/**
......@@ -261,14 +243,14 @@ public class ClientConfiguration implements Serializable {
* @param useTls
*/
public void setUseTls(boolean useTls) {
this.useTls = useTls;
confData.setUseTls(useTls);
}
/**
* @return path to the trusted TLS certificate file
*/
public String getTlsTrustCertsFilePath() {
return tlsTrustCertsFilePath;
return confData.getTlsTrustCertsFilePath();
}
/**
......@@ -277,14 +259,14 @@ public class ClientConfiguration implements Serializable {
* @param tlsTrustCertsFilePath
*/
public void setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
confData.setTlsTrustCertsFilePath(tlsTrustCertsFilePath);
}
/**
* @return whether the Pulsar client accept untrusted TLS certificate from broker
*/
public boolean isTlsAllowInsecureConnection() {
return tlsAllowInsecureConnection;
return confData.isTlsAllowInsecureConnection();
}
/**
......@@ -293,7 +275,7 @@ public class ClientConfiguration implements Serializable {
* @param tlsAllowInsecureConnection
*/
public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
confData.setTlsAllowInsecureConnection(tlsAllowInsecureConnection);
}
/**
......@@ -302,7 +284,7 @@ public class ClientConfiguration implements Serializable {
* @return the interval between each stat info <i>(default: 60 seconds)</i>
*/
public long getStatsIntervalSeconds() {
return statsIntervalSeconds;
return confData.getStatsIntervalSeconds();
}
/**
......@@ -315,7 +297,7 @@ public class ClientConfiguration implements Serializable {
* time unit for {@code statsInterval}
*/
public void setStatsInterval(long statsInterval, TimeUnit unit) {
this.statsIntervalSeconds = unit.toSeconds(statsInterval);
confData.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
}
/**
......@@ -324,18 +306,18 @@ public class ClientConfiguration implements Serializable {
* @return
*/
public int getConcurrentLookupRequest() {
return concurrentLookupRequest;
return confData.getConcurrentLookupRequest();
}
/**
* Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
* <i>(default: 50000)</i> It should be configured with higher value only in case of it requires to produce/subscribe on
* thousands of topic using created {@link PulsarClient}
* <i>(default: 50000)</i> It should be configured with higher value only in case of it requires to
* produce/subscribe on thousands of topic using created {@link PulsarClient}
*
* @param concurrentLookupRequest
*/
public void setConcurrentLookupRequest(int concurrentLookupRequest) {
this.concurrentLookupRequest = concurrentLookupRequest;
confData.setConcurrentLookupRequest(concurrentLookupRequest);
}
/**
......@@ -344,7 +326,7 @@ public class ClientConfiguration implements Serializable {
* @return
*/
public int getMaxNumberOfRejectedRequestPerConnection() {
return maxNumberOfRejectedRequestPerConnection;
return confData.getMaxNumberOfRejectedRequestPerConnection();
}
/**
......@@ -355,24 +337,33 @@ public class ClientConfiguration implements Serializable {
* @param maxNumberOfRejectedRequestPerConnection
*/
public void setMaxNumberOfRejectedRequestPerConnection(int maxNumberOfRejectedRequestPerConnection) {
this.maxNumberOfRejectedRequestPerConnection = maxNumberOfRejectedRequestPerConnection;
confData.setMaxNumberOfRejectedRequestPerConnection(maxNumberOfRejectedRequestPerConnection);
}
public boolean isTlsHostnameVerificationEnable() {
return tlsHostnameVerificationEnable;
return confData.isTlsHostnameVerificationEnable();
}
/**
* It allows to validate hostname verification when client connects to broker over tls. It validates incoming x509
* certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1. Server
* Identity hostname verification.
*
* certificate and matches provided hostname(CN/SAN) with expected broker's host name. It follows RFC 2818, 3.1.
* Server Identity hostname verification.
*
* @see <a href="https://tools.ietf.org/html/rfc2818">rfc2818</a>
*
*
* @param tlsHostnameVerificationEnable
*/
public void setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
this.tlsHostnameVerificationEnable = tlsHostnameVerificationEnable;
confData.setTlsHostnameVerificationEnable(tlsHostnameVerificationEnable);
}
public ClientConfiguration setServiceUrl(String serviceUrl) {
confData.setServiceUrl(serviceUrl);
return this;
}
public ClientConfigurationData getConfigurationData() {
return confData;
}
}
......@@ -21,12 +21,12 @@ package org.apache.pulsar.client.api;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
/**
* Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
* attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
......@@ -44,37 +44,13 @@ public class ConsumerConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
@JsonIgnore
private MessageListener messageListener;
@JsonIgnore
private ConsumerEventListener consumerEventListener;
private int receiverQueueSize = 1000;
private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
private String consumerName = null;
private long ackTimeoutMillis = 0;
private int priorityLevel = 0;
@JsonIgnore
private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
private final Map<String, String> properties = new HashMap<>();
private boolean readCompacted = false;
private final ConsumerConfigurationData conf = new ConsumerConfigurationData();
/**
* @return the configured timeout in milliseconds for unacked messages.
*/
public long getAckTimeoutMillis() {
return ackTimeoutMillis;
return conf.getAckTimeoutMillis();
}
/**
......@@ -91,7 +67,7 @@ public class ConsumerConfiguration implements Serializable {
long ackTimeoutMillis = timeUnit.toMillis(ackTimeout);
checkArgument(ackTimeoutMillis >= minAckTimeoutMillis,
"Ack timeout should be should be greater than " + minAckTimeoutMillis + " ms");
this.ackTimeoutMillis = ackTimeoutMillis;
conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
return this;
}
......@@ -99,7 +75,7 @@ public class ConsumerConfiguration implements Serializable {
* @return the configured subscription type
*/
public SubscriptionType getSubscriptionType() {
return this.subscriptionType;
return conf.getSubscriptionType();
}
/**
......@@ -112,7 +88,7 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setSubscriptionType(SubscriptionType subscriptionType) {
checkNotNull(subscriptionType);
this.subscriptionType = subscriptionType;
conf.setSubscriptionType(subscriptionType);
return this;
}
......@@ -120,7 +96,7 @@ public class ConsumerConfiguration implements Serializable {
* @return the configured {@link MessageListener} for the consumer
*/
public MessageListener getMessageListener() {
return this.messageListener;
return conf.getMessageListener();
}
/**
......@@ -134,7 +110,7 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setMessageListener(MessageListener messageListener) {
checkNotNull(messageListener);
this.messageListener = messageListener;
conf.setMessageListener(messageListener);
return this;
}
......@@ -144,24 +120,27 @@ public class ConsumerConfiguration implements Serializable {
* @since 2.0
*/
public ConsumerEventListener getConsumerEventListener() {
return this.consumerEventListener;
return conf.getConsumerEventListener();
}
/**
* Sets a {@link ConsumerEventListener} for the consumer.
*
* <p>The consumer group listener is used for receiving consumer state change in a consumer group for failover
* <p>
* The consumer group listener is used for receiving consumer state change in a consumer group for failover
* subscription. Application can then react to the consumer state changes.
*
* <p>This change is experimental. It is subject to changes coming in release 2.0.
* <p>
* This change is experimental. It is subject to changes coming in release 2.0.
*
* @param listener the consumer group listener object
* @param listener
* the consumer group listener object
* @return consumer configuration
* @since 2.0
*/
public ConsumerConfiguration setConsumerEventListener(ConsumerEventListener listener) {
checkNotNull(listener);
this.consumerEventListener = listener;
conf.setConsumerEventListener(listener);
return this;
}
......@@ -169,15 +148,14 @@ public class ConsumerConfiguration implements Serializable {
* @return the configure receiver queue size value
*/
public int getReceiverQueueSize() {
return this.receiverQueueSize;
return conf.getReceiverQueueSize();
}
/**
* @return the configured max total receiver queue size across partitions
*/
public int getMaxTotalReceiverQueueSizeAcrossPartitions() {
return maxTotalReceiverQueueSizeAcrossPartitions;
return conf.getMaxTotalReceiverQueueSizeAcrossPartitions();
}
/**
......@@ -189,15 +167,15 @@ public class ConsumerConfiguration implements Serializable {
* @param maxTotalReceiverQueueSizeAcrossPartitions
*/
public void setMaxTotalReceiverQueueSizeAcrossPartitions(int maxTotalReceiverQueueSizeAcrossPartitions) {
checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= receiverQueueSize);
this.maxTotalReceiverQueueSizeAcrossPartitions = maxTotalReceiverQueueSizeAcrossPartitions;
checkArgument(maxTotalReceiverQueueSizeAcrossPartitions >= conf.getReceiverQueueSize());
conf.setMaxTotalReceiverQueueSizeAcrossPartitions(maxTotalReceiverQueueSizeAcrossPartitions);
}
/**
* @return the CryptoKeyReader
*/
public CryptoKeyReader getCryptoKeyReader() {
return this.cryptoKeyReader;
return conf.getCryptoKeyReader();
}
/**
......@@ -208,24 +186,25 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
checkNotNull(cryptoKeyReader);
this.cryptoKeyReader = cryptoKeyReader;
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
/**
* Sets the ConsumerCryptoFailureAction to the value specified
*
* @param The consumer action
* @param The
* consumer action
*/
public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
cryptoFailureAction = action;
conf.setCryptoFailureAction(action);
}
/**
* @return The ConsumerCryptoFailureAction
*/
public ConsumerCryptoFailureAction getCryptoFailureAction() {
return this.cryptoFailureAction;
return conf.getCryptoFailureAction();
}
/**
......@@ -256,7 +235,7 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setReceiverQueueSize(int receiverQueueSize) {
checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
this.receiverQueueSize = receiverQueueSize;
conf.setReceiverQueueSize(receiverQueueSize);
return this;
}
......@@ -264,7 +243,7 @@ public class ConsumerConfiguration implements Serializable {
* @return the consumer name
*/
public String getConsumerName() {
return consumerName;
return conf.getConsumerName();
}
/**
......@@ -274,12 +253,12 @@ public class ConsumerConfiguration implements Serializable {
*/
public ConsumerConfiguration setConsumerName(String consumerName) {
checkArgument(consumerName != null && !consumerName.equals(""));
this.consumerName = consumerName;
conf.setConsumerName(consumerName);
return this;
}
public int getPriorityLevel() {
return priorityLevel;
return conf.getPriorityLevel();
}
/**
......@@ -303,32 +282,34 @@ public class ConsumerConfiguration implements Serializable {
* @param priorityLevel
*/
public void setPriorityLevel(int priorityLevel) {
this.priorityLevel = priorityLevel;
conf.setPriorityLevel(priorityLevel);
}
public boolean getReadCompacted() {
return readCompacted;
return conf.isReadCompacted();
}
/**
* If enabled, the consumer will read messages from the compacted topic rather than reading the full message
* backlog of the topic. This means that, if the topic has been compacted, the consumer will only see the latest
* value for each key in the topic, up until the point in the topic message backlog that has been compacted.
* Beyond that point, the messages will be sent as normal.
* If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
* of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
* each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
* point, the messages will be sent as normal.
*
* readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer
* (i.e. failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent
* topics or on a shared subscription, will lead to the subscription call throwing a PulsarClientException.
* readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
* failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
* shared subscription, will lead to the subscription call throwing a PulsarClientException.
*
* @param readCompacted whether to read from the compacted topic
* @param readCompacted
* whether to read from the compacted topic
*/
public ConsumerConfiguration setReadCompacted(boolean readCompacted) {
this.readCompacted = readCompacted;
conf.setReadCompacted(readCompacted);
return this;
}
/**
* Set a name/value property with this consumer.
*
* @param key
* @param value
* @return
......@@ -336,23 +317,26 @@ public class ConsumerConfiguration implements Serializable {
public ConsumerConfiguration setProperty(String key, String value) {
checkArgument(key != null);
checkArgument(value != null);
properties.put(key, value);
conf.getProperties().put(key, value);
return this;
}
/**
* Add all the properties in the provided map
*
* @param properties
* @return
*/
public ConsumerConfiguration setProperties(Map<String, String> properties) {
if (properties != null) {
this.properties.putAll(properties);
}
conf.getProperties().putAll(properties);
return this;
}
public Map<String, String> getProperties() {
return properties;
return conf.getProperties();
}
public ConsumerConfigurationData getConfigurationData() {
return conf;
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
/**
* Standard hashing functions available when choosing the partition to use for a particular message.
*/
public enum HashingScheme {
/**
* Use regural <code>String.hashCode()</code>
*/
JavaStringHash,
/**
* Use Murmur3 hashing function.
* <a href="https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a>
*/
Murmur3_32Hash
}
......@@ -18,6 +18,25 @@
*/
package org.apache.pulsar.client.api;
/**
* Default routing mode for messages to partition.
*
* This logic is applied when the application is not setting a key {@link MessageBuilder#setKey(String)} on a particular
* message.
*/
public enum MessageRoutingMode {
SinglePartition, RoundRobinPartition, CustomPartition
/**
* The producer will chose one single partition and publish all the messages into that partition.
*/
SinglePartition,
/**
* Publish messages across all partitions in round-robin.
*/
RoundRobinPartition,
/**
* Use custom message router implemenation that will be called to determine the partition for a particular message.
*/
CustomPartition
}
......@@ -144,12 +144,34 @@ public interface ProducerBuilder extends Serializable, Cloneable {
ProducerBuilder blockIfQueueFull(boolean blockIfQueueFull);
/**
* Set the message routing mode for the partitioned producer
* Set the message routing mode for the partitioned producer.
*
* @param mode
* @return
* Default routing mode for messages to partition.
*
* This logic is applied when the application is not setting a key {@link MessageBuilder#setKey(String)} on a
* particular message.
*
* @param messageRoutingMode
* the message routing mode
*/
ProducerBuilder messageRoutingMode(MessageRoutingMode messageRoutingMode);
/**
* Change the {@link HashingScheme} used to chose the partition on where to publish a particular message.
*
* Standard hashing functions available are:
* <ul>
* <li><code>JavaStringHash</code>: Java <code>String.hashCode()</code>
* <li><code>Murmur3_32Hash</code>: Use Murmur3 hashing function.
* <a href="https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash</a>
* </ul>
*
* Default is <code>JavaStringHash</code>.
*
* @param hashingScheme
* the chosen {@link HashingScheme}
*/
ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode);
ProducerBuilder hashingScheme(HashingScheme hashingScheme);
/**
* Set the compression type for the producer.
......
......@@ -21,17 +21,16 @@ package org.apache.pulsar.client.api;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import com.google.common.base.Objects;
import lombok.EqualsAndHashCode;
/**
* Producer's configuration
......@@ -39,51 +38,27 @@ import com.google.common.base.Objects;
* @deprecated use {@link PulsarClient#newProducer()} to construct and configure a {@link Producer} instance
*/
@Deprecated
@EqualsAndHashCode
public class ProducerConfiguration implements Serializable {
private static final long serialVersionUID = 1L;
private String producerName = null;
private long sendTimeoutMs = 30000;
private boolean blockIfQueueFull = false;
private int maxPendingMessages = 1000;
private int maxPendingMessagesAcrossPartitions = 50000;
private MessageRoutingMode messageRouteMode = MessageRoutingMode.SinglePartition;
private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
@JsonIgnore
private MessageRouter customMessageRouter = null;
private long batchingMaxPublishDelayMs = 10;
private int batchingMaxMessages = 1000;
private boolean batchingEnabled = false; // disabled by default
@JsonIgnore
private CryptoKeyReader cryptoKeyReader;
@JsonIgnore
private ConcurrentOpenHashSet<String> encryptionKeys;
private CompressionType compressionType = CompressionType.NONE;
// Cannot use Optional<Long> since it's not serializable
private Long initialSequenceId = null;
private final Map<String, String> properties = new HashMap<>();
private final ProducerConfigurationData conf = new ProducerConfigurationData();
public enum MessageRoutingMode {
SinglePartition, RoundRobinPartition, CustomPartition
}
public enum HashingScheme {
JavaStringHash,
Murmur3_32Hash
JavaStringHash, Murmur3_32Hash
}
private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
/**
* @return the configured custom producer name or null if no custom name was specified
* @since 1.20.0
*/
public String getProducerName() {
return producerName;
return conf.getProducerName();
}
/**
......@@ -103,14 +78,14 @@ public class ProducerConfiguration implements Serializable {
* @since 1.20.0
*/
public void setProducerName(String producerName) {
this.producerName = producerName;
conf.setProducerName(producerName);
}
/**
* @return the message send timeout in ms
*/
public long getSendTimeoutMs() {
return sendTimeoutMs;
return conf.getSendTimeoutMs();
}
/**
......@@ -125,7 +100,7 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setSendTimeout(int sendTimeout, TimeUnit unit) {
checkArgument(sendTimeout >= 0);
this.sendTimeoutMs = unit.toMillis(sendTimeout);
conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
return this;
}
......@@ -133,7 +108,7 @@ public class ProducerConfiguration implements Serializable {
* @return the maximum number of messages allowed in the outstanding messages queue for the producer
*/
public int getMaxPendingMessages() {
return maxPendingMessages;
return conf.getMaxPendingMessages();
}
/**
......@@ -147,16 +122,16 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setMaxPendingMessages(int maxPendingMessages) {
checkArgument(maxPendingMessages > 0);
this.maxPendingMessages = maxPendingMessages;
conf.setMaxPendingMessages(maxPendingMessages);
return this;
}
public HashingScheme getHashingScheme() {
return hashingScheme;
return HashingScheme.valueOf(conf.getHashingScheme().toString());
}
public ProducerConfiguration setHashingScheme(HashingScheme hashingScheme) {
this.hashingScheme = hashingScheme;
conf.setHashingScheme(org.apache.pulsar.client.api.HashingScheme.valueOf(hashingScheme.toString()));
return this;
}
......@@ -165,7 +140,7 @@ public class ProducerConfiguration implements Serializable {
* @return the maximum number of pending messages allowed across all the partitions
*/
public int getMaxPendingMessagesAcrossPartitions() {
return maxPendingMessagesAcrossPartitions;
return conf.getMaxPendingMessagesAcrossPartitions();
}
/**
......@@ -177,8 +152,8 @@ public class ProducerConfiguration implements Serializable {
* @param maxPendingMessagesAcrossPartitions
*/
public void setMaxPendingMessagesAcrossPartitions(int maxPendingMessagesAcrossPartitions) {
checkArgument(maxPendingMessagesAcrossPartitions >= maxPendingMessages);
this.maxPendingMessagesAcrossPartitions = maxPendingMessagesAcrossPartitions;
checkArgument(maxPendingMessagesAcrossPartitions >= conf.getMaxPendingMessages());
conf.setMaxPendingMessagesAcrossPartitions(maxPendingMessagesAcrossPartitions);
}
/**
......@@ -187,7 +162,7 @@ public class ProducerConfiguration implements Serializable {
* pending queue is full
*/
public boolean getBlockIfQueueFull() {
return blockIfQueueFull;
return conf.isBlockIfQueueFull();
}
/**
......@@ -202,7 +177,7 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setBlockIfQueueFull(boolean blockIfQueueFull) {
this.blockIfQueueFull = blockIfQueueFull;
conf.setBlockIfQueueFull(blockIfQueueFull);
return this;
}
......@@ -214,7 +189,8 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setMessageRoutingMode(MessageRoutingMode messageRouteMode) {
checkNotNull(messageRouteMode);
this.messageRouteMode = messageRouteMode;
conf.setMessageRoutingMode(
org.apache.pulsar.client.api.MessageRoutingMode.valueOf(messageRouteMode.toString()));
return this;
}
......@@ -224,7 +200,7 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public MessageRoutingMode getMessageRoutingMode() {
return messageRouteMode;
return MessageRoutingMode.valueOf(conf.getMessageRoutingMode().toString());
}
/**
......@@ -244,7 +220,7 @@ public class ProducerConfiguration implements Serializable {
* compress messages.
*/
public ProducerConfiguration setCompressionType(CompressionType compressionType) {
this.compressionType = compressionType;
conf.setCompressionType(compressionType);
return this;
}
......@@ -252,7 +228,7 @@ public class ProducerConfiguration implements Serializable {
* @return the configured compression type for this producer
*/
public CompressionType getCompressionType() {
return compressionType;
return conf.getCompressionType();
}
/**
......@@ -264,7 +240,7 @@ public class ProducerConfiguration implements Serializable {
public ProducerConfiguration setMessageRouter(MessageRouter messageRouter) {
checkNotNull(messageRouter);
setMessageRoutingMode(MessageRoutingMode.CustomPartition);
customMessageRouter = messageRouter;
conf.setCustomMessageRouter(messageRouter);
return this;
}
......@@ -278,7 +254,7 @@ public class ProducerConfiguration implements Serializable {
*/
@Deprecated
public MessageRouter getMessageRouter(int numPartitions) {
return customMessageRouter;
return conf.getCustomMessageRouter();
}
/**
......@@ -287,7 +263,7 @@ public class ProducerConfiguration implements Serializable {
* @return message router set by {@link #setMessageRouter(MessageRouter)}.
*/
public MessageRouter getMessageRouter() {
return customMessageRouter;
return conf.getCustomMessageRouter();
}
/**
......@@ -295,7 +271,7 @@ public class ProducerConfiguration implements Serializable {
*/
public boolean getBatchingEnabled() {
return batchingEnabled;
return conf.isBatchingEnabled();
}
/**
......@@ -315,7 +291,7 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setBatchingEnabled(boolean batchMessagesEnabled) {
this.batchingEnabled = batchMessagesEnabled;
conf.setBatchingEnabled(batchMessagesEnabled);
return this;
}
......@@ -323,7 +299,7 @@ public class ProducerConfiguration implements Serializable {
* @return the CryptoKeyReader
*/
public CryptoKeyReader getCryptoKeyReader() {
return this.cryptoKeyReader;
return conf.getCryptoKeyReader();
}
/**
......@@ -334,7 +310,7 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
checkNotNull(cryptoKeyReader);
this.cryptoKeyReader = cryptoKeyReader;
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
......@@ -343,8 +319,8 @@ public class ProducerConfiguration implements Serializable {
* @return encryptionKeys
*
*/
public ConcurrentOpenHashSet<String> getEncryptionKeys() {
return this.encryptionKeys;
public Set<String> getEncryptionKeys() {
return conf.getEncryptionKeys();
}
/**
......@@ -353,7 +329,7 @@ public class ProducerConfiguration implements Serializable {
*
*/
public boolean isEncryptionEnabled() {
return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty() && (this.cryptoKeyReader != null);
return conf.isEncryptionEnabled();
}
/**
......@@ -366,16 +342,11 @@ public class ProducerConfiguration implements Serializable {
*
*/
public void addEncryptionKey(String key) {
if (this.encryptionKeys == null) {
this.encryptionKeys = new ConcurrentOpenHashSet<String>(16, 1);
}
this.encryptionKeys.add(key);
conf.getEncryptionKeys().add(key);
}
public void removeEncryptionKey(String key) {
if (this.encryptionKeys != null) {
this.encryptionKeys.remove(key);
}
conf.getEncryptionKeys().remove(key);
}
/**
......@@ -385,14 +356,14 @@ public class ProducerConfiguration implements Serializable {
* The producer action
*/
public void setCryptoFailureAction(ProducerCryptoFailureAction action) {
cryptoFailureAction = action;
conf.setCryptoFailureAction(action);
}
/**
* @return The ProducerCryptoFailureAction
*/
public ProducerCryptoFailureAction getCryptoFailureAction() {
return this.cryptoFailureAction;
return conf.getCryptoFailureAction();
}
/**
......@@ -401,7 +372,7 @@ public class ProducerConfiguration implements Serializable {
* @see ProducerConfiguration#setBatchingMaxPublishDelay(long, TimeUnit)
*/
public long getBatchingMaxPublishDelayMs() {
return batchingMaxPublishDelayMs;
return TimeUnit.MICROSECONDS.toMillis(conf.getBatchingMaxPublishDelayMicros());
}
/**
......@@ -423,7 +394,7 @@ public class ProducerConfiguration implements Serializable {
public ProducerConfiguration setBatchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
long delayInMs = timeUnit.toMillis(batchDelay);
checkArgument(delayInMs >= 1, "configured value for batch delay must be at least 1ms");
this.batchingMaxPublishDelayMs = delayInMs;
conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
return this;
}
......@@ -432,7 +403,7 @@ public class ProducerConfiguration implements Serializable {
* @return the maximum number of messages permitted in a batch.
*/
public int getBatchingMaxMessages() {
return batchingMaxMessages;
return conf.getBatchingMaxMessages();
}
/**
......@@ -448,12 +419,12 @@ public class ProducerConfiguration implements Serializable {
*/
public ProducerConfiguration setBatchingMaxMessages(int batchMessagesMaxMessagesPerBatch) {
checkArgument(batchMessagesMaxMessagesPerBatch > 0);
this.batchingMaxMessages = batchMessagesMaxMessagesPerBatch;
conf.setBatchingMaxMessages(batchMessagesMaxMessagesPerBatch);
return this;
}
public Optional<Long> getInitialSequenceId() {
return initialSequenceId != null ? Optional.of(initialSequenceId) : Optional.empty();
return Optional.ofNullable(conf.getInitialSequenceId());
}
/**
......@@ -466,7 +437,7 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setInitialSequenceId(long initialSequenceId) {
this.initialSequenceId = initialSequenceId;
conf.setInitialSequenceId(initialSequenceId);
return this;
}
......@@ -480,7 +451,7 @@ public class ProducerConfiguration implements Serializable {
public ProducerConfiguration setProperty(String key, String value) {
checkArgument(key != null);
checkArgument(value != null);
properties.put(key, value);
conf.getProperties().put(key, value);
return this;
}
......@@ -491,26 +462,15 @@ public class ProducerConfiguration implements Serializable {
* @return
*/
public ProducerConfiguration setProperties(Map<String, String> properties) {
if (properties != null) {
this.properties.putAll(properties);
}
conf.getProperties().putAll(properties);
return this;
}
public Map<String, String> getProperties() {
return properties;
return conf.getProperties();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof ProducerConfiguration) {
ProducerConfiguration other = (ProducerConfiguration) obj;
return Objects.equal(this.sendTimeoutMs, other.sendTimeoutMs)
&& Objects.equal(maxPendingMessages, other.maxPendingMessages)
&& Objects.equal(this.messageRouteMode, other.messageRouteMode)
&& Objects.equal(this.hashingScheme, other.hashingScheme);
}
return false;
public ProducerConfigurationData getProducerConfigurationData() {
return conf;
}
}
......@@ -23,6 +23,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import java.io.Serializable;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
/**
*
* @deprecated Use {@link PulsarClient#newReader()} to construct and configure a {@link Reader} instance
......@@ -30,20 +32,13 @@ import java.io.Serializable;
@Deprecated
public class ReaderConfiguration implements Serializable {
private int receiverQueueSize = 1000;
private ReaderListener readerListener;
private String readerName = null;
private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
private final ReaderConfigurationData conf = new ReaderConfigurationData();
/**
* @return the configured {@link ReaderListener} for the reader
*/
public ReaderListener getReaderListener() {
return this.readerListener;
return conf.getReaderListener();
}
/**
......@@ -57,7 +52,7 @@ public class ReaderConfiguration implements Serializable {
*/
public ReaderConfiguration setReaderListener(ReaderListener readerListener) {
checkNotNull(readerListener);
this.readerListener = readerListener;
conf.setReaderListener(readerListener);
return this;
}
......@@ -65,14 +60,14 @@ public class ReaderConfiguration implements Serializable {
* @return the configure receiver queue size value
*/
public int getReceiverQueueSize() {
return this.receiverQueueSize;
return conf.getReceiverQueueSize();
}
/**
* @return the CryptoKeyReader
*/
public CryptoKeyReader getCryptoKeyReader() {
return this.cryptoKeyReader;
return conf.getCryptoKeyReader();
}
/**
......@@ -83,7 +78,7 @@ public class ReaderConfiguration implements Serializable {
*/
public ReaderConfiguration setCryptoKeyReader(CryptoKeyReader cryptoKeyReader) {
checkNotNull(cryptoKeyReader);
this.cryptoKeyReader = cryptoKeyReader;
conf.setCryptoKeyReader(cryptoKeyReader);
return this;
}
......@@ -94,14 +89,14 @@ public class ReaderConfiguration implements Serializable {
* The action to take when the decoding fails
*/
public void setCryptoFailureAction(ConsumerCryptoFailureAction action) {
cryptoFailureAction = action;
conf.setCryptoFailureAction(action);
}
/**
* @return The ConsumerCryptoFailureAction
*/
public ConsumerCryptoFailureAction getCryptoFailureAction() {
return this.cryptoFailureAction;
return conf.getCryptoFailureAction();
}
/**
......@@ -118,7 +113,7 @@ public class ReaderConfiguration implements Serializable {
*/
public ReaderConfiguration setReceiverQueueSize(int receiverQueueSize) {
checkArgument(receiverQueueSize >= 0, "Receiver queue size cannot be negative");
this.receiverQueueSize = receiverQueueSize;
conf.setReceiverQueueSize(receiverQueueSize);
return this;
}
......@@ -126,7 +121,7 @@ public class ReaderConfiguration implements Serializable {
* @return the consumer name
*/
public String getReaderName() {
return readerName;
return conf.getReaderName();
}
/**
......@@ -136,9 +131,13 @@ public class ReaderConfiguration implements Serializable {
*/
public ReaderConfiguration setReaderName(String readerName) {
checkArgument(readerName != null && !readerName.equals(""));
this.readerName = readerName;
conf.setReaderName(readerName);
return this;
}
public ReaderConfigurationData getReaderConfigurationData() {
return conf;
}
private static final long serialVersionUID = 1L;
}
......@@ -36,7 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.resolver.InetSocketAddressResolver;
public class BinaryProtoLookupService implements LookupService {
......
......@@ -22,41 +22,44 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
@SuppressWarnings("deprecation")
public class ClientBuilderImpl implements ClientBuilder {
private static final long serialVersionUID = 1L;
String serviceUrl;
final ClientConfiguration conf = new ClientConfiguration();
final ClientConfigurationData conf;
public ClientBuilderImpl() {
this(new ClientConfigurationData());
}
private ClientBuilderImpl(ClientConfigurationData conf) {
this.conf = conf;
}
@Override
public PulsarClient build() throws PulsarClientException {
if (serviceUrl == null) {
if (conf.getServiceUrl() == null) {
throw new IllegalArgumentException("service URL needs to be specified on the ClientBuilder object");
}
return new PulsarClientImpl(serviceUrl, conf);
return new PulsarClientImpl(conf);
}
@Override
public ClientBuilder clone() {
try {
return (ClientBuilder) super.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ClientBuilderImpl");
}
return new ClientBuilderImpl(conf.clone());
}
@Override
public ClientBuilder serviceUrl(String serviceUrl) {
this.serviceUrl = serviceUrl;
conf.setServiceUrl(serviceUrl);
return this;
}
......@@ -69,32 +72,32 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
conf.setAuthentication(authPluginClassName, authParamsString);
conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParamsString));
return this;
}
@Override
public ClientBuilder authentication(String authPluginClassName, Map<String, String> authParams)
throws UnsupportedAuthenticationException {
conf.setAuthentication(authPluginClassName, authParams);
conf.setAuthentication(AuthenticationFactory.create(authPluginClassName, authParams));
return this;
}
@Override
public ClientBuilder operationTimeout(int operationTimeout, TimeUnit unit) {
conf.setOperationTimeout(operationTimeout, unit);
conf.setOperationTimeoutMs(unit.toMillis(operationTimeout));
return this;
}
@Override
public ClientBuilder ioThreads(int numIoThreads) {
conf.setIoThreads(numIoThreads);
conf.setNumIoThreads(numIoThreads);
return this;
}
@Override
public ClientBuilder listenerThreads(int numListenerThreads) {
conf.setListenerThreads(numListenerThreads);
conf.setNumListenerThreads(numListenerThreads);
return this;
}
......@@ -136,7 +139,7 @@ public class ClientBuilderImpl implements ClientBuilder {
@Override
public ClientBuilder statsInterval(long statsInterval, TimeUnit unit) {
conf.setStatsInterval(statsInterval, unit);
conf.setStatsIntervalSeconds(unit.toSeconds(statsInterval));
return this;
}
......
......@@ -33,11 +33,12 @@ import javax.net.ssl.SSLSession;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.TimeoutException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange;
......@@ -57,7 +58,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -106,7 +106,7 @@ public class ClientCnx extends PulsarHandler {
None, SentConnectFrame, Ready, Failed
}
public ClientCnx(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
super(30, TimeUnit.SECONDS);
this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), true);
this.authentication = conf.getAuthentication();
......@@ -196,14 +196,14 @@ public class ClientCnx extends PulsarHandler {
@Override
protected void handleConnected(CommandConnected connected) {
if (isTlsHostnameVerificationEnable && remoteHostName != null && !verifyTlsHostName(remoteHostName, ctx)) {
// close the connection if host-verification failed with the broker
log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
ctx.close();
return;
}
checkArgument(state == State.SentConnectFrame);
if (log.isDebugEnabled()) {
......@@ -593,9 +593,9 @@ public class ClientCnx extends PulsarHandler {
/**
* verifies host name provided in x509 Certificate in tls session
*
*
* it matches hostname with below scenarios
*
*
* <pre>
* 1. Supports IPV4 and IPV6 host matching
* 2. Supports wild card matching for DNS-name
......@@ -605,7 +605,7 @@ public class ClientCnx extends PulsarHandler {
* 2. localhost local* PASS
* 3. pulsar1-broker.com pulsar*.com PASS
* </pre>
*
*
* @param ctx
* @return true if hostname is verified else return false
*/
......@@ -648,7 +648,7 @@ public class ClientCnx extends PulsarHandler {
void setRemoteHostName(String remoteHostName) {
this.remoteHostName = remoteHostName;
}
private PulsarClientException getPulsarClientException(ServerError error, String errorMsg) {
switch (error) {
case AuthenticationError:
......
......@@ -31,8 +31,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
......@@ -68,7 +68,7 @@ public class ConnectionPool implements Closeable {
private static final int MaxMessageSize = 5 * 1024 * 1024;
public static final String TLS_HANDLER = "tls";
public ConnectionPool(ClientConfiguration conf, EventLoopGroup eventLoopGroup) {
public ConnectionPool(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup;
this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
......
......@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.client.impl;
import com.google.common.collect.Queues;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
......@@ -28,20 +27,23 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import com.google.common.collect.Queues;
public abstract class ConsumerBase extends HandlerBase implements Consumer {
enum ConsumerType {
......@@ -49,7 +51,7 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
}
protected final String subscription;
protected final ConsumerConfiguration conf;
protected final ConsumerConfigurationData conf;
protected final String consumerName;
protected final CompletableFuture<Consumer> subscribeFuture;
protected final MessageListener listener;
......@@ -59,11 +61,11 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
protected final ConcurrentLinkedQueue<CompletableFuture<Message>> pendingReceives;
protected int maxReceiverQueueSize;
protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0 , TimeUnit.MILLISECONDS));
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, int receiverQueueSize,
ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, 0, TimeUnit.MILLISECONDS));
this.maxReceiverQueueSize = receiverQueueSize;
this.subscription = subscription;
this.subscription = conf.getSubscriptionName();
this.conf = conf;
this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
this.subscribeFuture = subscribeFuture;
......@@ -93,9 +95,13 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
case Closing:
case Closed:
throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
case Terminated:
throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
case Failed:
case Uninitialized:
throw new PulsarClientException.NotConnectedException();
default:
break;
}
return internalReceive();
......@@ -116,6 +122,8 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
case Closing:
case Closed:
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer already closed"));
case Terminated:
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Topic was terminated"));
case Failed:
case Uninitialized:
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
......@@ -146,6 +154,8 @@ public abstract class ConsumerBase extends HandlerBase implements Consumer {
case Closing:
case Closed:
throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
case Terminated:
throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
case Failed:
case Uninitialized:
throw new PulsarClientException.NotConnectedException();
......
......@@ -20,48 +20,45 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
import com.google.common.collect.Lists;
@SuppressWarnings("deprecation")
public class ConsumerBuilderImpl implements ConsumerBuilder {
private static final long serialVersionUID = 1L;
private final PulsarClientImpl client;
private String subscriptionName;
private final ConsumerConfiguration conf;
private Set<String> topicNames;
private final ConsumerConfigurationData conf;
private static long MIN_ACK_TIMEOUT_MILLIS = 1000;
ConsumerBuilderImpl(PulsarClientImpl client) {
this(client, new ConsumerConfigurationData());
}
private ConsumerBuilderImpl(PulsarClientImpl client, ConsumerConfigurationData conf) {
this.client = client;
this.conf = new ConsumerConfiguration();
this.conf = conf;
}
@Override
public ConsumerBuilder clone() {
try {
return (ConsumerBuilder) super.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ConsumerBuilderImpl");
}
return new ConsumerBuilderImpl(client, conf.clone());
}
@Override
......@@ -83,55 +80,45 @@ public class ConsumerBuilderImpl implements ConsumerBuilder {
@Override
public CompletableFuture<Consumer> subscribeAsync() {
if (topicNames == null || topicNames.isEmpty()) {
if (conf.getTopicNames().isEmpty()) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder"));
}
if (subscriptionName == null) {
if (conf.getSubscriptionName() == null) {
return FutureUtil.failedFuture(
new IllegalArgumentException("Subscription name must be set on the consumer builder"));
}
if (topicNames.size() == 1) {
return client.subscribeAsync(topicNames.stream().findFirst().orElse(""), subscriptionName, conf);
} else {
return client.subscribeAsync(topicNames, subscriptionName, conf);
}
return client.subscribeAsync(conf);
}
@Override
public ConsumerBuilder topic(String... topicNames) {
checkArgument(topicNames.length > 0, "Passed in topicNames should not be empty.");
if (this.topicNames == null) {
this.topicNames = Sets.newHashSet(topicNames);
} else {
this.topicNames.addAll(Lists.newArrayList(topicNames));
}
conf.getTopicNames().addAll(Lists.newArrayList(topicNames));
return this;
}
@Override
public ConsumerBuilder topics(List<String> topicNames) {
checkArgument(topicNames != null && !topicNames.isEmpty(),
"Passed in topicNames list should not be empty.");
if (this.topicNames == null) {
this.topicNames = Sets.newHashSet();
}
this.topicNames.addAll(topicNames);
checkArgument(topicNames != null && !topicNames.isEmpty(), "Passed in topicNames list should not be empty.");
conf.getTopicNames().addAll(topicNames);
return this;
}
@Override
public ConsumerBuilder subscriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
conf.setSubscriptionName(subscriptionName);
return this;
}
@Override
public ConsumerBuilder ackTimeout(long ackTimeout, TimeUnit timeUnit) {
conf.setAckTimeout(ackTimeout, timeUnit);
checkArgument(timeUnit.toMillis(ackTimeout) >= MIN_ACK_TIMEOUT_MILLIS,
"Ack timeout should be should be greater than " + MIN_ACK_TIMEOUT_MILLIS + " ms");
conf.setAckTimeoutMillis(timeUnit.toMillis(ackTimeout));
return this;
}
......@@ -179,13 +166,13 @@ public class ConsumerBuilderImpl implements ConsumerBuilder {
@Override
public ConsumerBuilder property(String key, String value) {
conf.setProperty(key, value);
conf.getProperties().put(key, value);
return this;
}
@Override
public ConsumerBuilder properties(Map<String, String> properties) {
conf.setProperties(properties);
conf.getProperties().putAll(properties);
return this;
}
......
......@@ -25,11 +25,6 @@ import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
import static org.apache.pulsar.common.api.Commands.readChecksum;
import com.google.common.collect.Iterables;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
......@@ -52,13 +47,14 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
......@@ -74,6 +70,13 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterables;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timeout;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
public class ConsumerImpl extends ConsumerBase {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
......@@ -83,6 +86,7 @@ public class ConsumerImpl extends ConsumerBase {
// broker to notify that we are ready to get (and store in the incoming messages queue) more messages
private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(ConsumerImpl.class, "availablePermits");
@SuppressWarnings("unused")
private volatile int availablePermits = 0;
private MessageId lastDequeuedMessage = MessageId.earliest;
......@@ -125,16 +129,15 @@ public class ConsumerImpl extends ConsumerBase {
NonDurable
}
ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer> subscribeFuture) {
this(client, topic, subscription, conf, listenerExecutor, partitionIndex, subscribeFuture,
SubscriptionMode.Durable, null);
this(client, topic, conf, listenerExecutor, partitionIndex, subscribeFuture, SubscriptionMode.Durable, null);
}
ConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf,
ExecutorService listenerExecutor, int partitionIndex, CompletableFuture<Consumer> subscribeFuture,
SubscriptionMode subscriptionMode, MessageId startMessageId) {
super(client, topic, subscription, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture);
this.consumerId = client.newConsumerId();
this.subscriptionMode = subscriptionMode;
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
......@@ -145,7 +148,7 @@ public class ConsumerImpl extends ConsumerBase {
this.codecProvider = new CompressionCodecProvider();
this.priorityLevel = conf.getPriorityLevel();
this.batchMessageAckTracker = new ConcurrentSkipListMap<>();
this.readCompacted = conf.getReadCompacted();
this.readCompacted = conf.isReadCompacted();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStats(client, conf, this);
......
......@@ -24,8 +24,8 @@ import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -74,7 +74,7 @@ public class ConsumerStats implements Serializable {
throughputFormat = null;
}
public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfiguration conf, ConsumerImpl consumer) {
public ConsumerStats(PulsarClientImpl pulsarClient, ConsumerConfigurationData conf, ConsumerImpl consumer) {
this.pulsarClient = pulsarClient;
this.consumer = consumer;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
......@@ -92,7 +92,7 @@ public class ConsumerStats implements Serializable {
init(conf);
}
private void init(ConsumerConfiguration conf) {
private void init(ConsumerConfigurationData conf) {
ObjectMapper m = new ObjectMapper();
m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
......
......@@ -28,25 +28,31 @@ import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.ssl.SslContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
import org.asynchttpclient.*;
import org.asynchttpclient.AsyncCompletionHandler;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.asynchttpclient.channel.DefaultKeepAliveStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.ssl.SslContext;
public class HttpClient implements Closeable {
protected final static int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
......@@ -131,7 +137,7 @@ public class HttpClient implements Closeable {
builder.setHeader(header.getKey(), header.getValue());
}
}
final ListenableFuture<Response> responseFuture = builder.setHeader("Accept", "application/json")
.execute(new AsyncCompletionHandler<Response>() {
......
......@@ -23,8 +23,8 @@ import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
......@@ -40,9 +40,9 @@ class HttpLookupService implements LookupService {
private final boolean useTls;
private static final String BasePath = "lookup/v2/destination/";
public HttpLookupService(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
this.httpClient = new HttpClient(serviceUrl, conf.getAuthentication(),
this.httpClient = new HttpClient(conf.getServiceUrl(), conf.getAuthentication(),
eventLoopGroup, conf.isTlsAllowInsecureConnection(), conf.getTlsTrustCertsFilePath());
this.useTls = conf.isUseTls();
}
......
......@@ -37,6 +37,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
......@@ -57,7 +58,6 @@ import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
import org.apache.pulsar.common.api.proto.PulsarApi.EncryptionKeys;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.bouncycastle.asn1.ASN1ObjectIdentifier;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
import org.bouncycastle.asn1.x509.SubjectPublicKeyInfo;
......@@ -283,23 +283,21 @@ public class MessageCrypto {
* Encrypt data key using the public key(s) in the argument. <p> If more than one key name is specified, data key is
* encrypted using each of those keys. If the public key is expired or changed, application is responsible to remove
* the old key and add the new key <p>
*
*
* @param keyNames List of public keys to encrypt data key
*
*
* @param keyReader Implementation to read the key values
*
*
*/
public synchronized void addPublicKeyCipher(ConcurrentOpenHashSet<String> keyNames, CryptoKeyReader keyReader)
public synchronized void addPublicKeyCipher(Set<String> keyNames, CryptoKeyReader keyReader)
throws CryptoException {
// Generate data key
dataKey = keyGenerator.generateKey();
List<String> keyNameList = keyNames.values();
for (int i = 0; i < keyNameList.size(); i++) {
addPublicKeyCipher(keyNameList.get(i), keyReader);
for (String key : keyNames) {
addPublicKeyCipher(key, keyReader);
}
}
private void addPublicKeyCipher(String keyName, CryptoKeyReader keyReader) throws CryptoException {
......@@ -350,9 +348,9 @@ public class MessageCrypto {
/*
* Remove a key <p> Remove the key identified by the keyName from the list of keys.<p>
*
*
* @param keyName Unique name to identify the key
*
*
* @return true if succeeded, false otherwise
*/
/*
......@@ -368,16 +366,16 @@ public class MessageCrypto {
/*
* Encrypt the payload using the data key and update message metadata with the keyname & encrypted data key
*
*
* @param encKeys One or more public keys to encrypt data key
*
*
* @param msgMetadata Message Metadata
*
*
* @param payload Message which needs to be encrypted
*
*
* @return encryptedData if success
*/
public synchronized ByteBuf encrypt(ConcurrentOpenHashSet<String> encKeys, CryptoKeyReader keyReader,
public synchronized ByteBuf encrypt(Set<String> encKeys, CryptoKeyReader keyReader,
MessageMetadata.Builder msgMetadata, ByteBuf payload) throws PulsarClientException {
if (encKeys.isEmpty()) {
......@@ -385,9 +383,7 @@ public class MessageCrypto {
}
// Update message metadata with encrypted data key
List<String> keyNameList = encKeys.values();
for (int i = 0; i < keyNameList.size(); i++) {
String keyName = keyNameList.get(i);
for (String keyName : encKeys) {
if (encryptedDataKeyMap.get(keyName) == null) {
// Attempt to load the key. This will allow us to load keys as soon as
// a new key is added to producer config
......@@ -569,13 +565,13 @@ public class MessageCrypto {
/*
* Decrypt the payload using the data key. Keys used to encrypt data key can be retrieved from msgMetadata
*
*
* @param msgMetadata Message Metadata
*
*
* @param payload Message which needs to be decrypted
*
*
* @param keyReader KeyReader implementation to retrieve key value
*
*
* @return decryptedData if success, null otherwise
*/
public ByteBuf decrypt(MessageMetadata msgMetadata, ByteBuf payload, CryptoKeyReader keyReader) {
......
......@@ -18,20 +18,22 @@
*/
package org.apache.pulsar.client.impl;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.ProducerConfiguration;
public abstract class MessageRouterBase implements MessageRouter {
private static final long serialVersionUID = 1L;
protected final Hash hash;
MessageRouterBase(ProducerConfiguration.HashingScheme hashingScheme) {
MessageRouterBase(HashingScheme hashingScheme) {
switch (hashingScheme) {
case JavaStringHash:
this.hash = JavaStringHash.getInstance();
break;
case Murmur3_32Hash:
default:
this.hash = Murmur3_32Hash.getInstance();
case JavaStringHash:
this.hash = JavaStringHash.getInstance();
break;
case Murmur3_32Hash:
default:
this.hash = Murmur3_32Hash.getInstance();
}
}
}
......@@ -35,11 +35,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.FutureUtil;
......@@ -65,10 +65,10 @@ public class PartitionedConsumerImpl extends ConsumerBase {
private final ConsumerStats stats;
private final UnAckedMessageTracker unAckedMessageTracker;
PartitionedConsumerImpl(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
int numPartitions, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
super(client, topic, subscription, conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()), listenerExecutor,
subscribeFuture);
PartitionedConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, int numPartitions,
ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
super(client, conf.getSingleTopic(), conf, Math.max(Math.max(2, numPartitions), conf.getReceiverQueueSize()),
listenerExecutor, subscribeFuture);
this.consumers = Lists.newArrayListWithCapacity(numPartitions);
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
......@@ -89,10 +89,10 @@ public class PartitionedConsumerImpl extends ConsumerBase {
private void start() {
AtomicReference<Throwable> subscribeFail = new AtomicReference<Throwable>();
AtomicInteger completed = new AtomicInteger();
ConsumerConfiguration internalConfig = getInternalConsumerConfig();
ConsumerConfigurationData internalConfig = getInternalConsumerConfig();
for (int partitionIndex = 0; partitionIndex < numPartitions; partitionIndex++) {
String partitionName = DestinationName.get(topic).getPartition(partitionIndex).toString();
ConsumerImpl consumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
ConsumerImpl consumer = new ConsumerImpl(client, partitionName, internalConfig,
client.externalExecutorProvider().getExecutor(), partitionIndex, new CompletableFuture<Consumer>());
consumers.add(consumer);
consumer.subscribeFuture().handle((cons, subscribeException) -> {
......@@ -434,9 +434,10 @@ public class PartitionedConsumerImpl extends ConsumerBase {
return subscription;
}
private ConsumerConfiguration getInternalConsumerConfig() {
ConsumerConfiguration internalConsumerConfig = new ConsumerConfiguration();
private ConsumerConfigurationData getInternalConsumerConfig() {
ConsumerConfigurationData internalConsumerConfig = new ConsumerConfigurationData();
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
internalConsumerConfig.setSubscriptionName(conf.getSubscriptionName());
internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
internalConsumerConfig.setConsumerName(consumerName);
if (null != conf.getConsumerEventListener()) {
......@@ -451,7 +452,7 @@ public class PartitionedConsumerImpl extends ConsumerBase {
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
}
if (conf.getAckTimeoutMillis() != 0) {
internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
}
return internalConsumerConfig;
......
......@@ -30,11 +30,11 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
......@@ -49,7 +49,7 @@ public class PartitionedProducerImpl extends ProducerBase {
private final ProducerStats stats;
private final TopicMetadata topicMetadata;
public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf, int numPartitions,
public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions,
CompletableFuture<Producer> producerCreatedFuture) {
super(client, topic, conf, producerCreatedFuture);
this.producers = Lists.newArrayListWithCapacity(numPartitions);
......@@ -67,7 +67,7 @@ public class PartitionedProducerImpl extends ProducerBase {
MessageRouter messageRouter;
MessageRoutingMode messageRouteMode = conf.getMessageRoutingMode();
MessageRouter customMessageRouter = conf.getMessageRouter();
MessageRouter customMessageRouter = conf.getCustomMessageRouter();
switch (messageRouteMode) {
case CustomPartition:
......
......@@ -26,17 +26,18 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
public abstract class ProducerBase extends HandlerBase implements Producer {
protected final CompletableFuture<Producer> producerCreatedFuture;
protected final ProducerConfiguration conf;
protected final ProducerConfigurationData conf;
protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfiguration conf,
protected ProducerBase(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture<Producer> producerCreatedFuture) {
super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS, Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS));
super(client, topic, new Backoff(100, TimeUnit.MILLISECONDS, 60, TimeUnit.SECONDS,
Math.max(100, conf.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS));
this.producerCreatedFuture = producerCreatedFuture;
this.conf = conf;
}
......@@ -98,7 +99,7 @@ public abstract class ProducerBase extends HandlerBase implements Producer {
return topic;
}
public ProducerConfiguration getConfiguration() {
public ProducerConfigurationData getConfiguration() {
return conf;
}
......
......@@ -25,36 +25,35 @@ import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
@SuppressWarnings("deprecation")
public class ProducerBuilderImpl implements ProducerBuilder {
private static final long serialVersionUID = 1L;
private final PulsarClientImpl client;
private String topicName;
private final ProducerConfiguration conf;
private final ProducerConfigurationData conf;
ProducerBuilderImpl(PulsarClientImpl client) {
this(client, new ProducerConfigurationData());
}
private ProducerBuilderImpl(PulsarClientImpl client, ProducerConfigurationData conf) {
this.client = client;
this.conf = new ProducerConfiguration();
this.conf = conf;
}
@Override
public ProducerBuilder clone() {
try {
return (ProducerBuilder) super.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ProducerBuilderImpl");
}
return new ProducerBuilderImpl(client, conf.clone());
}
@Override
......@@ -76,17 +75,17 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public CompletableFuture<Producer> createAsync() {
if (topicName == null) {
if (conf.getTopicName() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the producer builder"));
}
return client.createProducerAsync(topicName, conf);
return client.createProducerAsync(conf);
}
@Override
public ProducerBuilder topic(String topicName) {
this.topicName = topicName;
conf.setTopicName(topicName);
return this;
}
......@@ -98,7 +97,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder sendTimeout(int sendTimeout, TimeUnit unit) {
conf.setSendTimeout(sendTimeout, unit);
conf.setSendTimeoutMs(unit.toMillis(sendTimeout));
return this;
}
......@@ -122,7 +121,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder messageRoutingMode(MessageRoutingMode messageRouteMode) {
conf.setMessageRoutingMode(ProducerConfiguration.MessageRoutingMode.valueOf(messageRouteMode.toString()));
conf.setMessageRoutingMode(messageRouteMode);
return this;
}
......@@ -132,9 +131,15 @@ public class ProducerBuilderImpl implements ProducerBuilder {
return this;
}
@Override
public ProducerBuilder hashingScheme(HashingScheme hashingScheme) {
conf.setHashingScheme(hashingScheme);
return this;
}
@Override
public ProducerBuilder messageRouter(MessageRouter messageRouter) {
conf.setMessageRouter(messageRouter);
conf.setCustomMessageRouter(messageRouter);
return this;
}
......@@ -152,7 +157,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder addEncryptionKey(String key) {
conf.addEncryptionKey(key);
conf.getEncryptionKeys().add(key);
return this;
}
......@@ -164,7 +169,7 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder batchingMaxPublishDelay(long batchDelay, TimeUnit timeUnit) {
conf.setBatchingMaxPublishDelay(batchDelay, timeUnit);
conf.setBatchingMaxPublishDelayMicros(timeUnit.toMicros(batchDelay));
return this;
}
......@@ -182,13 +187,13 @@ public class ProducerBuilderImpl implements ProducerBuilder {
@Override
public ProducerBuilder property(String key, String value) {
conf.setProperty(key, value);
conf.getProperties().put(key, value);
return this;
}
@Override
public ProducerBuilder properties(Map<String, String> properties) {
conf.setProperties(properties);
conf.getProperties().putAll(properties);
return this;
}
}
......@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.String.format;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.checksum.utils.Crc32cChecksum.computeChecksum;
import static org.apache.pulsar.checksum.utils.Crc32cChecksum.resumeChecksum;
import static org.apache.pulsar.common.api.Commands.hasChecksum;
......@@ -42,13 +41,13 @@ import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.CryptoException;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.Commands.ChecksumType;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.PulsarDecoder;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
......@@ -107,7 +106,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater
.newUpdater(ProducerImpl.class, "msgIdGenerator");
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration conf,
public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf,
CompletableFuture<Producer> producerCreatedFuture, int partitionIndex) {
super(client, topic, conf, producerCreatedFuture);
this.producerId = client.newProducerId();
......@@ -119,8 +118,8 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
this.compressor = CompressionCodecProvider
.getCompressionCodec(convertCompressionType(conf.getCompressionType()));
if (conf.getInitialSequenceId().isPresent()) {
long initialSequenceId = conf.getInitialSequenceId().get();
if (conf.getInitialSequenceId() != null) {
long initialSequenceId = conf.getInitialSequenceId();
this.lastSequenceIdPublished = initialSequenceId;
this.msgIdGenerator = initialSequenceId + 1;
} else {
......@@ -151,7 +150,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
}
this.createProducerTimeout = System.currentTimeMillis() + client.getConfiguration().getOperationTimeoutMs();
if (conf.getBatchingEnabled()) {
if (conf.isBatchingEnabled()) {
this.maxNumMessagesInBatch = conf.getBatchingMaxMessages();
this.batchMessageContainer = new BatchMessageContainer(maxNumMessagesInBatch,
convertCompressionType(conf.getCompressionType()), topic, producerName);
......@@ -175,7 +174,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
}
private boolean isBatchMessagingEnabled() {
return conf.getBatchingEnabled();
return conf.isBatchingEnabled();
}
@Override
......@@ -421,7 +420,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
private boolean canEnqueueRequest(SendCallback callback) {
try {
if (conf.getBlockIfQueueFull()) {
if (conf.isBlockIfQueueFull()) {
semaphore.acquire();
} else {
if (!semaphore.tryAcquire()) {
......@@ -843,15 +842,15 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
this.producerName = producerName;
}
if (this.lastSequenceIdPublished == -1 && !conf.getInitialSequenceId().isPresent()) {
if (this.lastSequenceIdPublished == -1 && conf.getInitialSequenceId() == null) {
this.lastSequenceIdPublished = lastSequenceId;
this.msgIdGenerator = lastSequenceId + 1;
}
if (!producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
// schedule the first batch message task
client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMs(),
TimeUnit.MILLISECONDS);
client.timer().newTimeout(batchMessageAndSendTask, conf.getBatchingMaxPublishDelayMicros(),
TimeUnit.MICROSECONDS);
}
resendMessages(cnx);
}
......@@ -1139,7 +1138,7 @@ public class ProducerImpl extends ProducerBase implements TimerTask {
batchMessageAndSend();
}
// schedule the next batch message task
client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMs(), TimeUnit.MILLISECONDS);
client.timer().newTimeout(this, conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
}
};
......
......@@ -24,7 +24,7 @@ import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -74,7 +74,7 @@ public class ProducerStats implements Serializable {
ds = null;
}
public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfiguration conf, ProducerImpl producer) {
public ProducerStats(PulsarClientImpl pulsarClient, ProducerConfigurationData conf, ProducerImpl producer) {
this.pulsarClient = pulsarClient;
this.statsIntervalSeconds = pulsarClient.getConfiguration().getStatsIntervalSeconds();
this.producer = producer;
......@@ -92,7 +92,7 @@ public class ProducerStats implements Serializable {
init(conf);
}
private void init(ProducerConfiguration conf) {
private void init(ProducerConfigurationData conf) {
ObjectMapper m = new ObjectMapper();
m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
ObjectWriter w = m.writerWithDefaultPrettyPrinter();
......
......@@ -20,7 +20,6 @@ package org.apache.pulsar.client.impl;
import static org.apache.commons.lang3.StringUtils.isBlank;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
......@@ -45,6 +44,10 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.naming.DestinationDomain;
import org.apache.pulsar.common.naming.DestinationName;
......@@ -67,7 +70,7 @@ public class PulsarClientImpl implements PulsarClient {
private static final Logger log = LoggerFactory.getLogger(PulsarClientImpl.class);
private final ClientConfiguration conf;
private final ClientConfigurationData conf;
private final LookupService lookup;
private final ConnectionPool cnxPool;
private final Timer timer;
......@@ -87,37 +90,53 @@ public class PulsarClientImpl implements PulsarClient {
private final EventLoopGroup eventLoopGroup;
@Deprecated
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf) throws PulsarClientException {
this(serviceUrl, conf, getEventLoopGroup(conf));
this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone());
}
@Deprecated
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup)
throws PulsarClientException {
this(serviceUrl, conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone(), eventLoopGroup);
}
@Deprecated
public PulsarClientImpl(String serviceUrl, ClientConfiguration conf, EventLoopGroup eventLoopGroup,
ConnectionPool cnxPool) throws PulsarClientException {
if (isBlank(serviceUrl) || conf == null || eventLoopGroup == null) {
this(conf.setServiceUrl(serviceUrl).getConfigurationData().clone(), eventLoopGroup, cnxPool);
}
public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
this(conf, getEventLoopGroup(conf));
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
this(conf, eventLoopGroup, new ConnectionPool(conf, eventLoopGroup));
}
public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
throws PulsarClientException {
if (conf == null || isBlank(conf.getServiceUrl()) || eventLoopGroup == null) {
throw new PulsarClientException.InvalidConfigurationException("Invalid client configuration");
}
this.eventLoopGroup = eventLoopGroup;
this.conf = conf;
conf.getAuthentication().start();
this.cnxPool = cnxPool;
if (serviceUrl.startsWith("http")) {
lookup = new HttpLookupService(serviceUrl, conf, eventLoopGroup);
if (conf.getServiceUrl().startsWith("http")) {
lookup = new HttpLookupService(conf, eventLoopGroup);
} else {
lookup = new BinaryProtoLookupService(this, serviceUrl, conf.isUseTls());
lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls());
}
timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
externalExecutorProvider = new ExecutorProvider(conf.getListenerThreads(), "pulsar-external-listener");
externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
producers = Maps.newIdentityHashMap();
consumers = Maps.newIdentityHashMap();
state.set(State.Open);
}
public ClientConfiguration getConfiguration() {
public ClientConfigurationData getConfiguration() {
return conf;
}
......@@ -137,9 +156,11 @@ public class PulsarClientImpl implements PulsarClient {
}
@Override
public Producer createProducer(String destination) throws PulsarClientException {
public Producer createProducer(String topic) throws PulsarClientException {
try {
return createProducerAsync(destination, new ProducerConfiguration()).get();
ProducerConfigurationData conf = new ProducerConfigurationData();
conf.setTopicName(topic);
return createProducerAsync(conf).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
......@@ -154,10 +175,15 @@ public class PulsarClientImpl implements PulsarClient {
}
@Override
public Producer createProducer(final String destination, final ProducerConfiguration conf)
throws PulsarClientException {
public Producer createProducer(final String topic, final ProducerConfiguration conf) throws PulsarClientException {
if (conf == null) {
throw new PulsarClientException.InvalidConfigurationException("Invalid null configuration object");
}
try {
return createProducerAsync(destination, conf).get();
ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
confData.setTopicName(topic);
return createProducerAsync(confData).get();
} catch (ExecutionException e) {
Throwable t = e.getCause();
if (t instanceof PulsarClientException) {
......@@ -173,22 +199,33 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture<Producer> createProducerAsync(String topic) {
return createProducerAsync(topic, new ProducerConfiguration());
ProducerConfigurationData conf = new ProducerConfigurationData();
conf.setTopicName(topic);
return createProducerAsync(conf);
}
@Override
public CompletableFuture<Producer> createProducerAsync(final String topic, final ProducerConfiguration conf) {
ProducerConfigurationData confData = conf.getProducerConfigurationData().clone();
confData.setTopicName(topic);
return createProducerAsync(confData);
}
public CompletableFuture<Producer> createProducerAsync(ProducerConfigurationData conf) {
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
}
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
String topic = conf.getTopicName();
if (!DestinationName.isValid(topic)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
}
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Producer configuration undefined"));
}
CompletableFuture<Producer> producerCreatedFuture = new CompletableFuture<>();
......@@ -242,44 +279,70 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture<Consumer> subscribeAsync(String topic, String subscription) {
return subscribeAsync(topic, subscription, new ConsumerConfiguration());
ConsumerConfigurationData conf = new ConsumerConfigurationData();
conf.getTopicNames().add(topic);
conf.setSubscriptionName(subscription);
return subscribeAsync(conf);
}
@Override
public CompletableFuture<Consumer> subscribeAsync(final String topic, final String subscription,
final ConsumerConfiguration conf) {
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Invalid null configuration"));
}
ConsumerConfigurationData confData = conf.getConfigurationData().clone();
confData.getTopicNames().add(topic);
confData.setSubscriptionName(subscription);
return subscribeAsync(confData);
}
public CompletableFuture<Consumer> subscribeAsync(ConsumerConfigurationData conf) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
if (!DestinationName.isValid(topic)) {
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}
if (!conf.getTopicNames().stream().allMatch(topic -> DestinationName.isValid(topic))) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
}
if (isBlank(subscription)) {
if (isBlank(conf.getSubscriptionName())) {
return FutureUtil
.failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
}
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}
if (conf.getReadCompacted()
&& (!DestinationName.get(topic).getDomain().equals(DestinationDomain.persistent)
|| (conf.getSubscriptionType() != SubscriptionType.Exclusive
if (conf.isReadCompacted() && (!conf.getTopicNames().stream()
.allMatch(topic -> DestinationName.get(topic).getDomain() == DestinationDomain.persistent)
|| (conf.getSubscriptionType() != SubscriptionType.Exclusive
&& conf.getSubscriptionType() != SubscriptionType.Failover))) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException(
"Read compacted can only be used with exclusive of failover persistent subscriptions"));
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
"Read compacted can only be used with exclusive of failover persistent subscriptions"));
}
if (conf.getConsumerEventListener() != null
&& conf.getSubscriptionType() != SubscriptionType.Failover) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException(
"Active consumer listener is only supported for failover subscription"));
if (conf.getConsumerEventListener() != null && conf.getSubscriptionType() != SubscriptionType.Failover) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
"Active consumer listener is only supported for failover subscription"));
}
if (conf.getTopicNames().size() == 1) {
return singleTopicSubscribeAsysnc(conf);
} else {
return multiTopicSubscribeAsync(conf);
}
}
private CompletableFuture<Consumer> singleTopicSubscribeAsysnc(ConsumerConfigurationData conf) {
CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
String topic = conf.getSingleTopic();
getPartitionedTopicMetadata(topic).thenAccept(metadata -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Received topic metadata. partitions: {}", topic, metadata.partitions);
......@@ -289,10 +352,10 @@ public class PulsarClientImpl implements PulsarClient {
// gets the next single threaded executor from the list of executors
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
if (metadata.partitions > 1) {
consumer = new PartitionedConsumerImpl(PulsarClientImpl.this, topic, subscription, conf,
metadata.partitions, listenerThread, consumerSubscribedFuture);
consumer = new PartitionedConsumerImpl(PulsarClientImpl.this, conf, metadata.partitions, listenerThread,
consumerSubscribedFuture);
} else {
consumer = new ConsumerImpl(PulsarClientImpl.this, topic, subscription, conf, listenerThread, -1,
consumer = new ConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, -1,
consumerSubscribedFuture);
}
......@@ -308,31 +371,12 @@ public class PulsarClientImpl implements PulsarClient {
return consumerSubscribedFuture;
}
public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
String subscription,
ConsumerConfiguration conf) {
if (topics == null || topics.isEmpty()) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Empty topics name"));
}
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
if (isBlank(subscription)) {
return FutureUtil
.failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
}
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}
private CompletableFuture<Consumer> multiTopicSubscribeAsync(ConsumerConfigurationData conf) {
CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this, topics, subscription,
conf, externalExecutorProvider.getExecutor(),
consumerSubscribedFuture);
ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this, conf,
externalExecutorProvider.getExecutor(), consumerSubscribedFuture);
synchronized (consumers) {
consumers.put(consumer, Boolean.TRUE);
}
......@@ -361,20 +405,32 @@ public class PulsarClientImpl implements PulsarClient {
@Override
public CompletableFuture<Reader> createReaderAsync(String topic, MessageId startMessageId,
ReaderConfiguration conf) {
ReaderConfigurationData confData = conf.getReaderConfigurationData().clone();
confData.setTopicName(topic);
confData.setStartMessageId(startMessageId);
return createReaderAsync(confData);
}
public CompletableFuture<Reader> createReaderAsync(ReaderConfigurationData conf) {
if (state.get() != State.Open) {
return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
}
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}
String topic = conf.getTopicName();
if (!DestinationName.isValid(topic)) {
return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Invalid topic name"));
}
if (startMessageId == null) {
if (conf.getStartMessageId() == null) {
return FutureUtil
.failedFuture(new PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
}
if (conf == null) {
return FutureUtil.failedFuture(
new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
}
CompletableFuture<Reader> readerFuture = new CompletableFuture<>();
......@@ -392,8 +448,7 @@ public class PulsarClientImpl implements PulsarClient {
CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
// gets the next single threaded executor from the list of executors
ExecutorService listenerThread = externalExecutorProvider.getExecutor();
ReaderImpl reader = new ReaderImpl(PulsarClientImpl.this, topic, startMessageId, conf, listenerThread,
consumerSubscribedFuture);
ReaderImpl reader = new ReaderImpl(PulsarClientImpl.this, conf, listenerThread, consumerSubscribedFuture);
synchronized (consumers) {
consumers.put(reader.getConsumer(), Boolean.TRUE);
......@@ -535,10 +590,9 @@ public class PulsarClientImpl implements PulsarClient {
return metadataFuture;
}
private static EventLoopGroup getEventLoopGroup(ClientConfiguration conf) {
int numThreads = conf.getIoThreads();
private static EventLoopGroup getEventLoopGroup(ClientConfigurationData conf) {
ThreadFactory threadFactory = new DefaultThreadFactory("pulsar-client-io");
return EventLoopUtil.newEventLoopGroup(numThreads, threadFactory);
return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(), threadFactory);
}
void cleanupProducer(ProducerBase producer) {
......
......@@ -27,24 +27,25 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
import org.apache.pulsar.common.util.FutureUtil;
@SuppressWarnings("deprecation")
public class ReaderBuilderImpl implements ReaderBuilder {
private static final long serialVersionUID = 1L;
private final PulsarClientImpl client;
private final ReaderConfiguration conf;
private String topicName;
private MessageId startMessageId;
private final ReaderConfigurationData conf;
ReaderBuilderImpl(PulsarClientImpl client) {
this(client, new ReaderConfigurationData());
}
private ReaderBuilderImpl(PulsarClientImpl client, ReaderConfigurationData conf) {
this.client = client;
this.conf = new ReaderConfiguration();
this.conf = conf;
}
@Override
......@@ -75,28 +76,28 @@ public class ReaderBuilderImpl implements ReaderBuilder {
@Override
public CompletableFuture<Reader> createAsync() {
if (topicName == null) {
if (conf.getTopicName() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Topic name must be set on the reader builder"));
}
if (startMessageId == null) {
if (conf.getStartMessageId() == null) {
return FutureUtil
.failedFuture(new IllegalArgumentException("Start message id must be set on the reader builder"));
}
return client.createReaderAsync(topicName, startMessageId, conf);
return client.createReaderAsync(conf);
}
@Override
public ReaderBuilder topic(String topicName) {
this.topicName = topicName;
conf.setTopicName(topicName);
return this;
}
@Override
public ReaderBuilder startMessageId(MessageId startMessageId) {
this.startMessageId = startMessageId;
conf.setStartMessageId(startMessageId);
return this;
}
......
......@@ -23,30 +23,31 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderConfiguration;
import org.apache.pulsar.client.api.ReaderListener;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl.SubscriptionMode;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
public class ReaderImpl implements Reader {
private final ConsumerImpl consumer;
public ReaderImpl(PulsarClientImpl client, String topic, MessageId startMessageId,
ReaderConfiguration readerConfiguration, ExecutorService listenerExecutor,
CompletableFuture<Consumer> consumerFuture) {
public ReaderImpl(PulsarClientImpl client, ReaderConfigurationData readerConfiguration,
ExecutorService listenerExecutor, CompletableFuture<Consumer> consumerFuture) {
String subscription = "reader-" + DigestUtils.sha1Hex(UUID.randomUUID().toString()).substring(0, 10);
ConsumerConfiguration consumerConfiguration = new ConsumerConfiguration();
ConsumerConfigurationData consumerConfiguration = new ConsumerConfigurationData();
consumerConfiguration.getTopicNames().add(readerConfiguration.getTopicName());
consumerConfiguration.setSubscriptionName(subscription);
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
if (readerConfiguration.getReaderName() != null) {
......@@ -76,8 +77,8 @@ public class ReaderImpl implements Reader {
consumerConfiguration.setCryptoKeyReader(readerConfiguration.getCryptoKeyReader());
}
consumer = new ConsumerImpl(client, topic, subscription, consumerConfiguration, listenerExecutor, -1,
consumerFuture, SubscriptionMode.NonDurable, startMessageId);
consumer = new ConsumerImpl(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-1, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId());
}
@Override
......
......@@ -20,17 +20,20 @@ package org.apache.pulsar.client.impl;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.TopicMetadata;
public class RoundRobinPartitionMessageRouterImpl extends MessageRouterBase {
private static final long serialVersionUID = 1L;
private static final AtomicIntegerFieldUpdater<RoundRobinPartitionMessageRouterImpl> PARTITION_INDEX_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(RoundRobinPartitionMessageRouterImpl.class, "partitionIndex");
@SuppressWarnings("unused")
private volatile int partitionIndex = 0;
public RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme hashingScheme) {
public RoundRobinPartitionMessageRouterImpl(HashingScheme hashingScheme) {
super(hashingScheme);
PARTITION_INDEX_UPDATER.set(this, 0);
}
......
......@@ -18,15 +18,17 @@
*/
package org.apache.pulsar.client.impl;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.TopicMetadata;
public class SinglePartitionMessageRouterImpl extends MessageRouterBase {
private static final long serialVersionUID = 1L;
private final int partitionIndex;
public SinglePartitionMessageRouterImpl(int partitionIndex, ProducerConfiguration.HashingScheme hashingScheme) {
public SinglePartitionMessageRouterImpl(int partitionIndex, HashingScheme hashingScheme) {
super(hashingScheme);
this.partitionIndex = partitionIndex;
}
......
......@@ -21,7 +21,6 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import com.google.common.collect.Lists;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
......@@ -37,15 +36,15 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.naming.DestinationName;
......@@ -54,6 +53,8 @@ import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Lists;
public class TopicsConsumerImpl extends ConsumerBase {
// All topics should be in same namespace
......@@ -79,14 +80,12 @@ public class TopicsConsumerImpl extends ConsumerBase {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ConsumerStats stats;
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfiguration internalConfig;
private final ConsumerConfigurationData internalConfig;
TopicsConsumerImpl(PulsarClientImpl client, Collection<String> topics, String subscription,
ConsumerConfiguration conf, ExecutorService listenerExecutor,
CompletableFuture<Consumer> subscribeFuture) {
super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
subscribeFuture);
TopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData conf, ExecutorService listenerExecutor,
CompletableFuture<Consumer> subscribeFuture) {
super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), conf,
Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture);
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
......@@ -106,23 +105,19 @@ public class TopicsConsumerImpl extends ConsumerBase {
this.internalConfig = getInternalConsumerConfig();
this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
if (topics.isEmpty()) {
if (conf.getTopicNames().isEmpty()) {
this.namespaceName = null;
setState(State.Ready);
subscribeFuture().complete(TopicsConsumerImpl.this);
return;
}
checkArgument(topics.isEmpty() || topicNamesValid(topics), "Topics should have same namespace.");
this.namespaceName = topics.stream().findFirst().flatMap(
new Function<String, Optional<NamespaceName>>() {
@Override
public Optional<NamespaceName> apply(String s) {
return Optional.of(DestinationName.get(s).getNamespaceObject());
}
}).get();
checkArgument(conf.getTopicNames().isEmpty() || topicNamesValid(conf.getTopicNames()), "Topics should have same namespace.");
this.namespaceName = conf.getTopicNames().stream().findFirst()
.flatMap(s -> Optional.of(DestinationName.get(s).getNamespaceObject())).get();
List<CompletableFuture<Void>> futures = topics.stream().map(t -> subscribeAsync(t)).collect(Collectors.toList());
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t))
.collect(Collectors.toList());
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
try {
......@@ -490,8 +485,9 @@ public class TopicsConsumerImpl extends ConsumerBase {
return subscription;
}
private ConsumerConfiguration getInternalConsumerConfig() {
ConsumerConfiguration internalConsumerConfig = new ConsumerConfiguration();
private ConsumerConfigurationData getInternalConsumerConfig() {
ConsumerConfigurationData internalConsumerConfig = new ConsumerConfigurationData();
internalConsumerConfig.setSubscriptionName(subscription);
internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
internalConsumerConfig.setConsumerName(consumerName);
......@@ -500,7 +496,7 @@ public class TopicsConsumerImpl extends ConsumerBase {
internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
}
if (conf.getAckTimeoutMillis() != 0) {
internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
internalConsumerConfig.setAckTimeoutMillis(conf.getAckTimeoutMillis());
}
return internalConsumerConfig;
......@@ -658,9 +654,8 @@ public class TopicsConsumerImpl extends ConsumerBase {
partitionIndex -> {
String partitionName = DestinationName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer> subFuture = new CompletableFuture<Consumer>();
ConsumerImpl newConsumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
client.externalExecutorProvider().getExecutor(), partitionIndex,
subFuture);
ConsumerImpl newConsumer = new ConsumerImpl(client, partitionName, internalConfig,
client.externalExecutorProvider().getExecutor(), partitionIndex, subFuture);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
......@@ -671,9 +666,8 @@ public class TopicsConsumerImpl extends ConsumerBase {
partitionNumber.incrementAndGet();
CompletableFuture<Consumer> subFuture = new CompletableFuture<Consumer>();
ConsumerImpl newConsumer = new ConsumerImpl(client, topicName, subscription, internalConfig,
client.externalExecutorProvider().getExecutor(), 0,
subFuture);
ConsumerImpl newConsumer = new ConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), 0, subFuture);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
futureList = Lists.newArrayList(subFuture);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.conf;
import java.io.Serializable;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
/**
* This is a simple holder of the client configuration values.
*/
@Data
public class ClientConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
private String serviceUrl;
@JsonIgnore
private Authentication authentication = new AuthenticationDisabled();
private long operationTimeoutMs = 30000;
private long statsIntervalSeconds = 60;
private int numIoThreads = 1;
private int numListenerThreads = 1;
private int connectionsPerBroker = 1;
private boolean useTcpNoDelay = true;
private boolean useTls = false;
private String tlsTrustCertsFilePath = "";
private boolean tlsAllowInsecureConnection = false;
private boolean tlsHostnameVerificationEnable = false;
private int concurrentLookupRequest = 50000;
private int maxNumberOfRejectedRequestPerConnection = 50;
public ClientConfigurationData clone() {
try {
return (ClientConfigurationData) super.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ClientConfigurationData");
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.conf;
import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.SubscriptionType;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Data;
@Data
public class ConsumerConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
private Set<String> topicNames = Sets.newTreeSet();
private String subscriptionName;
private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
@JsonIgnore
private MessageListener messageListener;
@JsonIgnore
private ConsumerEventListener consumerEventListener;
private int receiverQueueSize = 1000;
private int maxTotalReceiverQueueSizeAcrossPartitions = 50000;
private String consumerName = null;
private long ackTimeoutMillis = 0;
private int priorityLevel = 0;
@JsonIgnore
private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
private SortedMap<String, String> properties = new TreeMap<>();
private boolean readCompacted = false;
@JsonIgnore
public String getSingleTopic() {
checkArgument(topicNames.size() == 1);
return topicNames.iterator().next();
}
public ConsumerConfigurationData clone() {
try {
ConsumerConfigurationData c = (ConsumerConfigurationData) super.clone();
c.topicNames = Sets.newTreeSet(this.topicNames);
c.properties = Maps.newTreeMap(this.properties);
return c;
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ConsumerConfigurationData");
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.conf;
import java.io.Serializable;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import lombok.Data;
@Data
public class ProducerConfigurationData implements Serializable, Cloneable {
private static final long serialVersionUID = 1L;
private String topicName = null;
private String producerName = null;
private long sendTimeoutMs = 30000;
private boolean blockIfQueueFull = false;
private int maxPendingMessages = 1000;
private int maxPendingMessagesAcrossPartitions = 50000;
private MessageRoutingMode messageRoutingMode = MessageRoutingMode.SinglePartition;
private HashingScheme hashingScheme = HashingScheme.JavaStringHash;
private ProducerCryptoFailureAction cryptoFailureAction = ProducerCryptoFailureAction.FAIL;
@JsonIgnore
private MessageRouter customMessageRouter = null;
private long batchingMaxPublishDelayMicros = TimeUnit.MILLISECONDS.toMicros(10);
private int batchingMaxMessages = 1000;
private boolean batchingEnabled = false; // disabled by default
@JsonIgnore
private CryptoKeyReader cryptoKeyReader;
@JsonIgnore
private Set<String> encryptionKeys = new TreeSet<>();
private CompressionType compressionType = CompressionType.NONE;
// Cannot use Optional<Long> since it's not serializable
private Long initialSequenceId = null;
private SortedMap<String, String> properties = new TreeMap<>();
/**
*
* Returns true if encryption keys are added
*
*/
public boolean isEncryptionEnabled() {
return (this.encryptionKeys != null) && !this.encryptionKeys.isEmpty() && (this.cryptoKeyReader != null);
}
public ProducerConfigurationData clone() {
try {
ProducerConfigurationData c = (ProducerConfigurationData) super.clone();
c.encryptionKeys = Sets.newTreeSet(this.encryptionKeys);
c.properties = Maps.newTreeMap(this.properties);
return c;
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ProducerConfigurationData", e);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl.conf;
import java.io.Serializable;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ReaderListener;
import lombok.Data;
@Data
public class ReaderConfigurationData implements Serializable, Cloneable {
private String topicName;
private MessageId startMessageId;
private int receiverQueueSize = 1000;
private ReaderListener readerListener;
private String readerName = null;
private CryptoKeyReader cryptoKeyReader = null;
private ConsumerCryptoFailureAction cryptoFailureAction = ConsumerCryptoFailureAction.FAIL;
public ReaderConfigurationData clone() {
try {
return (ReaderConfigurationData) super.clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException("Failed to clone ReaderConfigurationData");
}
}
}
......@@ -21,13 +21,15 @@ package org.apache.pulsar.client.api;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertFalse;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
/**
* Unit test of {@link ConsumerConfiguration}.
*/
......@@ -38,20 +40,22 @@ public class ConsumerConfigurationTest {
@Test
public void testJsonIgnore() throws Exception {
ConsumerConfiguration conf = new ConsumerConfiguration()
.setConsumerEventListener(new ConsumerEventListener() {
ConsumerConfigurationData conf = new ConsumerConfigurationData();
conf.setConsumerEventListener(new ConsumerEventListener() {
@Override
public void becameActive(Consumer consumer, int partitionId) {
}
@Override
public void becameInactive(Consumer consumer, int partitionId) {
}
});
@Override
public void becameActive(Consumer consumer, int partitionId) {
}
conf.setMessageListener((MessageListener) (consumer, msg) -> {
});
@Override
public void becameInactive(Consumer consumer, int partitionId) {
}
})
.setMessageListener((MessageListener) (consumer, msg) -> {
})
.setCryptoKeyReader(mock(CryptoKeyReader.class));
conf.setCryptoKeyReader(mock(CryptoKeyReader.class));
ObjectMapper m = new ObjectMapper();
m.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
......
......@@ -24,7 +24,6 @@ import static org.testng.Assert.assertTrue;
import org.apache.pulsar.client.api.PulsarClient;
import org.testng.annotations.Test;
@SuppressWarnings("deprecation")
public class BuildersTest {
@Test
......@@ -33,15 +32,15 @@ public class BuildersTest {
.maxNumberOfRejectedRequestPerConnection(200).serviceUrl("pulsar://service:6650");
assertEquals(clientBuilder.conf.isUseTls(), true);
assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
assertEquals(clientBuilder.conf.getServiceUrl(), "pulsar://service:6650");
ClientBuilderImpl b2 = (ClientBuilderImpl) clientBuilder.clone();
assertTrue(b2 != clientBuilder);
b2.serviceUrl("pulsar://other-broker:6650");
assertEquals(clientBuilder.serviceUrl, "pulsar://service:6650");
assertEquals(b2.serviceUrl, "pulsar://other-broker:6650");
assertEquals(clientBuilder.conf.getServiceUrl(), "pulsar://service:6650");
assertEquals(b2.conf.getServiceUrl(), "pulsar://other-broker:6650");
}
}
......@@ -24,10 +24,9 @@ import static org.mockito.Mockito.when;
import static org.testng.Assert.assertTrue;
import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.testng.annotations.Test;
......@@ -42,8 +41,8 @@ public class ClientCnxTest {
@Test
public void testClientCnxTimeout() throws Exception {
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(1, new DefaultThreadFactory("testClientCnxTimeout"));
ClientConfiguration conf = new ClientConfiguration();
conf.setOperationTimeout(10, TimeUnit.MILLISECONDS);
ClientConfigurationData conf = new ClientConfigurationData();
conf.setOperationTimeoutMs(10);
ClientCnx cnx = new ClientCnx(conf, eventLoop);
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
......
......@@ -22,8 +22,8 @@ import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.when;
import static org.testng.Assert.assertEquals;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.testng.annotations.Test;
/**
......@@ -36,7 +36,7 @@ public class RoundRobinPartitionMessageRouterImplTest {
Message msg = mock(Message.class);
when(msg.getKey()).thenReturn(null);
RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash);
RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash);
for (int i = 0; i < 10; i++) {
assertEquals(i % 5, router.choosePartition(msg, new TopicMetadataImpl(5)));
}
......@@ -53,7 +53,7 @@ public class RoundRobinPartitionMessageRouterImplTest {
when(msg2.hasKey()).thenReturn(true);
when(msg2.getKey()).thenReturn(key2);
RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(ProducerConfiguration.HashingScheme.JavaStringHash);
RoundRobinPartitionMessageRouterImpl router = new RoundRobinPartitionMessageRouterImpl(HashingScheme.JavaStringHash);
TopicMetadataImpl metadata = new TopicMetadataImpl(100);
assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata));
......
......@@ -22,8 +22,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import org.apache.pulsar.client.api.HashingScheme;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.testng.annotations.Test;
/**
......@@ -36,7 +36,7 @@ public class SinglePartitionMessageRouterImplTest {
Message msg = mock(Message.class);
when(msg.getKey()).thenReturn(null);
SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash);
SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, HashingScheme.JavaStringHash);
assertEquals(1234, router.choosePartition(msg, new TopicMetadataImpl(2468)));
}
......@@ -51,7 +51,7 @@ public class SinglePartitionMessageRouterImplTest {
when(msg2.hasKey()).thenReturn(true);
when(msg2.getKey()).thenReturn(key2);
SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, ProducerConfiguration.HashingScheme.JavaStringHash);
SinglePartitionMessageRouterImpl router = new SinglePartitionMessageRouterImpl(1234, HashingScheme.JavaStringHash);
TopicMetadataImpl metadata = new TopicMetadataImpl(100);
assertEquals(key1.hashCode() % 100, router.choosePartition(msg1, metadata));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册