提交 37c093e0 编写于 作者: M Matteo Merli 提交者: xiaolong.ran

[Cpp] Fixed negative ack tracker constructor sequence (#5453)

* [Cpp] Fixed negative ack tracker constructor sequence

* Fixed formatting

* Fixed default for neg ack delay on partitioned topics

* Fixed format

(cherry picked from commit 6f113f73)
上级 f08c9362
......@@ -54,6 +54,12 @@ class PULSAR_PUBLIC ConsumerConfiguration {
ConsumerConfiguration(const ConsumerConfiguration&);
ConsumerConfiguration& operator=(const ConsumerConfiguration&);
/**
* Create a new instance of ConsumerConfiguration with the same
* initial settings as the current one.
*/
ConsumerConfiguration clone() const;
/**
* Declare the schema of the data that this consumer will be accepting.
*
......
......@@ -33,6 +33,12 @@ ConsumerConfiguration& ConsumerConfiguration::operator=(const ConsumerConfigurat
return *this;
}
ConsumerConfiguration ConsumerConfiguration::clone() const {
ConsumerConfiguration newConf;
newConf.impl_.reset(new ConsumerConfigurationImpl(*this->impl_));
return newConf;
}
ConsumerConfiguration& ConsumerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
impl_->schemaInfo = schemaInfo;
return *this;
......@@ -93,11 +99,11 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
}
void ConsumerConfiguration::setNegativeAckRedeliveryDelayMs(long redeliveryDelayMillis) {
impl_->negativeAckRedeliveryDelay = std::chrono::milliseconds(redeliveryDelayMillis);
impl_->negativeAckRedeliveryDelayMs = redeliveryDelayMillis;
}
long ConsumerConfiguration::getNegativeAckRedeliveryDelayMs() const {
return impl_->negativeAckRedeliveryDelay.count();
return impl_->negativeAckRedeliveryDelayMs;
}
bool ConsumerConfiguration::isEncryptionEnabled() const { return (impl_->cryptoKeyReader != NULL); }
......
......@@ -28,7 +28,7 @@ struct ConsumerConfigurationImpl {
SchemaInfo schemaInfo;
long unAckedMessagesTimeoutMs;
std::chrono::milliseconds negativeAckRedeliveryDelay;
long negativeAckRedeliveryDelayMs;
ConsumerType consumerType;
MessageListener messageListener;
bool hasMessageListener;
......@@ -45,6 +45,7 @@ struct ConsumerConfigurationImpl {
ConsumerConfigurationImpl()
: schemaInfo(),
unAckedMessagesTimeoutMs(0),
negativeAckRedeliveryDelayMs(60000),
consumerType(ConsumerExclusive),
messageListener(),
hasMessageListener(false),
......
......@@ -24,16 +24,22 @@
#include <set>
#include <functional>
#include "LogUtils.h"
DECLARE_LOG_OBJECT()
namespace pulsar {
const std::chrono::milliseconds NegativeAcksTracker::MIN_NACK_DELAY_NANOS = std::chrono::milliseconds(100);
NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &consumer,
const ConsumerConfiguration &conf)
: consumer_(consumer),
nackDelay_(
std::max(std::chrono::milliseconds(conf.getNegativeAckRedeliveryDelayMs()), MIN_NACK_DELAY_NANOS)),
timerInterval_((long)(nackDelay_.count() / 3)),
executor_(client->getIOExecutorProvider()->get()) {}
: consumer_(consumer), timerInterval_(0), executor_(client->getIOExecutorProvider()->get()) {
static const long MIN_NACK_DELAY_MILLIS = 100;
nackDelay_ =
std::chrono::milliseconds(std::max(conf.getNegativeAckRedeliveryDelayMs(), MIN_NACK_DELAY_MILLIS));
timerInterval_ = boost::posix_time::milliseconds((long)(nackDelay_.count() / 3));
LOG_DEBUG("Created negative ack tracker with delay: " << nackDelay_.count()
<< " ms - Timer interval: " << timerInterval_);
}
void NegativeAcksTracker::scheduleTimer() {
timer_ = executor_->createDeadlineTimer();
......
......@@ -45,8 +45,6 @@ class NegativeAcksTracker {
void scheduleTimer();
void handleTimer(const boost::system::error_code &ec);
static const std::chrono::milliseconds MIN_NACK_DELAY_NANOS;
ConsumerImpl &consumer_;
std::mutex mutex_;
......
......@@ -192,7 +192,7 @@ void PartitionedConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
void PartitionedConsumerImpl::start() {
ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
std::shared_ptr<ConsumerImpl> consumer;
ConsumerConfiguration config;
ConsumerConfiguration config = conf_.clone();
// all the partitioned-consumer belonging to one partitioned topic should have same name
config.setConsumerName(conf_.getConsumerName());
config.setConsumerType(conf_.getConsumerType());
......
......@@ -2867,7 +2867,7 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
Consumer consumer;
ConsumerConfiguration conf;
conf.setNegativeAckRedeliveryDelayMs(100);
Result result = client.subscribe(topic, "test", consumer);
Result result = client.subscribe(topic, "test", conf, consumer);
ASSERT_EQ(ResultOk, result);
Producer producer;
......@@ -2887,6 +2887,7 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
Message msg;
consumer.receive(msg);
LOG_INFO("Received message " << msg.getDataAsString());
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
consumer.negativeAcknowledge(msg);
}
......@@ -2894,6 +2895,7 @@ void testNegativeAcks(const std::string &topic, bool batchingEnabled) {
for (int i = 0; i < 10; i++) {
Message msg;
consumer.receive(msg);
LOG_INFO("-- Redelivery -- Received message " << msg.getDataAsString());
ASSERT_EQ(msg.getDataAsString(), "test-" + std::to_string(i));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册