未验证 提交 97f41120 编写于 作者: Y Yunze Xu 提交者: GitHub

[issue 7851][C++] Make clear() thread-safe (#7862)

Fixes #7851 

### Motivation

`clear()` methods of `BatchAcknowledgementTracker` and `UnAckedMessageTrackerEnabled` are not thread-safe.

### Modifications

Acquire a mutex in these `clear()` methods.
上级 cc89e0c1
......@@ -33,6 +33,7 @@ BatchAcknowledgementTracker::BatchAcknowledgementTracker(const std::string topic
}
void BatchAcknowledgementTracker::clear() {
Lock lock(mutex_);
trackerMap_.clear();
sendList_.clear();
}
......
......@@ -39,7 +39,7 @@ void UnAckedMessageTrackerEnabled::timeoutHandler() {
}
void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
std::lock_guard<std::mutex> acquire(lock_);
std::unique_lock<std::mutex> acquire(lock_);
LOG_DEBUG("UnAckedMessageTrackerEnabled::timeoutHandlerHelper invoked for consumerPtr_ "
<< consumerReference_.getName().c_str());
......@@ -60,6 +60,9 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {
timePartitions.push_back(headPartition);
if (msgIdsToRedeliver.size() > 0) {
// redeliverUnacknowledgedMessages() may call clear() that acquire the lock again, so we should unlock
// here to avoid deadlock
acquire.unlock();
consumerReference_.redeliverUnacknowledgedMessages(msgIdsToRedeliver);
}
}
......@@ -148,6 +151,7 @@ void UnAckedMessageTrackerEnabled::removeTopicMessage(const std::string& topic)
}
void UnAckedMessageTrackerEnabled::clear() {
std::lock_guard<std::mutex> acquire(lock_);
messageIdPartitionMap.clear();
for (auto it = timePartitions.begin(); it != timePartitions.end(); it++) {
it->clear();
......
......@@ -1750,6 +1750,8 @@ TEST(BasicEndToEndTest, testPartitionTopicUnAckedMessageTimeout) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
timeWaited += 500;
}
client.close();
}
TEST(BasicEndToEndTest, testUnAckedMessageTimeoutListener) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册