未验证 提交 70337c71 编写于 作者: K k2la 提交者: guangning

[pulsar-client-cpp] Fix Redelivery of Messages on UnackedMessageTracker When Ack Messages . (#6498)

### Motivation
Because of #6391 , acked messages were counted as unacked messages. 
Although messages from brokers were acknowledged, the following log was output.

```
2020-03-06 19:44:51.790 INFO  ConsumerImpl:174 | [persistent://public/default/t1, sub1, 0] Created consumer on broker [127.0.0.1:58860 -> 127.0.0.1:6650]
my-message-0: Fri Mar  6 19:45:05 2020
my-message-1: Fri Mar  6 19:45:05 2020
my-message-2: Fri Mar  6 19:45:05 2020
2020-03-06 19:45:15.818 INFO  UnAckedMessageTrackerEnabled:53 | [persistent://public/default/t1, sub1, 0] : 3 Messages were not acked within 10000 time

```

This behavior happened on master branch.

(cherry picked from commit 67f8cf30)
上级 319eabb8
......@@ -90,8 +90,10 @@ UnAckedMessageTrackerEnabled::UnAckedMessageTrackerEnabled(long timeoutMs, long
bool UnAckedMessageTrackerEnabled::add(const MessageId& m) {
std::lock_guard<std::mutex> acquire(lock_);
if (messageIdPartitionMap.count(m) == 0) {
bool insert = messageIdPartitionMap.insert(std::make_pair(m, timePartitions.back())).second;
return insert && timePartitions.back().insert(m).second;
std::set<MessageId>& partition = timePartitions.back();
bool emplace = messageIdPartitionMap.emplace(m, partition).second;
bool insert = partition.insert(m).second;
return emplace && insert;
}
return false;
}
......@@ -104,7 +106,8 @@ bool UnAckedMessageTrackerEnabled::isEmpty() {
bool UnAckedMessageTrackerEnabled::remove(const MessageId& m) {
std::lock_guard<std::mutex> acquire(lock_);
bool removed = false;
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(m);
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(m);
if (exist != messageIdPartitionMap.end()) {
removed = exist->second.erase(m);
}
......@@ -121,7 +124,7 @@ void UnAckedMessageTrackerEnabled::removeMessagesTill(const MessageId& msgId) {
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
MessageId msgIdInMap = it->first;
if (msgIdInMap < msgId) {
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgId);
std::map<MessageId, std::set<MessageId>&>::iterator exist = messageIdPartitionMap.find(msgId);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgId);
}
......@@ -135,7 +138,8 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
for (auto it = messageIdPartitionMap.begin(); it != messageIdPartitionMap.end(); it++) {
MessageId msgIdInMap = it->first;
if (msgIdInMap.getTopicName().compare(topic) == 0) {
std::map<MessageId, std::set<MessageId>>::iterator exist = messageIdPartitionMap.find(msgIdInMap);
std::map<MessageId, std::set<MessageId>&>::iterator exist =
messageIdPartitionMap.find(msgIdInMap);
if (exist != messageIdPartitionMap.end()) {
exist->second.erase(msgIdInMap);
}
......
......@@ -23,7 +23,6 @@
#include <mutex>
namespace pulsar {
class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
public:
~UnAckedMessageTrackerEnabled();
......@@ -41,7 +40,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
void timeoutHandlerHelper();
bool isEmpty();
long size();
std::map<MessageId, std::set<MessageId>> messageIdPartitionMap;
std::map<MessageId, std::set<MessageId>&> messageIdPartitionMap;
std::deque<std::set<MessageId>> timePartitions;
std::mutex lock_;
DeadlineTimerPtr timer_;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册