diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 9ece9ea6697c880d24075c7f6e0f5d7772a8978a..603f4e149c274b42088161d23a6030785871f7f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -372,12 +372,18 @@ public class ProducerImpl extends ProducerBase implements TimerTask, Conne ByteBuf encryptedPayload = encryptMessage(msgMetadataBuilder, compressedPayload); MessageMetadata msgMetadata = msgMetadataBuilder.build(); - ByteBufPair cmd = sendMessage(producerId, sequenceId, 1, msgMetadata, encryptedPayload); + + // When publishing during replication, we need to set the correct number of message in batch + // This is only used in tracking the publish rate stats + int numMessages = msg.getMessageBuilder().hasNumMessagesInBatch() + ? msg.getMessageBuilder().getNumMessagesInBatch() + : 1; + ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msgMetadata, encryptedPayload); msgMetadataBuilder.recycle(); msgMetadata.recycle(); final OpSendMsg op = OpSendMsg.create(msg, cmd, sequenceId, callback); - op.setNumMessagesInBatch(1); + op.setNumMessagesInBatch(numMessages); op.setBatchSizeByte(encryptedPayload.readableBytes()); pendingMessages.put(op); lastSendFuture = callback.getFuture();