提交 e88fed4a 编写于 作者: M Matteo Merli

Use correct number of messages in batch for publish rate stats during replication (#3834)

上级 c6aa56fe
......@@ -372,12 +372,18 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, compressedPayload);
MessageMetadata msgMetadata = msgMetadataBuilder.build();
ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, msgMetadata, encryptedPayload);
// When publishing during replication, we need to set the correct number of message in batch
// This is only used in tracking the publish rate stats
int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch()
? msg.getMessageBuilder().getNumMessagesInBatch()
: 1;
ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
msgMetadataBuilder.recycle();
msgMetadata.recycle();
final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback);
op.setNumMessagesInBatch(1);
op.setNumMessagesInBatch(numMessages);
op.setBatchSizeByte(encryptedPayload.readableBytes());
pendingMessages.put(op);
lastSendFuture = callback.getFuture();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册