diff --git a/pulsar-client-cpp/include/pulsar/Client.h b/pulsar-client-cpp/include/pulsar/Client.h index 854948c1850fad74b621dcc2d9d8af7e0924555d..6989e9e48ae993ba25107a21ac1330ac7ef3b0a1 100644 --- a/pulsar-client-cpp/include/pulsar/Client.h +++ b/pulsar-client-cpp/include/pulsar/Client.h @@ -17,36 +17,14 @@ #ifndef PULSAR_CLIENT_HPP_ #define PULSAR_CLIENT_HPP_ -#include #include #include #include #include #include +#include #include -#ifdef PULSAR_ENABLE_DEPRECATED_METHOD - #include -#else - // Deprecated - namespace pulsar { - class AuthData; - typedef boost::shared_ptr AuthDataPtr; - class AuthData { - public: - static AuthenticationPtr getAuthenticationPtr(const AuthDataPtr& authentication) { - AuthenticationPtr ptr; - return ptr; - } - }; - class Auth { - public: - static AuthDataPtr Disabled() { - return AuthDataPtr(); - } - }; - } -#endif #pragma GCC visibility push(default) class PulsarFriend; @@ -57,126 +35,6 @@ typedef boost::function CreateProducerCallback; typedef boost::function SubscribeCallback; typedef boost::function CloseCallback; -class ClientConfiguration { - public: - - ClientConfiguration(); - ~ClientConfiguration(); - ClientConfiguration(const ClientConfiguration&); - ClientConfiguration& operator=(const ClientConfiguration&); - - /** - * @deprecated - * Set the authentication method to be used with the broker - * - * @param authentication the authentication data to use - */ - ClientConfiguration& setAuthentication(const AuthDataPtr& authentication); - - /** - * @deprecated - * @return the authentication data - */ - const AuthData& getAuthentication() const; - - /** - * Set the authentication method to be used with the broker - * - * @param authentication the authentication data to use - */ - ClientConfiguration& setAuth(const AuthenticationPtr& authentication); - - /** - * @return the authentication data - */ - const Authentication& getAuth() const; - - /** - * Set timeout on client operations (subscribe, create producer, close, unsubscribe) - * Default is 30 seconds. - * - * @param timeout the timeout after which the operation will be considered as failed - */ - ClientConfiguration& setOperationTimeoutSeconds(int timeout); - - /** - * @return the client operations timeout in seconds - */ - int getOperationTimeoutSeconds() const; - - /** - * Set the number of IO threads to be used by the Pulsar client. Default is 1 - * thread. - * - * @param threads number of threads - */ - ClientConfiguration& setIOThreads(int threads); - - /** - * @return the number of IO threads to use - */ - int getIOThreads() const; - - /** - * Set the number of threads to be used by the Pulsar client when delivering messages - * through message listener. Default is 1 thread per Pulsar client. - * - * If using more than 1 thread, messages for distinct MessageListener will be - * delivered in different threads, however a single MessageListener will always - * be assigned to the same thread. - * - * @param threads number of threads - */ - ClientConfiguration& setMessageListenerThreads(int threads); - - /** - * @return the number of IO threads to use - */ - int getMessageListenerThreads() const; - - /** - * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker. - * (default: 5000) It should be configured with higher value only in case of it requires to produce/subscribe on - * thousands of topic using created {@link PulsarClient} - * - * @param concurrentLookupRequest - */ - ClientConfiguration& setConcurrentLookupRequest(int concurrentLookupRequest); - - /** - * @return Get configured total allowed concurrent lookup-request. - */ - int getConcurrentLookupRequest() const; - - /** - * Initialize the log configuration - * - * @param logConfFilePath path of the configuration file - */ - ClientConfiguration& setLogConfFilePath(const std::string& logConfFilePath); - - /** - * Get the path of log configuration file (log4cpp) - */ - const std::string& getLogConfFilePath() const; - - ClientConfiguration& setUseTls(bool useTls); - bool isUseTls() const; - - ClientConfiguration& setTlsTrustCertsFilePath(const std::string &tlsTrustCertsFilePath); - std::string getTlsTrustCertsFilePath() const; - - ClientConfiguration& setTlsAllowInsecureConnection(bool allowInsecure); - bool isTlsAllowInsecureConnection() const; - - private: - const AuthenticationPtr& getAuthenticationPtr() const; - - struct Impl; - boost::shared_ptr impl_; - friend class ClientImpl; -}; - class ClientImpl; class Client { diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h new file mode 100644 index 0000000000000000000000000000000000000000..43f79bd134e6d7dc8e8f32046c1e6a289198f8bd --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h @@ -0,0 +1,130 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_CLIENTCONFIGURATION_H_ +#define PULSAR_CLIENTCONFIGURATION_H_ + +#include + +namespace pulsar { +class ClientConfigurationImpl; +class ClientConfiguration { + public: + + ClientConfiguration(); + ~ClientConfiguration(); + ClientConfiguration(const ClientConfiguration&); + ClientConfiguration& operator=(const ClientConfiguration&); + + /** + * Set the authentication method to be used with the broker + * + * @param authentication the authentication data to use + */ + ClientConfiguration& setAuth(const AuthenticationPtr& authentication); + + /** + * @return the authentication data + */ + const Authentication& getAuth() const; + + /** + * Set timeout on client operations (subscribe, create producer, close, unsubscribe) + * Default is 30 seconds. + * + * @param timeout the timeout after which the operation will be considered as failed + */ + ClientConfiguration& setOperationTimeoutSeconds(int timeout); + + /** + * @return the client operations timeout in seconds + */ + int getOperationTimeoutSeconds() const; + + /** + * Set the number of IO threads to be used by the Pulsar client. Default is 1 + * thread. + * + * @param threads number of threads + */ + ClientConfiguration& setIOThreads(int threads); + + /** + * @return the number of IO threads to use + */ + int getIOThreads() const; + + /** + * Set the number of threads to be used by the Pulsar client when delivering messages + * through message listener. Default is 1 thread per Pulsar client. + * + * If using more than 1 thread, messages for distinct MessageListener will be + * delivered in different threads, however a single MessageListener will always + * be assigned to the same thread. + * + * @param threads number of threads + */ + ClientConfiguration& setMessageListenerThreads(int threads); + + /** + * @return the number of IO threads to use + */ + int getMessageListenerThreads() const; + + /** + * Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker. + * (default: 5000) It should be configured with higher value only in case of it requires to produce/subscribe on + * thousands of topic using created {@link PulsarClient} + * + * @param concurrentLookupRequest + */ + ClientConfiguration& setConcurrentLookupRequest(int concurrentLookupRequest); + + /** + * @return Get configured total allowed concurrent lookup-request. + */ + int getConcurrentLookupRequest() const; + + /** + * Initialize the log configuration + * + * @param logConfFilePath path of the configuration file + */ + ClientConfiguration& setLogConfFilePath(const std::string& logConfFilePath); + + /** + * Get the path of log configuration file (log4cpp) + */ + const std::string& getLogConfFilePath() const; + + ClientConfiguration& setUseTls(bool useTls); + bool isUseTls() const; + + ClientConfiguration& setTlsTrustCertsFilePath(const std::string &tlsTrustCertsFilePath); + std::string getTlsTrustCertsFilePath() const; + + ClientConfiguration& setTlsAllowInsecureConnection(bool allowInsecure); + bool isTlsAllowInsecureConnection() const; + + private: + const AuthenticationPtr& getAuthenticationPtr() const; + boost::shared_ptr impl_; + friend class ClientImpl; +}; +} + +#endif /* PULSAR_CLIENTCONFIGURATION_H_ */ + diff --git a/pulsar-client-cpp/include/pulsar/CompressionType.h b/pulsar-client-cpp/include/pulsar/CompressionType.h new file mode 100644 index 0000000000000000000000000000000000000000..6250d91ab49b7b93c52abdd057921d94bb25dd5a --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/CompressionType.h @@ -0,0 +1,27 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_COMPRESSIONTYPE_H_ +#define PULSAR_COMPRESSIONTYPE_H_ +namespace pulsar { +enum CompressionType { + CompressionNone = 0, + CompressionLZ4 = 1, + CompressionZLib = 2 +}; +} + +#endif /* PULSAR_COMPRESSIONTYPE_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h index f349175d802cf27d50126ac7790510f2bc987758..44771966ece2c3f07db00b47fd410b446a921f9e 100644 --- a/pulsar-client-cpp/include/pulsar/Consumer.h +++ b/pulsar-client-cpp/include/pulsar/Consumer.h @@ -17,116 +17,16 @@ #ifndef CONSUMER_HPP_ #define CONSUMER_HPP_ -#include -#include -#include -#include #include #include -#include #include +#include #pragma GCC visibility push(default) class PulsarFriend; namespace pulsar { -class Consumer; - -/// Callback definition for non-data operation -typedef boost::function ResultCallback; - -/// Callback definition for MessageListener -typedef boost::function MessageListener; - -/** - * Class specifying the configuration of a consumer. - */ -class ConsumerConfiguration { - public: - ConsumerConfiguration(); - ~ConsumerConfiguration(); - ConsumerConfiguration(const ConsumerConfiguration&); - ConsumerConfiguration& operator=(const ConsumerConfiguration&); - - /** - * Specify the consumer type. The consumer type enables - * specifying the type of subscription. In Exclusive subscription, - * only a single consumer is allowed to attach to the subscription. Other consumers - * will get an error message. In Shared subscription, multiple consumers will be - * able to use the same subscription name and the messages will be dispatched in a - * round robin fashion. In Failover subscription, a master-slave subscription model - * allows for multiple consumers to attach to a single subscription, though only one - * of them will be “master” at a given time. Only the master consumer will receive - * messages. When the master gets disconnected, one among the slaves will be promoted - * to master and will start getting messages. - */ - ConsumerConfiguration& setConsumerType(ConsumerType consumerType); - ConsumerType getConsumerType() const; - - /** - * A message listener enables your application to configure how to process - * and acknowledge messages delivered. A listener will be called in order - * for every message received. - */ - ConsumerConfiguration& setMessageListener(MessageListener messageListener); - MessageListener getMessageListener() const; - bool hasMessageListener() const; - - /** - * Sets the size of the consumer receive queue. - * - * The consumer receive queue controls how many messages can be accumulated by the Consumer before the - * application calls receive(). Using a higher value could potentially increase the consumer throughput - * at the expense of bigger memory utilization. - * - * Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling pre-fetching of - * messages. This approach improves the message distribution on shared subscription, by pushing messages only to - * the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can be - * used if the consumer queue size is zero. The receive() function call should not be interrupted when - * the consumer queue size is zero. - * - * Default value is 1000 messages and should be good for most use cases. - * - * @param size - * the new receiver queue size value - */ - void setReceiverQueueSize(int size); - int getReceiverQueueSize() const; - - void setConsumerName(const std::string&); - const std::string& getConsumerName() const; - - /** - * Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than - * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds). - * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are - * redelivered. - * @param timeout in milliseconds - */ - void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds); - - /** - * @return the configured timeout in milliseconds for unacked messages. - */ - long getUnAckedMessagesTimeoutMs() const; - - /** - * Set the time duration for which the broker side consumer stats will be cached in the client. - * @param cacheTimeInMs in milliseconds - */ - void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs); - - /** - * @return the configured timeout in milliseconds caching BrokerConsumerStats. - */ - long getBrokerConsumerStatsCacheTimeInMs() const; - - private: - struct Impl; - boost::shared_ptr impl_; -}; - class ConsumerImplBase; /** diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h new file mode 100644 index 0000000000000000000000000000000000000000..ab403df9de8cb3de8fb8deaee23bcf67d659d5c4 --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -0,0 +1,128 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_CONSUMERCONFIGURATION_H_ +#define PULSAR_CONSUMERCONFIGURATION_H_ + +#include +#include +#include +#include +#include + +namespace pulsar { + +class Consumer; + +/// Callback definition for non-data operation +typedef boost::function ResultCallback; + +/// Callback definition for MessageListener +typedef boost::function MessageListener; + +class ConsumerConfigurationImpl; + +/** + * Class specifying the configuration of a consumer. + */ +class ConsumerConfiguration { + public: + ConsumerConfiguration(); + ~ConsumerConfiguration(); + ConsumerConfiguration(const ConsumerConfiguration&); + ConsumerConfiguration& operator=(const ConsumerConfiguration&); + + /** + * Specify the consumer type. The consumer type enables + * specifying the type of subscription. In Exclusive subscription, + * only a single consumer is allowed to attach to the subscription. Other consumers + * will get an error message. In Shared subscription, multiple consumers will be + * able to use the same subscription name and the messages will be dispatched in a + * round robin fashion. In Failover subscription, a master-slave subscription model + * allows for multiple consumers to attach to a single subscription, though only one + * of them will be “master” at a given time. Only the master consumer will receive + * messages. When the master gets disconnected, one among the slaves will be promoted + * to master and will start getting messages. + */ + ConsumerConfiguration& setConsumerType(ConsumerType consumerType); + ConsumerType getConsumerType() const; + + /** + * A message listener enables your application to configure how to process + * and acknowledge messages delivered. A listener will be called in order + * for every message received. + */ + ConsumerConfiguration& setMessageListener(MessageListener messageListener); + MessageListener getMessageListener() const; + bool hasMessageListener() const; + + /** + * Sets the size of the consumer receive queue. + * + * The consumer receive queue controls how many messages can be accumulated by the Consumer before the + * application calls receive(). Using a higher value could potentially increase the consumer throughput + * at the expense of bigger memory utilization. + * + * Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling pre-fetching of + * messages. This approach improves the message distribution on shared subscription, by pushing messages only to + * the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can be + * used if the consumer queue size is zero. The receive() function call should not be interrupted when + * the consumer queue size is zero. + * + * Default value is 1000 messages and should be good for most use cases. + * + * @param size + * the new receiver queue size value + */ + void setReceiverQueueSize(int size); + int getReceiverQueueSize() const; + + void setConsumerName(const std::string&); + const std::string& getConsumerName() const; + + /** + * Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than + * 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds). + * If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are + * redelivered. + * @param timeout in milliseconds + */ + void setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds); + + /** + * @return the configured timeout in milliseconds for unacked messages. + */ + long getUnAckedMessagesTimeoutMs() const; + + /** + * Set the time duration for which the broker side consumer stats will be cached in the client. + * @param cacheTimeInMs in milliseconds + */ + void setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs); + + /** + * @return the configured timeout in milliseconds caching BrokerConsumerStats. + */ + long getBrokerConsumerStatsCacheTimeInMs() const; + + private: + boost::shared_ptr impl_; +}; + +} + +#endif /* PULSAR_CONSUMERCONFIGURATION_H_ */ + diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h index 1e86b553d0393d69d3512e4db86d361321bdffb2..a3f74325f46479a34e0b42176cec87a4274ef5fc 100644 --- a/pulsar-client-cpp/include/pulsar/Producer.h +++ b/pulsar-client-cpp/include/pulsar/Producer.h @@ -17,11 +17,7 @@ #ifndef PRODUCER_HPP_ #define PRODUCER_HPP_ -#include -#include -#include - -#include +#include #include #include @@ -30,66 +26,6 @@ class PulsarFriend; namespace pulsar { - -typedef boost::function SendCallback; -typedef boost::function CloseCallback; - -enum CompressionType { - CompressionNone = 0, - CompressionLZ4 = 1, - CompressionZLib = 2 -}; - -/** - * Class that holds the configuration for a producer - */ -class ProducerConfiguration { - public: - enum PartitionsRoutingMode { - UseSinglePartition, - RoundRobinDistribution, - CustomPartition - }; - ProducerConfiguration(); - ~ProducerConfiguration(); - ProducerConfiguration(const ProducerConfiguration&); - ProducerConfiguration& operator=(const ProducerConfiguration&); - - ProducerConfiguration& setSendTimeout(int sendTimeoutMs); - int getSendTimeout() const; - - ProducerConfiguration& setCompressionType(CompressionType compressionType); - CompressionType getCompressionType() const; - - ProducerConfiguration& setMaxPendingMessages(int maxPendingMessages); - int getMaxPendingMessages() const; - - ProducerConfiguration& setPartitionsRoutingMode(const PartitionsRoutingMode& mode); - PartitionsRoutingMode getPartitionsRoutingMode() const; - - ProducerConfiguration& setMessageRouter(const MessageRoutingPolicyPtr& router); - const MessageRoutingPolicyPtr& getMessageRouterPtr() const; - - ProducerConfiguration& setBlockIfQueueFull(bool); - bool getBlockIfQueueFull() const; - - // Zero queue size feature will not be supported on consumer end if batching is enabled - ProducerConfiguration& setBatchingEnabled(const bool& batchingEnabled); - const bool& getBatchingEnabled() const; - - ProducerConfiguration& setBatchingMaxMessages(const unsigned int& batchingMaxMessages); - const unsigned int& getBatchingMaxMessages() const; - - ProducerConfiguration& setBatchingMaxAllowedSizeInBytes(const unsigned long& batchingMaxAllowedSizeInBytes); - const unsigned long& getBatchingMaxAllowedSizeInBytes() const; - - ProducerConfiguration& setBatchingMaxPublishDelayMs(const unsigned long& batchingMaxPublishDelayMs); - const unsigned long& getBatchingMaxPublishDelayMs() const; - private: - struct Impl; - boost::shared_ptr impl_; -}; - class ProducerImplBase; class Producer { diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h new file mode 100644 index 0000000000000000000000000000000000000000..3bb20de8b212443c21c5509554b218c48a315ac8 --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -0,0 +1,85 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PULSAR_PRODUCERCONFIGURATION_H_ +#define PULSAR_PRODUCERCONFIGURATION_H_ +#include +#include +#include +#include +#include + +namespace pulsar { + +typedef boost::function SendCallback; +typedef boost::function CloseCallback; + +class ProducerConfigurationImpl; + +/** + * Class that holds the configuration for a producer + */ +class ProducerConfiguration { + public: + enum PartitionsRoutingMode { + UseSinglePartition, + RoundRobinDistribution, + CustomPartition + }; + ProducerConfiguration(); + ~ProducerConfiguration(); + ProducerConfiguration(const ProducerConfiguration&); + ProducerConfiguration& operator=(const ProducerConfiguration&); + + ProducerConfiguration& setSendTimeout(int sendTimeoutMs); + int getSendTimeout() const; + + ProducerConfiguration& setCompressionType(CompressionType compressionType); + CompressionType getCompressionType() const; + + ProducerConfiguration& setMaxPendingMessages(int maxPendingMessages); + int getMaxPendingMessages() const; + + ProducerConfiguration& setPartitionsRoutingMode(const PartitionsRoutingMode& mode); + PartitionsRoutingMode getPartitionsRoutingMode() const; + + ProducerConfiguration& setMessageRouter(const MessageRoutingPolicyPtr& router); + const MessageRoutingPolicyPtr& getMessageRouterPtr() const; + + ProducerConfiguration& setBlockIfQueueFull(bool); + bool getBlockIfQueueFull() const; + + // Zero queue size feature will not be supported on consumer end if batching is enabled + ProducerConfiguration& setBatchingEnabled(const bool& batchingEnabled); + const bool& getBatchingEnabled() const; + + ProducerConfiguration& setBatchingMaxMessages(const unsigned int& batchingMaxMessages); + const unsigned int& getBatchingMaxMessages() const; + + ProducerConfiguration& setBatchingMaxAllowedSizeInBytes( + const unsigned long& batchingMaxAllowedSizeInBytes); + const unsigned long& getBatchingMaxAllowedSizeInBytes() const; + + ProducerConfiguration& setBatchingMaxPublishDelayMs( + const unsigned long& batchingMaxPublishDelayMs); + const unsigned long& getBatchingMaxPublishDelayMs() const; + private: + struct Impl; + boost::shared_ptr impl_; +}; +} +#endif /* PULSAR_PRODUCERCONFIGURATION_H_ */ + diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc index b5a6f5185b350bffc9cc4fba7bb4833d6c688eb3..19c89666e14a5530901ef2c61056e153b2ffa6fd 100644 --- a/pulsar-client-cpp/lib/Client.cc +++ b/pulsar-client-cpp/lib/Client.cc @@ -30,142 +30,6 @@ DECLARE_LOG_OBJECT() namespace pulsar { -struct ClientConfiguration::Impl { - AuthenticationPtr authenticationPtr; - AuthDataPtr authDataPtr; - int ioThreads; - int operationTimeoutSeconds; - int messageListenerThreads; - int concurrentLookupRequest; - std::string logConfFilePath; - bool useTls; - std::string tlsTrustCertsFilePath; - bool tlsAllowInsecureConnection; - Impl() : authenticationPtr(AuthFactory::Disabled()), - authDataPtr(Auth::Disabled()), - ioThreads(1), - operationTimeoutSeconds(30), - messageListenerThreads(1), - concurrentLookupRequest(5000), - logConfFilePath(), - useTls(false), - tlsAllowInsecureConnection(true) {} -}; - -ClientConfiguration::ClientConfiguration() - : impl_(boost::make_shared()) { -} - -ClientConfiguration::~ClientConfiguration() { -} - -ClientConfiguration::ClientConfiguration(const ClientConfiguration& x) - : impl_(x.impl_) { -} - -ClientConfiguration& ClientConfiguration::operator=(const ClientConfiguration& x) { - impl_ = x.impl_; - return *this; -} - -ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authentication) { - impl_->authenticationPtr = authentication; - return *this; -} - -const Authentication& ClientConfiguration::getAuth() const { - return *impl_->authenticationPtr; -} - -ClientConfiguration& ClientConfiguration::setAuthentication(const AuthDataPtr& authentication) { - impl_->authDataPtr = authentication; - impl_->authenticationPtr = AuthData::getAuthenticationPtr(authentication); - return *this; -} - -const AuthData& ClientConfiguration::getAuthentication() const { - return *(impl_->authDataPtr); -} - - -const AuthenticationPtr& ClientConfiguration::getAuthenticationPtr() const { - return impl_->authenticationPtr; -} - -ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) { - impl_->operationTimeoutSeconds = timeout; - return *this; -} - -int ClientConfiguration::getOperationTimeoutSeconds() const { - return impl_->operationTimeoutSeconds; -} - -ClientConfiguration& ClientConfiguration::setIOThreads(int threads) { - impl_->ioThreads = threads; - return *this; -} - -int ClientConfiguration::getIOThreads() const { - return impl_->ioThreads; -} - -ClientConfiguration& ClientConfiguration::setMessageListenerThreads(int threads) { - impl_->messageListenerThreads = threads; - return *this; -} - -int ClientConfiguration::getMessageListenerThreads() const { - return impl_->messageListenerThreads; -} - -ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) { - impl_->useTls = useTls; - return *this; -} - -bool ClientConfiguration::isUseTls() const { - return impl_->useTls; -} - -ClientConfiguration& ClientConfiguration::setTlsTrustCertsFilePath(const std::string &filePath) { - impl_->tlsTrustCertsFilePath = filePath; - return *this; -} - -std::string ClientConfiguration::getTlsTrustCertsFilePath() const { - return impl_->tlsTrustCertsFilePath; -} - -ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool allowInsecure) { - impl_->tlsAllowInsecureConnection = allowInsecure; - return *this; -} - -bool ClientConfiguration::isTlsAllowInsecureConnection() const { - return impl_->tlsAllowInsecureConnection; -} - -ClientConfiguration& ClientConfiguration::setConcurrentLookupRequest(int concurrentLookupRequest) { - impl_->concurrentLookupRequest = concurrentLookupRequest; - return *this; -} - -int ClientConfiguration::getConcurrentLookupRequest() const { - return impl_->concurrentLookupRequest; -} - -ClientConfiguration& ClientConfiguration::setLogConfFilePath(const std::string& logConfFilePath) { - impl_->logConfFilePath = logConfFilePath; - return *this; -} - -const std::string& ClientConfiguration::getLogConfFilePath() const { - return impl_->logConfFilePath; -} - -///////////////////////////////////////////////////////////////// - Client::Client(const std::string& serviceUrl) : impl_(boost::make_shared(serviceUrl, ClientConfiguration(), true)) { } diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc new file mode 100644 index 0000000000000000000000000000000000000000..05823f76a7780665b29d6a73df910a61d5a91a9b --- /dev/null +++ b/pulsar-client-cpp/lib/ClientConfiguration.cc @@ -0,0 +1,122 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace pulsar { + +ClientConfiguration::ClientConfiguration() + : impl_(boost::make_shared()) { +} + +ClientConfiguration::~ClientConfiguration() { +} + +ClientConfiguration::ClientConfiguration(const ClientConfiguration& x) + : impl_(x.impl_) { +} + +ClientConfiguration& ClientConfiguration::operator=(const ClientConfiguration& x) { + impl_ = x.impl_; + return *this; +} + +ClientConfiguration& ClientConfiguration::setAuth(const AuthenticationPtr& authentication) { + impl_->authenticationPtr = authentication; + return *this; +} + +const Authentication& ClientConfiguration::getAuth() const { + return *impl_->authenticationPtr; +} + +const AuthenticationPtr& ClientConfiguration::getAuthenticationPtr() const { + return impl_->authenticationPtr; +} + +ClientConfiguration& ClientConfiguration::setOperationTimeoutSeconds(int timeout) { + impl_->operationTimeoutSeconds = timeout; + return *this; +} + +int ClientConfiguration::getOperationTimeoutSeconds() const { + return impl_->operationTimeoutSeconds; +} + +ClientConfiguration& ClientConfiguration::setIOThreads(int threads) { + impl_->ioThreads = threads; + return *this; +} + +int ClientConfiguration::getIOThreads() const { + return impl_->ioThreads; +} + +ClientConfiguration& ClientConfiguration::setMessageListenerThreads(int threads) { + impl_->messageListenerThreads = threads; + return *this; +} + +int ClientConfiguration::getMessageListenerThreads() const { + return impl_->messageListenerThreads; +} + +ClientConfiguration& ClientConfiguration::setUseTls(bool useTls) { + impl_->useTls = useTls; + return *this; +} + +bool ClientConfiguration::isUseTls() const { + return impl_->useTls; +} + +ClientConfiguration& ClientConfiguration::setTlsTrustCertsFilePath(const std::string &filePath) { + impl_->tlsTrustCertsFilePath = filePath; + return *this; +} + +std::string ClientConfiguration::getTlsTrustCertsFilePath() const { + return impl_->tlsTrustCertsFilePath; +} + +ClientConfiguration& ClientConfiguration::setTlsAllowInsecureConnection(bool allowInsecure) { + impl_->tlsAllowInsecureConnection = allowInsecure; + return *this; +} + +bool ClientConfiguration::isTlsAllowInsecureConnection() const { + return impl_->tlsAllowInsecureConnection; +} + +ClientConfiguration& ClientConfiguration::setConcurrentLookupRequest(int concurrentLookupRequest) { + impl_->concurrentLookupRequest = concurrentLookupRequest; + return *this; +} + +int ClientConfiguration::getConcurrentLookupRequest() const { + return impl_->concurrentLookupRequest; +} + +ClientConfiguration& ClientConfiguration::setLogConfFilePath(const std::string& logConfFilePath) { + impl_->logConfFilePath = logConfFilePath; + return *this; +} + +const std::string& ClientConfiguration::getLogConfFilePath() const { + return impl_->logConfFilePath; +} + +} diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h new file mode 100644 index 0000000000000000000000000000000000000000..a0eacc579c22683d9ec15069b028bb14d2367f94 --- /dev/null +++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h @@ -0,0 +1,45 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_CLIENTCONFIGURATIONIMPL_H_ +#define LIB_CLIENTCONFIGURATIONIMPL_H_ + +#include + +namespace pulsar { + +struct ClientConfigurationImpl { + AuthenticationPtr authenticationPtr; + int ioThreads; + int operationTimeoutSeconds; + int messageListenerThreads; + int concurrentLookupRequest; + std::string logConfFilePath; + bool useTls; + std::string tlsTrustCertsFilePath; + bool tlsAllowInsecureConnection; + ClientConfigurationImpl() : authenticationPtr(AuthFactory::Disabled()), + ioThreads(1), + operationTimeoutSeconds(30), + messageListenerThreads(1), + concurrentLookupRequest(5000), + logConfFilePath(), + useTls(false), + tlsAllowInsecureConnection(true) {} +}; +} + +#endif /* LIB_CLIENTCONFIGURATIONIMPL_H_ */ diff --git a/pulsar-client-cpp/lib/Consumer.cc b/pulsar-client-cpp/lib/Consumer.cc index 927a941bfe9d1bc1e1930f0b624557a15a019bcc..561c89943a8b883c8aa16c18e930edb70930e088 100644 --- a/pulsar-client-cpp/lib/Consumer.cc +++ b/pulsar-client-cpp/lib/Consumer.cc @@ -23,100 +23,7 @@ namespace pulsar { -const std::string EMPTY_STRING; - -struct ConsumerConfiguration::Impl { - long unAckedMessagesTimeoutMs; - ConsumerType consumerType; - MessageListener messageListener; - bool hasMessageListener; - int receiverQueueSize; - std::string consumerName; - long brokerConsumerStatsCacheTimeInMs; - Impl() - : unAckedMessagesTimeoutMs(0), - consumerType(ConsumerExclusive), - messageListener(), - hasMessageListener(false), - brokerConsumerStatsCacheTimeInMs(30 * 1000), // 30 seconds - receiverQueueSize(1000) { - } -}; - -ConsumerConfiguration::ConsumerConfiguration() - : impl_(boost::make_shared()) { -} - -ConsumerConfiguration::~ConsumerConfiguration() { -} - -ConsumerConfiguration::ConsumerConfiguration(const ConsumerConfiguration& x) - : impl_(x.impl_) { -} - -ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfiguration& x) { - impl_ = x.impl_; - return *this; -} - -long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const { - return impl_->brokerConsumerStatsCacheTimeInMs; -} - -void ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs) { - impl_->brokerConsumerStatsCacheTimeInMs = cacheTimeInMs; -} - -ConsumerConfiguration& ConsumerConfiguration::setConsumerType(ConsumerType consumerType) { - impl_->consumerType = consumerType; - return *this; -} - -ConsumerType ConsumerConfiguration::getConsumerType() const { - return impl_->consumerType; -} - -ConsumerConfiguration& ConsumerConfiguration::setMessageListener(MessageListener messageListener) { - impl_->messageListener = messageListener; - impl_->hasMessageListener = true; - return *this; -} - -MessageListener ConsumerConfiguration::getMessageListener() const { - return impl_->messageListener; -} - -bool ConsumerConfiguration::hasMessageListener() const { - return impl_->hasMessageListener; -} - -void ConsumerConfiguration::setReceiverQueueSize(int size) { - impl_->receiverQueueSize = size; -} - -int ConsumerConfiguration::getReceiverQueueSize() const { - return impl_->receiverQueueSize; -} - -const std::string& ConsumerConfiguration::getConsumerName() const { - return impl_->consumerName; -} - -void ConsumerConfiguration::setConsumerName(const std::string& consumerName) { - impl_->consumerName = consumerName; -} - -long ConsumerConfiguration::getUnAckedMessagesTimeoutMs() const { - return impl_->unAckedMessagesTimeoutMs; -} - -void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) { - if (milliSeconds < 10000) { - throw "Consumer Config Exception: Unacknowledged message timeout should be greater than 10 seconds."; - } - impl_->unAckedMessagesTimeoutMs = milliSeconds; -} -////////////////////////////////////////////////////// +static const std::string EMPTY_STRING; Consumer::Consumer() : impl_() { diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc new file mode 100644 index 0000000000000000000000000000000000000000..bf2a0b3a1d216447cd5ab66013e9046bcb9ece37 --- /dev/null +++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc @@ -0,0 +1,94 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +namespace pulsar { + +ConsumerConfiguration::ConsumerConfiguration() + : impl_(boost::make_shared()) { +} + +ConsumerConfiguration::~ConsumerConfiguration() { +} + +ConsumerConfiguration::ConsumerConfiguration(const ConsumerConfiguration& x) + : impl_(x.impl_) { +} + +ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfiguration& x) { + impl_ = x.impl_; + return *this; +} + +long ConsumerConfiguration::getBrokerConsumerStatsCacheTimeInMs() const { + return impl_->brokerConsumerStatsCacheTimeInMs; +} + +void ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs(const long cacheTimeInMs) { + impl_->brokerConsumerStatsCacheTimeInMs = cacheTimeInMs; +} + +ConsumerConfiguration& ConsumerConfiguration::setConsumerType(ConsumerType consumerType) { + impl_->consumerType = consumerType; + return *this; +} + +ConsumerType ConsumerConfiguration::getConsumerType() const { + return impl_->consumerType; +} + +ConsumerConfiguration& ConsumerConfiguration::setMessageListener(MessageListener messageListener) { + impl_->messageListener = messageListener; + impl_->hasMessageListener = true; + return *this; +} + +MessageListener ConsumerConfiguration::getMessageListener() const { + return impl_->messageListener; +} + +bool ConsumerConfiguration::hasMessageListener() const { + return impl_->hasMessageListener; +} + +void ConsumerConfiguration::setReceiverQueueSize(int size) { + impl_->receiverQueueSize = size; +} + +int ConsumerConfiguration::getReceiverQueueSize() const { + return impl_->receiverQueueSize; +} + +const std::string& ConsumerConfiguration::getConsumerName() const { + return impl_->consumerName; +} + +void ConsumerConfiguration::setConsumerName(const std::string& consumerName) { + impl_->consumerName = consumerName; +} + +long ConsumerConfiguration::getUnAckedMessagesTimeoutMs() const { + return impl_->unAckedMessagesTimeoutMs; +} + +void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeconds) { + if (milliSeconds < 10000) { + throw "Consumer Config Exception: Unacknowledged message timeout should be greater than 10 seconds."; + } + impl_->unAckedMessagesTimeoutMs = milliSeconds; +} +} diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h new file mode 100644 index 0000000000000000000000000000000000000000..f1885fbec650a0004387c4ab95af413324793178 --- /dev/null +++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h @@ -0,0 +1,42 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_CONSUMERCONFIGURATIONIMPL_H_ +#define LIB_CONSUMERCONFIGURATIONIMPL_H_ + +#include +#include + +namespace pulsar { +struct ConsumerConfigurationImpl { + long unAckedMessagesTimeoutMs; + ConsumerType consumerType; + MessageListener messageListener; + bool hasMessageListener; + int receiverQueueSize; + std::string consumerName; + long brokerConsumerStatsCacheTimeInMs; + ConsumerConfigurationImpl() + : unAckedMessagesTimeoutMs(0), + consumerType(ConsumerExclusive), + messageListener(), + hasMessageListener(false), + brokerConsumerStatsCacheTimeInMs(30 * 1000), // 30 seconds + receiverQueueSize(1000) { + } +}; +} +#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */ diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index a390c7760e13092e7596d9034011dff73c0deafa..957c12c93223ea007783816138a6d6450efe65a9 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -23,139 +23,7 @@ namespace pulsar { -const std::string EMPTY_STRING; - -struct ProducerConfiguration::Impl { - int sendTimeoutMs; - CompressionType compressionType; - int maxPendingMessages; - PartitionsRoutingMode routingMode; - MessageRoutingPolicyPtr messageRouter; - bool blockIfQueueFull; - bool batchingEnabled; - unsigned int batchingMaxMessages; - unsigned long batchingMaxAllowedSizeInBytes; - unsigned long batchingMaxPublishDelayMs; - Impl() - : sendTimeoutMs(30000), - compressionType(CompressionNone), - maxPendingMessages(1000), - routingMode(ProducerConfiguration::UseSinglePartition), - blockIfQueueFull(true), - batchingEnabled(false), - batchingMaxMessages(1000), - batchingMaxAllowedSizeInBytes(128 * 1024), // 128 KB - batchingMaxPublishDelayMs(3000) { // 3 seconds - } -}; - -ProducerConfiguration::ProducerConfiguration() - : impl_(boost::make_shared()) { -} - -ProducerConfiguration::~ProducerConfiguration() { -} - -ProducerConfiguration::ProducerConfiguration(const ProducerConfiguration& x) - : impl_(x.impl_) { -} - -ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfiguration& x) { - impl_ = x.impl_; - return *this; -} - -ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) { - impl_->sendTimeoutMs = sendTimeoutMs; - return *this; -} - -int ProducerConfiguration::getSendTimeout() const { - return impl_->sendTimeoutMs; -} - -ProducerConfiguration& ProducerConfiguration::setCompressionType(CompressionType compressionType) { - impl_->compressionType = compressionType; - return *this; -} - -CompressionType ProducerConfiguration::getCompressionType() const { - return impl_->compressionType; -} - -ProducerConfiguration& ProducerConfiguration::setMaxPendingMessages(int maxPendingMessages) { - assert(maxPendingMessages > 0); - impl_->maxPendingMessages = maxPendingMessages; - return *this; -} - -int ProducerConfiguration::getMaxPendingMessages() const { - return impl_->maxPendingMessages; -} - -ProducerConfiguration& ProducerConfiguration::setPartitionsRoutingMode(const PartitionsRoutingMode& mode) { - impl_->routingMode = mode; - return *this; -} - -ProducerConfiguration::PartitionsRoutingMode ProducerConfiguration::getPartitionsRoutingMode() const { - return impl_->routingMode; -} - -ProducerConfiguration& ProducerConfiguration::setMessageRouter(const MessageRoutingPolicyPtr& router) { - impl_->routingMode = ProducerConfiguration::CustomPartition; - impl_->messageRouter = router; - return *this; -} - -const MessageRoutingPolicyPtr& ProducerConfiguration::getMessageRouterPtr() const { - return impl_->messageRouter; -} - -ProducerConfiguration& ProducerConfiguration::setBlockIfQueueFull(bool flag) { - impl_->blockIfQueueFull = flag; - return *this; -} - -bool ProducerConfiguration::getBlockIfQueueFull() const { - return impl_->blockIfQueueFull; -} - -ProducerConfiguration& ProducerConfiguration::setBatchingEnabled(const bool& batchingEnabled) { - impl_->batchingEnabled = batchingEnabled; - return *this; -} -const bool& ProducerConfiguration::getBatchingEnabled() const { - return impl_->batchingEnabled; -} - -ProducerConfiguration& ProducerConfiguration::setBatchingMaxMessages(const unsigned int& batchingMaxMessages) { - assert(batchingMaxMessages > 1); - impl_->batchingMaxMessages = batchingMaxMessages; - return *this; -} - -const unsigned int& ProducerConfiguration::getBatchingMaxMessages() const { - return impl_->batchingMaxMessages ; -} - -ProducerConfiguration& ProducerConfiguration::setBatchingMaxAllowedSizeInBytes(const unsigned long& batchingMaxAllowedSizeInBytes) { - impl_->batchingMaxAllowedSizeInBytes = batchingMaxAllowedSizeInBytes; - return *this; -} -const unsigned long& ProducerConfiguration::getBatchingMaxAllowedSizeInBytes() const { - return impl_->batchingMaxAllowedSizeInBytes; -} - -ProducerConfiguration& ProducerConfiguration::setBatchingMaxPublishDelayMs(const unsigned long& batchingMaxPublishDelayMs) { - impl_->batchingMaxPublishDelayMs = batchingMaxPublishDelayMs; - return *this; -} - -const unsigned long& ProducerConfiguration::getBatchingMaxPublishDelayMs() const{ - return impl_->batchingMaxPublishDelayMs; -} -//////////////////////////////////////////////////////////////////////////////// +static const std::string EMPTY_STRING; Producer::Producer() : impl_() { diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc new file mode 100644 index 0000000000000000000000000000000000000000..fa5d846ebe400b517c7f7dbd14c2249ea3a73968 --- /dev/null +++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc @@ -0,0 +1,132 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + + +namespace pulsar { +ProducerConfiguration::ProducerConfiguration() + : impl_(boost::make_shared()) { +} + +ProducerConfiguration::~ProducerConfiguration() { +} + +ProducerConfiguration::ProducerConfiguration(const ProducerConfiguration& x) + : impl_(x.impl_) { +} + +ProducerConfiguration& ProducerConfiguration::operator=(const ProducerConfiguration& x) { + impl_ = x.impl_; + return *this; +} + +ProducerConfiguration& ProducerConfiguration::setSendTimeout(int sendTimeoutMs) { + impl_->sendTimeoutMs = sendTimeoutMs; + return *this; +} + +int ProducerConfiguration::getSendTimeout() const { + return impl_->sendTimeoutMs; +} + +ProducerConfiguration& ProducerConfiguration::setCompressionType(CompressionType compressionType) { + impl_->compressionType = compressionType; + return *this; +} + +CompressionType ProducerConfiguration::getCompressionType() const { + return impl_->compressionType; +} + +ProducerConfiguration& ProducerConfiguration::setMaxPendingMessages(int maxPendingMessages) { + assert(maxPendingMessages > 0); + impl_->maxPendingMessages = maxPendingMessages; + return *this; +} + +int ProducerConfiguration::getMaxPendingMessages() const { + return impl_->maxPendingMessages; +} + +ProducerConfiguration& ProducerConfiguration::setPartitionsRoutingMode( + const PartitionsRoutingMode& mode) { + impl_->routingMode = mode; + return *this; +} + +ProducerConfiguration::PartitionsRoutingMode ProducerConfiguration::getPartitionsRoutingMode() const { + return impl_->routingMode; +} + +ProducerConfiguration& ProducerConfiguration::setMessageRouter( + const MessageRoutingPolicyPtr& router) { + impl_->routingMode = ProducerConfiguration::CustomPartition; + impl_->messageRouter = router; + return *this; +} + +const MessageRoutingPolicyPtr& ProducerConfiguration::getMessageRouterPtr() const { + return impl_->messageRouter; +} + +ProducerConfiguration& ProducerConfiguration::setBlockIfQueueFull(bool flag) { + impl_->blockIfQueueFull = flag; + return *this; +} + +bool ProducerConfiguration::getBlockIfQueueFull() const { + return impl_->blockIfQueueFull; +} + +ProducerConfiguration& ProducerConfiguration::setBatchingEnabled(const bool& batchingEnabled) { + impl_->batchingEnabled = batchingEnabled; + return *this; +} +const bool& ProducerConfiguration::getBatchingEnabled() const { + return impl_->batchingEnabled; +} + +ProducerConfiguration& ProducerConfiguration::setBatchingMaxMessages( + const unsigned int& batchingMaxMessages) { + assert(batchingMaxMessages > 1); + impl_->batchingMaxMessages = batchingMaxMessages; + return *this; +} + +const unsigned int& ProducerConfiguration::getBatchingMaxMessages() const { + return impl_->batchingMaxMessages; +} + +ProducerConfiguration& ProducerConfiguration::setBatchingMaxAllowedSizeInBytes( + const unsigned long& batchingMaxAllowedSizeInBytes) { + impl_->batchingMaxAllowedSizeInBytes = batchingMaxAllowedSizeInBytes; + return *this; +} +const unsigned long& ProducerConfiguration::getBatchingMaxAllowedSizeInBytes() const { + return impl_->batchingMaxAllowedSizeInBytes; +} + +ProducerConfiguration& ProducerConfiguration::setBatchingMaxPublishDelayMs( + const unsigned long& batchingMaxPublishDelayMs) { + impl_->batchingMaxPublishDelayMs = batchingMaxPublishDelayMs; + return *this; +} + +const unsigned long& ProducerConfiguration::getBatchingMaxPublishDelayMs() const { + return impl_->batchingMaxPublishDelayMs; +} +} diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h new file mode 100644 index 0000000000000000000000000000000000000000..7c0b76dfb1dd3e65d69f6f0c671db9598453d40d --- /dev/null +++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h @@ -0,0 +1,52 @@ +/** + * Copyright 2016 Yahoo Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef LIB_PRODUCERCONFIGURATIONIMPL_H_ +#define LIB_PRODUCERCONFIGURATIONIMPL_H_ + +#include +#include + +namespace pulsar { + +struct ProducerConfigurationImpl { + int sendTimeoutMs; + CompressionType compressionType; + int maxPendingMessages; + ProducerConfiguration::PartitionsRoutingMode routingMode; + MessageRoutingPolicyPtr messageRouter; + bool blockIfQueueFull; + bool batchingEnabled; + unsigned int batchingMaxMessages; + unsigned long batchingMaxAllowedSizeInBytes; + unsigned long batchingMaxPublishDelayMs; + ProducerConfigurationImpl() + : sendTimeoutMs(30000), + compressionType(CompressionNone), + maxPendingMessages(1000), + routingMode(ProducerConfiguration::UseSinglePartition), + blockIfQueueFull(true), + batchingEnabled(false), + batchingMaxMessages(1000), + batchingMaxAllowedSizeInBytes(128 * 1024), // 128 KB + batchingMaxPublishDelayMs(3000) { // 3 seconds + } +}; +} + + + +#endif /* LIB_PRODUCERCONFIGURATIONIMPL_H_ */