diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java index 0152b93a812ed25daa15af2a2b54456f6db70091..1257f1837292550276b994a908c0cf703de100c7 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java @@ -299,9 +299,11 @@ public class PullMessageProcessor implements NettyRequestProcessor { switch (response.getCode()) { case ResponseCode.SUCCESS: + int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); + int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount; context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS); - context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial()); + context.setCommercialRcvTimes(incValue); context.setCommercialRcvSize(getMessageResult.getBufferTotalSize()); context.setCommercialOwner(owner); diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java index 414b3f43fb175daba3094e145d1bd14342d1227f..a3752854049ecbf0b231579745ba0ecf0847a8c3 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java @@ -428,8 +428,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement sendMessageContext.setQueueId(responseHeader.getQueueId()); sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); + int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); + int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); sendMessageContext.setCommercialSendTimes(incValue); diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java index 6eae0a74295fe4fec02cdcb1f94f9162bd18878f..ba80a3f6a974ca69c8a348ff8de6669070b0a4a3 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java @@ -85,6 +85,7 @@ public class BrokerConfig { private int commercialTimerCount = 1; private int commercialTransCount = 1; private int commercialBigCount = 1; + private int commercialBaseCount = 1; private boolean transferMsgByHeap = true; private int maxDelayTime = 40; @@ -537,4 +538,12 @@ public class BrokerConfig { public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) { this.consumerManageThreadPoolNums = consumerManageThreadPoolNums; } + + public int getCommercialBaseCount() { + return commercialBaseCount; + } + + public void setCommercialBaseCount(int commercialBaseCount) { + this.commercialBaseCount = commercialBaseCount; + } }