Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
apache
pulsar
提交
d58cc0ba
pulsar
项目概览
apache
/
pulsar
通知
129
Star
40
Fork
3
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Wiki
1
Wiki
分析
仓库
DevOps
项目成员
Pages
pulsar
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Pages
分析
分析
仓库分析
DevOps
Wiki
1
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
提交
体验新版 GitCode,发现更多精彩内容 >>
提交
d58cc0ba
编写于
4月 19, 2017
作者:
J
jai1
提交者:
Matteo Merli
4月 19, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Removing Inner structs in cpp client (#360)
上级
09920112
变更
16
显示空白变更内容
内联
并排
Showing
16 changed file
with
862 addition
and
672 deletion
+862
-672
pulsar-client-cpp/include/pulsar/Client.h
pulsar-client-cpp/include/pulsar/Client.h
+1
-143
pulsar-client-cpp/include/pulsar/ClientConfiguration.h
pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+130
-0
pulsar-client-cpp/include/pulsar/CompressionType.h
pulsar-client-cpp/include/pulsar/CompressionType.h
+27
-0
pulsar-client-cpp/include/pulsar/Consumer.h
pulsar-client-cpp/include/pulsar/Consumer.h
+1
-101
pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+128
-0
pulsar-client-cpp/include/pulsar/Producer.h
pulsar-client-cpp/include/pulsar/Producer.h
+1
-65
pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+85
-0
pulsar-client-cpp/lib/Client.cc
pulsar-client-cpp/lib/Client.cc
+0
-136
pulsar-client-cpp/lib/ClientConfiguration.cc
pulsar-client-cpp/lib/ClientConfiguration.cc
+122
-0
pulsar-client-cpp/lib/ClientConfigurationImpl.h
pulsar-client-cpp/lib/ClientConfigurationImpl.h
+45
-0
pulsar-client-cpp/lib/Consumer.cc
pulsar-client-cpp/lib/Consumer.cc
+1
-94
pulsar-client-cpp/lib/ConsumerConfiguration.cc
pulsar-client-cpp/lib/ConsumerConfiguration.cc
+94
-0
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+42
-0
pulsar-client-cpp/lib/Producer.cc
pulsar-client-cpp/lib/Producer.cc
+1
-133
pulsar-client-cpp/lib/ProducerConfiguration.cc
pulsar-client-cpp/lib/ProducerConfiguration.cc
+132
-0
pulsar-client-cpp/lib/ProducerConfigurationImpl.h
pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+52
-0
未找到文件。
pulsar-client-cpp/include/pulsar/Client.h
浏览文件 @
d58cc0ba
...
@@ -17,36 +17,14 @@
...
@@ -17,36 +17,14 @@
#ifndef PULSAR_CLIENT_HPP_
#ifndef PULSAR_CLIENT_HPP_
#define PULSAR_CLIENT_HPP_
#define PULSAR_CLIENT_HPP_
#include <pulsar/Authentication.h>
#include <pulsar/Consumer.h>
#include <pulsar/Consumer.h>
#include <pulsar/Producer.h>
#include <pulsar/Producer.h>
#include <pulsar/Result.h>
#include <pulsar/Result.h>
#include <pulsar/Message.h>
#include <pulsar/Message.h>
#include <pulsar/MessageBuilder.h>
#include <pulsar/MessageBuilder.h>
#include <pulsar/ClientConfiguration.h>
#include <string>
#include <string>
#ifdef PULSAR_ENABLE_DEPRECATED_METHOD
#include<pulsar/Auth.h>
#else
// Deprecated
namespace
pulsar
{
class
AuthData
;
typedef
boost
::
shared_ptr
<
AuthData
>
AuthDataPtr
;
class
AuthData
{
public:
static
AuthenticationPtr
getAuthenticationPtr
(
const
AuthDataPtr
&
authentication
)
{
AuthenticationPtr
ptr
;
return
ptr
;
}
};
class
Auth
{
public:
static
AuthDataPtr
Disabled
()
{
return
AuthDataPtr
();
}
};
}
#endif
#pragma GCC visibility push(default)
#pragma GCC visibility push(default)
class
PulsarFriend
;
class
PulsarFriend
;
...
@@ -57,126 +35,6 @@ typedef boost::function<void(Result, Producer)> CreateProducerCallback;
...
@@ -57,126 +35,6 @@ typedef boost::function<void(Result, Producer)> CreateProducerCallback;
typedef
boost
::
function
<
void
(
Result
,
Consumer
)
>
SubscribeCallback
;
typedef
boost
::
function
<
void
(
Result
,
Consumer
)
>
SubscribeCallback
;
typedef
boost
::
function
<
void
(
Result
)
>
CloseCallback
;
typedef
boost
::
function
<
void
(
Result
)
>
CloseCallback
;
class
ClientConfiguration
{
public:
ClientConfiguration
();
~
ClientConfiguration
();
ClientConfiguration
(
const
ClientConfiguration
&
);
ClientConfiguration
&
operator
=
(
const
ClientConfiguration
&
);
/**
* @deprecated
* Set the authentication method to be used with the broker
*
* @param authentication the authentication data to use
*/
ClientConfiguration
&
setAuthentication
(
const
AuthDataPtr
&
authentication
);
/**
* @deprecated
* @return the authentication data
*/
const
AuthData
&
getAuthentication
()
const
;
/**
* Set the authentication method to be used with the broker
*
* @param authentication the authentication data to use
*/
ClientConfiguration
&
setAuth
(
const
AuthenticationPtr
&
authentication
);
/**
* @return the authentication data
*/
const
Authentication
&
getAuth
()
const
;
/**
* Set timeout on client operations (subscribe, create producer, close, unsubscribe)
* Default is 30 seconds.
*
* @param timeout the timeout after which the operation will be considered as failed
*/
ClientConfiguration
&
setOperationTimeoutSeconds
(
int
timeout
);
/**
* @return the client operations timeout in seconds
*/
int
getOperationTimeoutSeconds
()
const
;
/**
* Set the number of IO threads to be used by the Pulsar client. Default is 1
* thread.
*
* @param threads number of threads
*/
ClientConfiguration
&
setIOThreads
(
int
threads
);
/**
* @return the number of IO threads to use
*/
int
getIOThreads
()
const
;
/**
* Set the number of threads to be used by the Pulsar client when delivering messages
* through message listener. Default is 1 thread per Pulsar client.
*
* If using more than 1 thread, messages for distinct MessageListener will be
* delivered in different threads, however a single MessageListener will always
* be assigned to the same thread.
*
* @param threads number of threads
*/
ClientConfiguration
&
setMessageListenerThreads
(
int
threads
);
/**
* @return the number of IO threads to use
*/
int
getMessageListenerThreads
()
const
;
/**
* Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
* <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe on
* thousands of topic using created {@link PulsarClient}
*
* @param concurrentLookupRequest
*/
ClientConfiguration
&
setConcurrentLookupRequest
(
int
concurrentLookupRequest
);
/**
* @return Get configured total allowed concurrent lookup-request.
*/
int
getConcurrentLookupRequest
()
const
;
/**
* Initialize the log configuration
*
* @param logConfFilePath path of the configuration file
*/
ClientConfiguration
&
setLogConfFilePath
(
const
std
::
string
&
logConfFilePath
);
/**
* Get the path of log configuration file (log4cpp)
*/
const
std
::
string
&
getLogConfFilePath
()
const
;
ClientConfiguration
&
setUseTls
(
bool
useTls
);
bool
isUseTls
()
const
;
ClientConfiguration
&
setTlsTrustCertsFilePath
(
const
std
::
string
&
tlsTrustCertsFilePath
);
std
::
string
getTlsTrustCertsFilePath
()
const
;
ClientConfiguration
&
setTlsAllowInsecureConnection
(
bool
allowInsecure
);
bool
isTlsAllowInsecureConnection
()
const
;
private:
const
AuthenticationPtr
&
getAuthenticationPtr
()
const
;
struct
Impl
;
boost
::
shared_ptr
<
Impl
>
impl_
;
friend
class
ClientImpl
;
};
class
ClientImpl
;
class
ClientImpl
;
class
Client
{
class
Client
{
...
...
pulsar-client-cpp/include/pulsar/ClientConfiguration.h
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CLIENTCONFIGURATION_H_
#define PULSAR_CLIENTCONFIGURATION_H_
#include <pulsar/Authentication.h>
namespace
pulsar
{
class
ClientConfigurationImpl
;
class
ClientConfiguration
{
public:
ClientConfiguration
();
~
ClientConfiguration
();
ClientConfiguration
(
const
ClientConfiguration
&
);
ClientConfiguration
&
operator
=
(
const
ClientConfiguration
&
);
/**
* Set the authentication method to be used with the broker
*
* @param authentication the authentication data to use
*/
ClientConfiguration
&
setAuth
(
const
AuthenticationPtr
&
authentication
);
/**
* @return the authentication data
*/
const
Authentication
&
getAuth
()
const
;
/**
* Set timeout on client operations (subscribe, create producer, close, unsubscribe)
* Default is 30 seconds.
*
* @param timeout the timeout after which the operation will be considered as failed
*/
ClientConfiguration
&
setOperationTimeoutSeconds
(
int
timeout
);
/**
* @return the client operations timeout in seconds
*/
int
getOperationTimeoutSeconds
()
const
;
/**
* Set the number of IO threads to be used by the Pulsar client. Default is 1
* thread.
*
* @param threads number of threads
*/
ClientConfiguration
&
setIOThreads
(
int
threads
);
/**
* @return the number of IO threads to use
*/
int
getIOThreads
()
const
;
/**
* Set the number of threads to be used by the Pulsar client when delivering messages
* through message listener. Default is 1 thread per Pulsar client.
*
* If using more than 1 thread, messages for distinct MessageListener will be
* delivered in different threads, however a single MessageListener will always
* be assigned to the same thread.
*
* @param threads number of threads
*/
ClientConfiguration
&
setMessageListenerThreads
(
int
threads
);
/**
* @return the number of IO threads to use
*/
int
getMessageListenerThreads
()
const
;
/**
* Number of concurrent lookup-requests allowed on each broker-connection to prevent overload on broker.
* <i>(default: 5000)</i> It should be configured with higher value only in case of it requires to produce/subscribe on
* thousands of topic using created {@link PulsarClient}
*
* @param concurrentLookupRequest
*/
ClientConfiguration
&
setConcurrentLookupRequest
(
int
concurrentLookupRequest
);
/**
* @return Get configured total allowed concurrent lookup-request.
*/
int
getConcurrentLookupRequest
()
const
;
/**
* Initialize the log configuration
*
* @param logConfFilePath path of the configuration file
*/
ClientConfiguration
&
setLogConfFilePath
(
const
std
::
string
&
logConfFilePath
);
/**
* Get the path of log configuration file (log4cpp)
*/
const
std
::
string
&
getLogConfFilePath
()
const
;
ClientConfiguration
&
setUseTls
(
bool
useTls
);
bool
isUseTls
()
const
;
ClientConfiguration
&
setTlsTrustCertsFilePath
(
const
std
::
string
&
tlsTrustCertsFilePath
);
std
::
string
getTlsTrustCertsFilePath
()
const
;
ClientConfiguration
&
setTlsAllowInsecureConnection
(
bool
allowInsecure
);
bool
isTlsAllowInsecureConnection
()
const
;
private:
const
AuthenticationPtr
&
getAuthenticationPtr
()
const
;
boost
::
shared_ptr
<
ClientConfigurationImpl
>
impl_
;
friend
class
ClientImpl
;
};
}
#endif
/* PULSAR_CLIENTCONFIGURATION_H_ */
pulsar-client-cpp/include/pulsar/CompressionType.h
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_COMPRESSIONTYPE_H_
#define PULSAR_COMPRESSIONTYPE_H_
namespace
pulsar
{
enum
CompressionType
{
CompressionNone
=
0
,
CompressionLZ4
=
1
,
CompressionZLib
=
2
};
}
#endif
/* PULSAR_COMPRESSIONTYPE_H_ */
pulsar-client-cpp/include/pulsar/Consumer.h
浏览文件 @
d58cc0ba
...
@@ -17,116 +17,16 @@
...
@@ -17,116 +17,16 @@
#ifndef CONSUMER_HPP_
#ifndef CONSUMER_HPP_
#define CONSUMER_HPP_
#define CONSUMER_HPP_
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <pulsar/Message.h>
#include <pulsar/Result.h>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <iostream>
#include <iostream>
#include <pulsar/ConsumerType.h>
#include <pulsar/BrokerConsumerStats.h>
#include <pulsar/BrokerConsumerStats.h>
#include <pulsar/ConsumerConfiguration.h>
#pragma GCC visibility push(default)
#pragma GCC visibility push(default)
class
PulsarFriend
;
class
PulsarFriend
;
namespace
pulsar
{
namespace
pulsar
{
class
Consumer
;
/// Callback definition for non-data operation
typedef
boost
::
function
<
void
(
Result
result
)
>
ResultCallback
;
/// Callback definition for MessageListener
typedef
boost
::
function
<
void
(
Consumer
consumer
,
const
Message
&
msg
)
>
MessageListener
;
/**
* Class specifying the configuration of a consumer.
*/
class
ConsumerConfiguration
{
public:
ConsumerConfiguration
();
~
ConsumerConfiguration
();
ConsumerConfiguration
(
const
ConsumerConfiguration
&
);
ConsumerConfiguration
&
operator
=
(
const
ConsumerConfiguration
&
);
/**
* Specify the consumer type. The consumer type enables
* specifying the type of subscription. In Exclusive subscription,
* only a single consumer is allowed to attach to the subscription. Other consumers
* will get an error message. In Shared subscription, multiple consumers will be
* able to use the same subscription name and the messages will be dispatched in a
* round robin fashion. In Failover subscription, a master-slave subscription model
* allows for multiple consumers to attach to a single subscription, though only one
* of them will be “master” at a given time. Only the master consumer will receive
* messages. When the master gets disconnected, one among the slaves will be promoted
* to master and will start getting messages.
*/
ConsumerConfiguration
&
setConsumerType
(
ConsumerType
consumerType
);
ConsumerType
getConsumerType
()
const
;
/**
* A message listener enables your application to configure how to process
* and acknowledge messages delivered. A listener will be called in order
* for every message received.
*/
ConsumerConfiguration
&
setMessageListener
(
MessageListener
messageListener
);
MessageListener
getMessageListener
()
const
;
bool
hasMessageListener
()
const
;
/**
* Sets the size of the consumer receive queue.
*
* The consumer receive queue controls how many messages can be accumulated by the Consumer before the
* application calls receive(). Using a higher value could potentially increase the consumer throughput
* at the expense of bigger memory utilization.
*
* Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling pre-fetching of
* messages. This approach improves the message distribution on shared subscription, by pushing messages only to
* the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can be
* used if the consumer queue size is zero. The receive() function call should not be interrupted when
* the consumer queue size is zero.
*
* Default value is 1000 messages and should be good for most use cases.
*
* @param size
* the new receiver queue size value
*/
void
setReceiverQueueSize
(
int
size
);
int
getReceiverQueueSize
()
const
;
void
setConsumerName
(
const
std
::
string
&
);
const
std
::
string
&
getConsumerName
()
const
;
/**
* Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than
* 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
* If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
* redelivered.
* @param timeout in milliseconds
*/
void
setUnAckedMessagesTimeoutMs
(
const
uint64_t
milliSeconds
);
/**
* @return the configured timeout in milliseconds for unacked messages.
*/
long
getUnAckedMessagesTimeoutMs
()
const
;
/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
* @param cacheTimeInMs in milliseconds
*/
void
setBrokerConsumerStatsCacheTimeInMs
(
const
long
cacheTimeInMs
);
/**
* @return the configured timeout in milliseconds caching BrokerConsumerStats.
*/
long
getBrokerConsumerStatsCacheTimeInMs
()
const
;
private:
struct
Impl
;
boost
::
shared_ptr
<
Impl
>
impl_
;
};
class
ConsumerImplBase
;
class
ConsumerImplBase
;
/**
/**
...
...
pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_CONSUMERCONFIGURATION_H_
#define PULSAR_CONSUMERCONFIGURATION_H_
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <pulsar/Result.h>
#include <pulsar/ConsumerType.h>
#include <pulsar/Message.h>
namespace
pulsar
{
class
Consumer
;
/// Callback definition for non-data operation
typedef
boost
::
function
<
void
(
Result
result
)
>
ResultCallback
;
/// Callback definition for MessageListener
typedef
boost
::
function
<
void
(
Consumer
consumer
,
const
Message
&
msg
)
>
MessageListener
;
class
ConsumerConfigurationImpl
;
/**
* Class specifying the configuration of a consumer.
*/
class
ConsumerConfiguration
{
public:
ConsumerConfiguration
();
~
ConsumerConfiguration
();
ConsumerConfiguration
(
const
ConsumerConfiguration
&
);
ConsumerConfiguration
&
operator
=
(
const
ConsumerConfiguration
&
);
/**
* Specify the consumer type. The consumer type enables
* specifying the type of subscription. In Exclusive subscription,
* only a single consumer is allowed to attach to the subscription. Other consumers
* will get an error message. In Shared subscription, multiple consumers will be
* able to use the same subscription name and the messages will be dispatched in a
* round robin fashion. In Failover subscription, a master-slave subscription model
* allows for multiple consumers to attach to a single subscription, though only one
* of them will be “master” at a given time. Only the master consumer will receive
* messages. When the master gets disconnected, one among the slaves will be promoted
* to master and will start getting messages.
*/
ConsumerConfiguration
&
setConsumerType
(
ConsumerType
consumerType
);
ConsumerType
getConsumerType
()
const
;
/**
* A message listener enables your application to configure how to process
* and acknowledge messages delivered. A listener will be called in order
* for every message received.
*/
ConsumerConfiguration
&
setMessageListener
(
MessageListener
messageListener
);
MessageListener
getMessageListener
()
const
;
bool
hasMessageListener
()
const
;
/**
* Sets the size of the consumer receive queue.
*
* The consumer receive queue controls how many messages can be accumulated by the Consumer before the
* application calls receive(). Using a higher value could potentially increase the consumer throughput
* at the expense of bigger memory utilization.
*
* Setting the consumer queue size as zero decreases the throughput of the consumer, by disabling pre-fetching of
* messages. This approach improves the message distribution on shared subscription, by pushing messages only to
* the consumers that are ready to process them. Neither receive with timeout nor Partitioned Topics can be
* used if the consumer queue size is zero. The receive() function call should not be interrupted when
* the consumer queue size is zero.
*
* Default value is 1000 messages and should be good for most use cases.
*
* @param size
* the new receiver queue size value
*/
void
setReceiverQueueSize
(
int
size
);
int
getReceiverQueueSize
()
const
;
void
setConsumerName
(
const
std
::
string
&
);
const
std
::
string
&
getConsumerName
()
const
;
/**
* Set the timeout in milliseconds for unacknowledged messages, the timeout needs to be greater than
* 10 seconds. An Exception is thrown if the given value is less than 10000 (10 seconds).
* If a successful acknowledgement is not sent within the timeout all the unacknowledged messages are
* redelivered.
* @param timeout in milliseconds
*/
void
setUnAckedMessagesTimeoutMs
(
const
uint64_t
milliSeconds
);
/**
* @return the configured timeout in milliseconds for unacked messages.
*/
long
getUnAckedMessagesTimeoutMs
()
const
;
/**
* Set the time duration for which the broker side consumer stats will be cached in the client.
* @param cacheTimeInMs in milliseconds
*/
void
setBrokerConsumerStatsCacheTimeInMs
(
const
long
cacheTimeInMs
);
/**
* @return the configured timeout in milliseconds caching BrokerConsumerStats.
*/
long
getBrokerConsumerStatsCacheTimeInMs
()
const
;
private:
boost
::
shared_ptr
<
ConsumerConfigurationImpl
>
impl_
;
};
}
#endif
/* PULSAR_CONSUMERCONFIGURATION_H_ */
pulsar-client-cpp/include/pulsar/Producer.h
浏览文件 @
d58cc0ba
...
@@ -17,11 +17,7 @@
...
@@ -17,11 +17,7 @@
#ifndef PRODUCER_HPP_
#ifndef PRODUCER_HPP_
#define PRODUCER_HPP_
#define PRODUCER_HPP_
#include <pulsar/Result.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/Message.h>
#include <pulsar/MessageRoutingPolicy.h>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/shared_ptr.hpp>
#include <stdint.h>
#include <stdint.h>
...
@@ -30,66 +26,6 @@
...
@@ -30,66 +26,6 @@
class
PulsarFriend
;
class
PulsarFriend
;
namespace
pulsar
{
namespace
pulsar
{
typedef
boost
::
function
<
void
(
Result
,
const
Message
&
msg
)
>
SendCallback
;
typedef
boost
::
function
<
void
(
Result
)
>
CloseCallback
;
enum
CompressionType
{
CompressionNone
=
0
,
CompressionLZ4
=
1
,
CompressionZLib
=
2
};
/**
* Class that holds the configuration for a producer
*/
class
ProducerConfiguration
{
public:
enum
PartitionsRoutingMode
{
UseSinglePartition
,
RoundRobinDistribution
,
CustomPartition
};
ProducerConfiguration
();
~
ProducerConfiguration
();
ProducerConfiguration
(
const
ProducerConfiguration
&
);
ProducerConfiguration
&
operator
=
(
const
ProducerConfiguration
&
);
ProducerConfiguration
&
setSendTimeout
(
int
sendTimeoutMs
);
int
getSendTimeout
()
const
;
ProducerConfiguration
&
setCompressionType
(
CompressionType
compressionType
);
CompressionType
getCompressionType
()
const
;
ProducerConfiguration
&
setMaxPendingMessages
(
int
maxPendingMessages
);
int
getMaxPendingMessages
()
const
;
ProducerConfiguration
&
setPartitionsRoutingMode
(
const
PartitionsRoutingMode
&
mode
);
PartitionsRoutingMode
getPartitionsRoutingMode
()
const
;
ProducerConfiguration
&
setMessageRouter
(
const
MessageRoutingPolicyPtr
&
router
);
const
MessageRoutingPolicyPtr
&
getMessageRouterPtr
()
const
;
ProducerConfiguration
&
setBlockIfQueueFull
(
bool
);
bool
getBlockIfQueueFull
()
const
;
// Zero queue size feature will not be supported on consumer end if batching is enabled
ProducerConfiguration
&
setBatchingEnabled
(
const
bool
&
batchingEnabled
);
const
bool
&
getBatchingEnabled
()
const
;
ProducerConfiguration
&
setBatchingMaxMessages
(
const
unsigned
int
&
batchingMaxMessages
);
const
unsigned
int
&
getBatchingMaxMessages
()
const
;
ProducerConfiguration
&
setBatchingMaxAllowedSizeInBytes
(
const
unsigned
long
&
batchingMaxAllowedSizeInBytes
);
const
unsigned
long
&
getBatchingMaxAllowedSizeInBytes
()
const
;
ProducerConfiguration
&
setBatchingMaxPublishDelayMs
(
const
unsigned
long
&
batchingMaxPublishDelayMs
);
const
unsigned
long
&
getBatchingMaxPublishDelayMs
()
const
;
private:
struct
Impl
;
boost
::
shared_ptr
<
Impl
>
impl_
;
};
class
ProducerImplBase
;
class
ProducerImplBase
;
class
Producer
{
class
Producer
{
...
...
pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef PULSAR_PRODUCERCONFIGURATION_H_
#define PULSAR_PRODUCERCONFIGURATION_H_
#include <pulsar/CompressionType.h>
#include <pulsar/MessageRoutingPolicy.h>
#include <pulsar/Result.h>
#include <pulsar/Message.h>
#include <boost/function.hpp>
namespace
pulsar
{
typedef
boost
::
function
<
void
(
Result
,
const
Message
&
msg
)
>
SendCallback
;
typedef
boost
::
function
<
void
(
Result
)
>
CloseCallback
;
class
ProducerConfigurationImpl
;
/**
* Class that holds the configuration for a producer
*/
class
ProducerConfiguration
{
public:
enum
PartitionsRoutingMode
{
UseSinglePartition
,
RoundRobinDistribution
,
CustomPartition
};
ProducerConfiguration
();
~
ProducerConfiguration
();
ProducerConfiguration
(
const
ProducerConfiguration
&
);
ProducerConfiguration
&
operator
=
(
const
ProducerConfiguration
&
);
ProducerConfiguration
&
setSendTimeout
(
int
sendTimeoutMs
);
int
getSendTimeout
()
const
;
ProducerConfiguration
&
setCompressionType
(
CompressionType
compressionType
);
CompressionType
getCompressionType
()
const
;
ProducerConfiguration
&
setMaxPendingMessages
(
int
maxPendingMessages
);
int
getMaxPendingMessages
()
const
;
ProducerConfiguration
&
setPartitionsRoutingMode
(
const
PartitionsRoutingMode
&
mode
);
PartitionsRoutingMode
getPartitionsRoutingMode
()
const
;
ProducerConfiguration
&
setMessageRouter
(
const
MessageRoutingPolicyPtr
&
router
);
const
MessageRoutingPolicyPtr
&
getMessageRouterPtr
()
const
;
ProducerConfiguration
&
setBlockIfQueueFull
(
bool
);
bool
getBlockIfQueueFull
()
const
;
// Zero queue size feature will not be supported on consumer end if batching is enabled
ProducerConfiguration
&
setBatchingEnabled
(
const
bool
&
batchingEnabled
);
const
bool
&
getBatchingEnabled
()
const
;
ProducerConfiguration
&
setBatchingMaxMessages
(
const
unsigned
int
&
batchingMaxMessages
);
const
unsigned
int
&
getBatchingMaxMessages
()
const
;
ProducerConfiguration
&
setBatchingMaxAllowedSizeInBytes
(
const
unsigned
long
&
batchingMaxAllowedSizeInBytes
);
const
unsigned
long
&
getBatchingMaxAllowedSizeInBytes
()
const
;
ProducerConfiguration
&
setBatchingMaxPublishDelayMs
(
const
unsigned
long
&
batchingMaxPublishDelayMs
);
const
unsigned
long
&
getBatchingMaxPublishDelayMs
()
const
;
private:
struct
Impl
;
boost
::
shared_ptr
<
ProducerConfigurationImpl
>
impl_
;
};
}
#endif
/* PULSAR_PRODUCERCONFIGURATION_H_ */
pulsar-client-cpp/lib/Client.cc
浏览文件 @
d58cc0ba
...
@@ -30,142 +30,6 @@ DECLARE_LOG_OBJECT()
...
@@ -30,142 +30,6 @@ DECLARE_LOG_OBJECT()
namespace
pulsar
{
namespace
pulsar
{
struct
ClientConfiguration
::
Impl
{
AuthenticationPtr
authenticationPtr
;
AuthDataPtr
authDataPtr
;
int
ioThreads
;
int
operationTimeoutSeconds
;
int
messageListenerThreads
;
int
concurrentLookupRequest
;
std
::
string
logConfFilePath
;
bool
useTls
;
std
::
string
tlsTrustCertsFilePath
;
bool
tlsAllowInsecureConnection
;
Impl
()
:
authenticationPtr
(
AuthFactory
::
Disabled
()),
authDataPtr
(
Auth
::
Disabled
()),
ioThreads
(
1
),
operationTimeoutSeconds
(
30
),
messageListenerThreads
(
1
),
concurrentLookupRequest
(
5000
),
logConfFilePath
(),
useTls
(
false
),
tlsAllowInsecureConnection
(
true
)
{}
};
ClientConfiguration
::
ClientConfiguration
()
:
impl_
(
boost
::
make_shared
<
Impl
>
())
{
}
ClientConfiguration
::~
ClientConfiguration
()
{
}
ClientConfiguration
::
ClientConfiguration
(
const
ClientConfiguration
&
x
)
:
impl_
(
x
.
impl_
)
{
}
ClientConfiguration
&
ClientConfiguration
::
operator
=
(
const
ClientConfiguration
&
x
)
{
impl_
=
x
.
impl_
;
return
*
this
;
}
ClientConfiguration
&
ClientConfiguration
::
setAuth
(
const
AuthenticationPtr
&
authentication
)
{
impl_
->
authenticationPtr
=
authentication
;
return
*
this
;
}
const
Authentication
&
ClientConfiguration
::
getAuth
()
const
{
return
*
impl_
->
authenticationPtr
;
}
ClientConfiguration
&
ClientConfiguration
::
setAuthentication
(
const
AuthDataPtr
&
authentication
)
{
impl_
->
authDataPtr
=
authentication
;
impl_
->
authenticationPtr
=
AuthData
::
getAuthenticationPtr
(
authentication
);
return
*
this
;
}
const
AuthData
&
ClientConfiguration
::
getAuthentication
()
const
{
return
*
(
impl_
->
authDataPtr
);
}
const
AuthenticationPtr
&
ClientConfiguration
::
getAuthenticationPtr
()
const
{
return
impl_
->
authenticationPtr
;
}
ClientConfiguration
&
ClientConfiguration
::
setOperationTimeoutSeconds
(
int
timeout
)
{
impl_
->
operationTimeoutSeconds
=
timeout
;
return
*
this
;
}
int
ClientConfiguration
::
getOperationTimeoutSeconds
()
const
{
return
impl_
->
operationTimeoutSeconds
;
}
ClientConfiguration
&
ClientConfiguration
::
setIOThreads
(
int
threads
)
{
impl_
->
ioThreads
=
threads
;
return
*
this
;
}
int
ClientConfiguration
::
getIOThreads
()
const
{
return
impl_
->
ioThreads
;
}
ClientConfiguration
&
ClientConfiguration
::
setMessageListenerThreads
(
int
threads
)
{
impl_
->
messageListenerThreads
=
threads
;
return
*
this
;
}
int
ClientConfiguration
::
getMessageListenerThreads
()
const
{
return
impl_
->
messageListenerThreads
;
}
ClientConfiguration
&
ClientConfiguration
::
setUseTls
(
bool
useTls
)
{
impl_
->
useTls
=
useTls
;
return
*
this
;
}
bool
ClientConfiguration
::
isUseTls
()
const
{
return
impl_
->
useTls
;
}
ClientConfiguration
&
ClientConfiguration
::
setTlsTrustCertsFilePath
(
const
std
::
string
&
filePath
)
{
impl_
->
tlsTrustCertsFilePath
=
filePath
;
return
*
this
;
}
std
::
string
ClientConfiguration
::
getTlsTrustCertsFilePath
()
const
{
return
impl_
->
tlsTrustCertsFilePath
;
}
ClientConfiguration
&
ClientConfiguration
::
setTlsAllowInsecureConnection
(
bool
allowInsecure
)
{
impl_
->
tlsAllowInsecureConnection
=
allowInsecure
;
return
*
this
;
}
bool
ClientConfiguration
::
isTlsAllowInsecureConnection
()
const
{
return
impl_
->
tlsAllowInsecureConnection
;
}
ClientConfiguration
&
ClientConfiguration
::
setConcurrentLookupRequest
(
int
concurrentLookupRequest
)
{
impl_
->
concurrentLookupRequest
=
concurrentLookupRequest
;
return
*
this
;
}
int
ClientConfiguration
::
getConcurrentLookupRequest
()
const
{
return
impl_
->
concurrentLookupRequest
;
}
ClientConfiguration
&
ClientConfiguration
::
setLogConfFilePath
(
const
std
::
string
&
logConfFilePath
)
{
impl_
->
logConfFilePath
=
logConfFilePath
;
return
*
this
;
}
const
std
::
string
&
ClientConfiguration
::
getLogConfFilePath
()
const
{
return
impl_
->
logConfFilePath
;
}
/////////////////////////////////////////////////////////////////
Client
::
Client
(
const
std
::
string
&
serviceUrl
)
Client
::
Client
(
const
std
::
string
&
serviceUrl
)
:
impl_
(
boost
::
make_shared
<
ClientImpl
>
(
serviceUrl
,
ClientConfiguration
(),
true
))
{
:
impl_
(
boost
::
make_shared
<
ClientImpl
>
(
serviceUrl
,
ClientConfiguration
(),
true
))
{
}
}
...
...
pulsar-client-cpp/lib/ClientConfiguration.cc
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <lib/ClientConfigurationImpl.h>
namespace
pulsar
{
ClientConfiguration
::
ClientConfiguration
()
:
impl_
(
boost
::
make_shared
<
ClientConfigurationImpl
>
())
{
}
ClientConfiguration
::~
ClientConfiguration
()
{
}
ClientConfiguration
::
ClientConfiguration
(
const
ClientConfiguration
&
x
)
:
impl_
(
x
.
impl_
)
{
}
ClientConfiguration
&
ClientConfiguration
::
operator
=
(
const
ClientConfiguration
&
x
)
{
impl_
=
x
.
impl_
;
return
*
this
;
}
ClientConfiguration
&
ClientConfiguration
::
setAuth
(
const
AuthenticationPtr
&
authentication
)
{
impl_
->
authenticationPtr
=
authentication
;
return
*
this
;
}
const
Authentication
&
ClientConfiguration
::
getAuth
()
const
{
return
*
impl_
->
authenticationPtr
;
}
const
AuthenticationPtr
&
ClientConfiguration
::
getAuthenticationPtr
()
const
{
return
impl_
->
authenticationPtr
;
}
ClientConfiguration
&
ClientConfiguration
::
setOperationTimeoutSeconds
(
int
timeout
)
{
impl_
->
operationTimeoutSeconds
=
timeout
;
return
*
this
;
}
int
ClientConfiguration
::
getOperationTimeoutSeconds
()
const
{
return
impl_
->
operationTimeoutSeconds
;
}
ClientConfiguration
&
ClientConfiguration
::
setIOThreads
(
int
threads
)
{
impl_
->
ioThreads
=
threads
;
return
*
this
;
}
int
ClientConfiguration
::
getIOThreads
()
const
{
return
impl_
->
ioThreads
;
}
ClientConfiguration
&
ClientConfiguration
::
setMessageListenerThreads
(
int
threads
)
{
impl_
->
messageListenerThreads
=
threads
;
return
*
this
;
}
int
ClientConfiguration
::
getMessageListenerThreads
()
const
{
return
impl_
->
messageListenerThreads
;
}
ClientConfiguration
&
ClientConfiguration
::
setUseTls
(
bool
useTls
)
{
impl_
->
useTls
=
useTls
;
return
*
this
;
}
bool
ClientConfiguration
::
isUseTls
()
const
{
return
impl_
->
useTls
;
}
ClientConfiguration
&
ClientConfiguration
::
setTlsTrustCertsFilePath
(
const
std
::
string
&
filePath
)
{
impl_
->
tlsTrustCertsFilePath
=
filePath
;
return
*
this
;
}
std
::
string
ClientConfiguration
::
getTlsTrustCertsFilePath
()
const
{
return
impl_
->
tlsTrustCertsFilePath
;
}
ClientConfiguration
&
ClientConfiguration
::
setTlsAllowInsecureConnection
(
bool
allowInsecure
)
{
impl_
->
tlsAllowInsecureConnection
=
allowInsecure
;
return
*
this
;
}
bool
ClientConfiguration
::
isTlsAllowInsecureConnection
()
const
{
return
impl_
->
tlsAllowInsecureConnection
;
}
ClientConfiguration
&
ClientConfiguration
::
setConcurrentLookupRequest
(
int
concurrentLookupRequest
)
{
impl_
->
concurrentLookupRequest
=
concurrentLookupRequest
;
return
*
this
;
}
int
ClientConfiguration
::
getConcurrentLookupRequest
()
const
{
return
impl_
->
concurrentLookupRequest
;
}
ClientConfiguration
&
ClientConfiguration
::
setLogConfFilePath
(
const
std
::
string
&
logConfFilePath
)
{
impl_
->
logConfFilePath
=
logConfFilePath
;
return
*
this
;
}
const
std
::
string
&
ClientConfiguration
::
getLogConfFilePath
()
const
{
return
impl_
->
logConfFilePath
;
}
}
pulsar-client-cpp/lib/ClientConfigurationImpl.h
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_CLIENTCONFIGURATIONIMPL_H_
#define LIB_CLIENTCONFIGURATIONIMPL_H_
#include <pulsar/ClientConfiguration.h>
namespace
pulsar
{
struct
ClientConfigurationImpl
{
AuthenticationPtr
authenticationPtr
;
int
ioThreads
;
int
operationTimeoutSeconds
;
int
messageListenerThreads
;
int
concurrentLookupRequest
;
std
::
string
logConfFilePath
;
bool
useTls
;
std
::
string
tlsTrustCertsFilePath
;
bool
tlsAllowInsecureConnection
;
ClientConfigurationImpl
()
:
authenticationPtr
(
AuthFactory
::
Disabled
()),
ioThreads
(
1
),
operationTimeoutSeconds
(
30
),
messageListenerThreads
(
1
),
concurrentLookupRequest
(
5000
),
logConfFilePath
(),
useTls
(
false
),
tlsAllowInsecureConnection
(
true
)
{}
};
}
#endif
/* LIB_CLIENTCONFIGURATIONIMPL_H_ */
pulsar-client-cpp/lib/Consumer.cc
浏览文件 @
d58cc0ba
...
@@ -23,100 +23,7 @@
...
@@ -23,100 +23,7 @@
namespace
pulsar
{
namespace
pulsar
{
const
std
::
string
EMPTY_STRING
;
static
const
std
::
string
EMPTY_STRING
;
struct
ConsumerConfiguration
::
Impl
{
long
unAckedMessagesTimeoutMs
;
ConsumerType
consumerType
;
MessageListener
messageListener
;
bool
hasMessageListener
;
int
receiverQueueSize
;
std
::
string
consumerName
;
long
brokerConsumerStatsCacheTimeInMs
;
Impl
()
:
unAckedMessagesTimeoutMs
(
0
),
consumerType
(
ConsumerExclusive
),
messageListener
(),
hasMessageListener
(
false
),
brokerConsumerStatsCacheTimeInMs
(
30
*
1000
),
// 30 seconds
receiverQueueSize
(
1000
)
{
}
};
ConsumerConfiguration
::
ConsumerConfiguration
()
:
impl_
(
boost
::
make_shared
<
Impl
>
())
{
}
ConsumerConfiguration
::~
ConsumerConfiguration
()
{
}
ConsumerConfiguration
::
ConsumerConfiguration
(
const
ConsumerConfiguration
&
x
)
:
impl_
(
x
.
impl_
)
{
}
ConsumerConfiguration
&
ConsumerConfiguration
::
operator
=
(
const
ConsumerConfiguration
&
x
)
{
impl_
=
x
.
impl_
;
return
*
this
;
}
long
ConsumerConfiguration
::
getBrokerConsumerStatsCacheTimeInMs
()
const
{
return
impl_
->
brokerConsumerStatsCacheTimeInMs
;
}
void
ConsumerConfiguration
::
setBrokerConsumerStatsCacheTimeInMs
(
const
long
cacheTimeInMs
)
{
impl_
->
brokerConsumerStatsCacheTimeInMs
=
cacheTimeInMs
;
}
ConsumerConfiguration
&
ConsumerConfiguration
::
setConsumerType
(
ConsumerType
consumerType
)
{
impl_
->
consumerType
=
consumerType
;
return
*
this
;
}
ConsumerType
ConsumerConfiguration
::
getConsumerType
()
const
{
return
impl_
->
consumerType
;
}
ConsumerConfiguration
&
ConsumerConfiguration
::
setMessageListener
(
MessageListener
messageListener
)
{
impl_
->
messageListener
=
messageListener
;
impl_
->
hasMessageListener
=
true
;
return
*
this
;
}
MessageListener
ConsumerConfiguration
::
getMessageListener
()
const
{
return
impl_
->
messageListener
;
}
bool
ConsumerConfiguration
::
hasMessageListener
()
const
{
return
impl_
->
hasMessageListener
;
}
void
ConsumerConfiguration
::
setReceiverQueueSize
(
int
size
)
{
impl_
->
receiverQueueSize
=
size
;
}
int
ConsumerConfiguration
::
getReceiverQueueSize
()
const
{
return
impl_
->
receiverQueueSize
;
}
const
std
::
string
&
ConsumerConfiguration
::
getConsumerName
()
const
{
return
impl_
->
consumerName
;
}
void
ConsumerConfiguration
::
setConsumerName
(
const
std
::
string
&
consumerName
)
{
impl_
->
consumerName
=
consumerName
;
}
long
ConsumerConfiguration
::
getUnAckedMessagesTimeoutMs
()
const
{
return
impl_
->
unAckedMessagesTimeoutMs
;
}
void
ConsumerConfiguration
::
setUnAckedMessagesTimeoutMs
(
const
uint64_t
milliSeconds
)
{
if
(
milliSeconds
<
10000
)
{
throw
"Consumer Config Exception: Unacknowledged message timeout should be greater than 10 seconds."
;
}
impl_
->
unAckedMessagesTimeoutMs
=
milliSeconds
;
}
//////////////////////////////////////////////////////
Consumer
::
Consumer
()
Consumer
::
Consumer
()
:
impl_
()
{
:
impl_
()
{
...
...
pulsar-client-cpp/lib/ConsumerConfiguration.cc
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <lib/ConsumerConfigurationImpl.h>
namespace
pulsar
{
ConsumerConfiguration
::
ConsumerConfiguration
()
:
impl_
(
boost
::
make_shared
<
ConsumerConfigurationImpl
>
())
{
}
ConsumerConfiguration
::~
ConsumerConfiguration
()
{
}
ConsumerConfiguration
::
ConsumerConfiguration
(
const
ConsumerConfiguration
&
x
)
:
impl_
(
x
.
impl_
)
{
}
ConsumerConfiguration
&
ConsumerConfiguration
::
operator
=
(
const
ConsumerConfiguration
&
x
)
{
impl_
=
x
.
impl_
;
return
*
this
;
}
long
ConsumerConfiguration
::
getBrokerConsumerStatsCacheTimeInMs
()
const
{
return
impl_
->
brokerConsumerStatsCacheTimeInMs
;
}
void
ConsumerConfiguration
::
setBrokerConsumerStatsCacheTimeInMs
(
const
long
cacheTimeInMs
)
{
impl_
->
brokerConsumerStatsCacheTimeInMs
=
cacheTimeInMs
;
}
ConsumerConfiguration
&
ConsumerConfiguration
::
setConsumerType
(
ConsumerType
consumerType
)
{
impl_
->
consumerType
=
consumerType
;
return
*
this
;
}
ConsumerType
ConsumerConfiguration
::
getConsumerType
()
const
{
return
impl_
->
consumerType
;
}
ConsumerConfiguration
&
ConsumerConfiguration
::
setMessageListener
(
MessageListener
messageListener
)
{
impl_
->
messageListener
=
messageListener
;
impl_
->
hasMessageListener
=
true
;
return
*
this
;
}
MessageListener
ConsumerConfiguration
::
getMessageListener
()
const
{
return
impl_
->
messageListener
;
}
bool
ConsumerConfiguration
::
hasMessageListener
()
const
{
return
impl_
->
hasMessageListener
;
}
void
ConsumerConfiguration
::
setReceiverQueueSize
(
int
size
)
{
impl_
->
receiverQueueSize
=
size
;
}
int
ConsumerConfiguration
::
getReceiverQueueSize
()
const
{
return
impl_
->
receiverQueueSize
;
}
const
std
::
string
&
ConsumerConfiguration
::
getConsumerName
()
const
{
return
impl_
->
consumerName
;
}
void
ConsumerConfiguration
::
setConsumerName
(
const
std
::
string
&
consumerName
)
{
impl_
->
consumerName
=
consumerName
;
}
long
ConsumerConfiguration
::
getUnAckedMessagesTimeoutMs
()
const
{
return
impl_
->
unAckedMessagesTimeoutMs
;
}
void
ConsumerConfiguration
::
setUnAckedMessagesTimeoutMs
(
const
uint64_t
milliSeconds
)
{
if
(
milliSeconds
<
10000
)
{
throw
"Consumer Config Exception: Unacknowledged message timeout should be greater than 10 seconds."
;
}
impl_
->
unAckedMessagesTimeoutMs
=
milliSeconds
;
}
}
pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_CONSUMERCONFIGURATIONIMPL_H_
#define LIB_CONSUMERCONFIGURATIONIMPL_H_
#include <pulsar/ConsumerConfiguration.h>
#include <boost/make_shared.hpp>
namespace
pulsar
{
struct
ConsumerConfigurationImpl
{
long
unAckedMessagesTimeoutMs
;
ConsumerType
consumerType
;
MessageListener
messageListener
;
bool
hasMessageListener
;
int
receiverQueueSize
;
std
::
string
consumerName
;
long
brokerConsumerStatsCacheTimeInMs
;
ConsumerConfigurationImpl
()
:
unAckedMessagesTimeoutMs
(
0
),
consumerType
(
ConsumerExclusive
),
messageListener
(),
hasMessageListener
(
false
),
brokerConsumerStatsCacheTimeInMs
(
30
*
1000
),
// 30 seconds
receiverQueueSize
(
1000
)
{
}
};
}
#endif
/* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
pulsar-client-cpp/lib/Producer.cc
浏览文件 @
d58cc0ba
...
@@ -23,139 +23,7 @@
...
@@ -23,139 +23,7 @@
namespace
pulsar
{
namespace
pulsar
{
const
std
::
string
EMPTY_STRING
;
static
const
std
::
string
EMPTY_STRING
;
struct
ProducerConfiguration
::
Impl
{
int
sendTimeoutMs
;
CompressionType
compressionType
;
int
maxPendingMessages
;
PartitionsRoutingMode
routingMode
;
MessageRoutingPolicyPtr
messageRouter
;
bool
blockIfQueueFull
;
bool
batchingEnabled
;
unsigned
int
batchingMaxMessages
;
unsigned
long
batchingMaxAllowedSizeInBytes
;
unsigned
long
batchingMaxPublishDelayMs
;
Impl
()
:
sendTimeoutMs
(
30000
),
compressionType
(
CompressionNone
),
maxPendingMessages
(
1000
),
routingMode
(
ProducerConfiguration
::
UseSinglePartition
),
blockIfQueueFull
(
true
),
batchingEnabled
(
false
),
batchingMaxMessages
(
1000
),
batchingMaxAllowedSizeInBytes
(
128
*
1024
),
// 128 KB
batchingMaxPublishDelayMs
(
3000
)
{
// 3 seconds
}
};
ProducerConfiguration
::
ProducerConfiguration
()
:
impl_
(
boost
::
make_shared
<
Impl
>
())
{
}
ProducerConfiguration
::~
ProducerConfiguration
()
{
}
ProducerConfiguration
::
ProducerConfiguration
(
const
ProducerConfiguration
&
x
)
:
impl_
(
x
.
impl_
)
{
}
ProducerConfiguration
&
ProducerConfiguration
::
operator
=
(
const
ProducerConfiguration
&
x
)
{
impl_
=
x
.
impl_
;
return
*
this
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setSendTimeout
(
int
sendTimeoutMs
)
{
impl_
->
sendTimeoutMs
=
sendTimeoutMs
;
return
*
this
;
}
int
ProducerConfiguration
::
getSendTimeout
()
const
{
return
impl_
->
sendTimeoutMs
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setCompressionType
(
CompressionType
compressionType
)
{
impl_
->
compressionType
=
compressionType
;
return
*
this
;
}
CompressionType
ProducerConfiguration
::
getCompressionType
()
const
{
return
impl_
->
compressionType
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setMaxPendingMessages
(
int
maxPendingMessages
)
{
assert
(
maxPendingMessages
>
0
);
impl_
->
maxPendingMessages
=
maxPendingMessages
;
return
*
this
;
}
int
ProducerConfiguration
::
getMaxPendingMessages
()
const
{
return
impl_
->
maxPendingMessages
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setPartitionsRoutingMode
(
const
PartitionsRoutingMode
&
mode
)
{
impl_
->
routingMode
=
mode
;
return
*
this
;
}
ProducerConfiguration
::
PartitionsRoutingMode
ProducerConfiguration
::
getPartitionsRoutingMode
()
const
{
return
impl_
->
routingMode
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setMessageRouter
(
const
MessageRoutingPolicyPtr
&
router
)
{
impl_
->
routingMode
=
ProducerConfiguration
::
CustomPartition
;
impl_
->
messageRouter
=
router
;
return
*
this
;
}
const
MessageRoutingPolicyPtr
&
ProducerConfiguration
::
getMessageRouterPtr
()
const
{
return
impl_
->
messageRouter
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBlockIfQueueFull
(
bool
flag
)
{
impl_
->
blockIfQueueFull
=
flag
;
return
*
this
;
}
bool
ProducerConfiguration
::
getBlockIfQueueFull
()
const
{
return
impl_
->
blockIfQueueFull
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBatchingEnabled
(
const
bool
&
batchingEnabled
)
{
impl_
->
batchingEnabled
=
batchingEnabled
;
return
*
this
;
}
const
bool
&
ProducerConfiguration
::
getBatchingEnabled
()
const
{
return
impl_
->
batchingEnabled
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBatchingMaxMessages
(
const
unsigned
int
&
batchingMaxMessages
)
{
assert
(
batchingMaxMessages
>
1
);
impl_
->
batchingMaxMessages
=
batchingMaxMessages
;
return
*
this
;
}
const
unsigned
int
&
ProducerConfiguration
::
getBatchingMaxMessages
()
const
{
return
impl_
->
batchingMaxMessages
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBatchingMaxAllowedSizeInBytes
(
const
unsigned
long
&
batchingMaxAllowedSizeInBytes
)
{
impl_
->
batchingMaxAllowedSizeInBytes
=
batchingMaxAllowedSizeInBytes
;
return
*
this
;
}
const
unsigned
long
&
ProducerConfiguration
::
getBatchingMaxAllowedSizeInBytes
()
const
{
return
impl_
->
batchingMaxAllowedSizeInBytes
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBatchingMaxPublishDelayMs
(
const
unsigned
long
&
batchingMaxPublishDelayMs
)
{
impl_
->
batchingMaxPublishDelayMs
=
batchingMaxPublishDelayMs
;
return
*
this
;
}
const
unsigned
long
&
ProducerConfiguration
::
getBatchingMaxPublishDelayMs
()
const
{
return
impl_
->
batchingMaxPublishDelayMs
;
}
////////////////////////////////////////////////////////////////////////////////
Producer
::
Producer
()
Producer
::
Producer
()
:
impl_
()
{
:
impl_
()
{
...
...
pulsar-client-cpp/lib/ProducerConfiguration.cc
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <lib/ProducerConfigurationImpl.h>
namespace
pulsar
{
ProducerConfiguration
::
ProducerConfiguration
()
:
impl_
(
boost
::
make_shared
<
ProducerConfigurationImpl
>
())
{
}
ProducerConfiguration
::~
ProducerConfiguration
()
{
}
ProducerConfiguration
::
ProducerConfiguration
(
const
ProducerConfiguration
&
x
)
:
impl_
(
x
.
impl_
)
{
}
ProducerConfiguration
&
ProducerConfiguration
::
operator
=
(
const
ProducerConfiguration
&
x
)
{
impl_
=
x
.
impl_
;
return
*
this
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setSendTimeout
(
int
sendTimeoutMs
)
{
impl_
->
sendTimeoutMs
=
sendTimeoutMs
;
return
*
this
;
}
int
ProducerConfiguration
::
getSendTimeout
()
const
{
return
impl_
->
sendTimeoutMs
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setCompressionType
(
CompressionType
compressionType
)
{
impl_
->
compressionType
=
compressionType
;
return
*
this
;
}
CompressionType
ProducerConfiguration
::
getCompressionType
()
const
{
return
impl_
->
compressionType
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setMaxPendingMessages
(
int
maxPendingMessages
)
{
assert
(
maxPendingMessages
>
0
);
impl_
->
maxPendingMessages
=
maxPendingMessages
;
return
*
this
;
}
int
ProducerConfiguration
::
getMaxPendingMessages
()
const
{
return
impl_
->
maxPendingMessages
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setPartitionsRoutingMode
(
const
PartitionsRoutingMode
&
mode
)
{
impl_
->
routingMode
=
mode
;
return
*
this
;
}
ProducerConfiguration
::
PartitionsRoutingMode
ProducerConfiguration
::
getPartitionsRoutingMode
()
const
{
return
impl_
->
routingMode
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setMessageRouter
(
const
MessageRoutingPolicyPtr
&
router
)
{
impl_
->
routingMode
=
ProducerConfiguration
::
CustomPartition
;
impl_
->
messageRouter
=
router
;
return
*
this
;
}
const
MessageRoutingPolicyPtr
&
ProducerConfiguration
::
getMessageRouterPtr
()
const
{
return
impl_
->
messageRouter
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBlockIfQueueFull
(
bool
flag
)
{
impl_
->
blockIfQueueFull
=
flag
;
return
*
this
;
}
bool
ProducerConfiguration
::
getBlockIfQueueFull
()
const
{
return
impl_
->
blockIfQueueFull
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBatchingEnabled
(
const
bool
&
batchingEnabled
)
{
impl_
->
batchingEnabled
=
batchingEnabled
;
return
*
this
;
}
const
bool
&
ProducerConfiguration
::
getBatchingEnabled
()
const
{
return
impl_
->
batchingEnabled
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBatchingMaxMessages
(
const
unsigned
int
&
batchingMaxMessages
)
{
assert
(
batchingMaxMessages
>
1
);
impl_
->
batchingMaxMessages
=
batchingMaxMessages
;
return
*
this
;
}
const
unsigned
int
&
ProducerConfiguration
::
getBatchingMaxMessages
()
const
{
return
impl_
->
batchingMaxMessages
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBatchingMaxAllowedSizeInBytes
(
const
unsigned
long
&
batchingMaxAllowedSizeInBytes
)
{
impl_
->
batchingMaxAllowedSizeInBytes
=
batchingMaxAllowedSizeInBytes
;
return
*
this
;
}
const
unsigned
long
&
ProducerConfiguration
::
getBatchingMaxAllowedSizeInBytes
()
const
{
return
impl_
->
batchingMaxAllowedSizeInBytes
;
}
ProducerConfiguration
&
ProducerConfiguration
::
setBatchingMaxPublishDelayMs
(
const
unsigned
long
&
batchingMaxPublishDelayMs
)
{
impl_
->
batchingMaxPublishDelayMs
=
batchingMaxPublishDelayMs
;
return
*
this
;
}
const
unsigned
long
&
ProducerConfiguration
::
getBatchingMaxPublishDelayMs
()
const
{
return
impl_
->
batchingMaxPublishDelayMs
;
}
}
pulsar-client-cpp/lib/ProducerConfigurationImpl.h
0 → 100644
浏览文件 @
d58cc0ba
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIB_PRODUCERCONFIGURATIONIMPL_H_
#define LIB_PRODUCERCONFIGURATIONIMPL_H_
#include <pulsar/ProducerConfiguration.h>
#include <boost/make_shared.hpp>
namespace
pulsar
{
struct
ProducerConfigurationImpl
{
int
sendTimeoutMs
;
CompressionType
compressionType
;
int
maxPendingMessages
;
ProducerConfiguration
::
PartitionsRoutingMode
routingMode
;
MessageRoutingPolicyPtr
messageRouter
;
bool
blockIfQueueFull
;
bool
batchingEnabled
;
unsigned
int
batchingMaxMessages
;
unsigned
long
batchingMaxAllowedSizeInBytes
;
unsigned
long
batchingMaxPublishDelayMs
;
ProducerConfigurationImpl
()
:
sendTimeoutMs
(
30000
),
compressionType
(
CompressionNone
),
maxPendingMessages
(
1000
),
routingMode
(
ProducerConfiguration
::
UseSinglePartition
),
blockIfQueueFull
(
true
),
batchingEnabled
(
false
),
batchingMaxMessages
(
1000
),
batchingMaxAllowedSizeInBytes
(
128
*
1024
),
// 128 KB
batchingMaxPublishDelayMs
(
3000
)
{
// 3 seconds
}
};
}
#endif
/* LIB_PRODUCERCONFIGURATIONIMPL_H_ */
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录