From 0c022e05af86fa53c71bcb92b93a8a2b6fb82908 Mon Sep 17 00:00:00 2001 From: yukon Date: Tue, 27 Dec 2016 16:38:25 +0800 Subject: [PATCH] Allow setting base factor for commercial data. --- .../rocketmq/broker/processor/PullMessageProcessor.java | 4 +++- .../rocketmq/broker/processor/SendMessageProcessor.java | 3 ++- .../java/com/alibaba/rocketmq/common/BrokerConfig.java | 9 +++++++++ 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java index 0152b93a..1257f183 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/PullMessageProcessor.java @@ -299,9 +299,11 @@ public class PullMessageProcessor implements NettyRequestProcessor { switch (response.getCode()) { case ResponseCode.SUCCESS: + int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); + int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount; context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS); - context.setCommercialRcvTimes(getMessageResult.getMsgCount4Commercial()); + context.setCommercialRcvTimes(incValue); context.setCommercialRcvSize(getMessageResult.getBufferTotalSize()); context.setCommercialOwner(owner); diff --git a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java index 414b3f43..a3752854 100644 --- a/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java +++ b/rocketmq-broker/src/main/java/com/alibaba/rocketmq/broker/processor/SendMessageProcessor.java @@ -428,8 +428,9 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement sendMessageContext.setQueueId(responseHeader.getQueueId()); sendMessageContext.setQueueOffset(responseHeader.getQueueOffset()); + int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount(); int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes(); - int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT); + int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount; sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS); sendMessageContext.setCommercialSendTimes(incValue); diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java index 6eae0a74..ba80a3f6 100644 --- a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java +++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfig.java @@ -85,6 +85,7 @@ public class BrokerConfig { private int commercialTimerCount = 1; private int commercialTransCount = 1; private int commercialBigCount = 1; + private int commercialBaseCount = 1; private boolean transferMsgByHeap = true; private int maxDelayTime = 40; @@ -537,4 +538,12 @@ public class BrokerConfig { public void setConsumerManageThreadPoolNums(int consumerManageThreadPoolNums) { this.consumerManageThreadPoolNums = consumerManageThreadPoolNums; } + + public int getCommercialBaseCount() { + return commercialBaseCount; + } + + public void setCommercialBaseCount(int commercialBaseCount) { + this.commercialBaseCount = commercialBaseCount; + } } -- GitLab