diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index b2e7848bbf4caaf1900f618832899d5321c5cae2..bf9e3ac6e4f42482bc3280dd714da9dc4676268b 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -232,6 +232,14 @@ void ProducerImpl::failPendingMessages(Result result) { // without holding producer mutex. for (MessageQueue::const_iterator it = pendingMessagesQueue_.begin(); it != pendingMessagesQueue_.end(); it++) { + // When dealing any failure message, if the current message is a batch one, we should also release + // the reserved spots in the pendingMessageQueue_, for all individual messages inside this batch + // message. See 'ProducerImpl::sendAsync' for more details. + if (it->msg_.impl_->metadata.has_num_messages_in_batch()) { + // batch message - need to release more spots + // -1 since the pushing batch message into the queue already released a spot + pendingMessagesQueue_.release(it->msg_.impl_->metadata.num_messages_in_batch() - 1); + } messagesToFail.push_back(*it); }