diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java index 35f8660e97bcaab20854d3cc3d8fe0d1fbeb1d7a..9d26e9982b92154177c2e9fab1697f1d5a162fe4 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java @@ -22,7 +22,8 @@ import java.net.SocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageHook; @@ -60,7 +61,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc protected final static int DLQ_NUMS_PER_GROUP = 1; protected final BrokerController brokerController; - protected final Random random = new Random(System.currentTimeMillis()); protected final SocketAddress storeHost; private List sendMessageHookList; @@ -109,7 +109,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) { int queueIdInt = requestHeader.getQueueId(); if (queueIdInt < 0) { - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums(); } int sysFlag = requestHeader.getSysFlag(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java index 2890fc4db753d922bc2ba7520e6f2f0217ae4f5c..f31576fe37a014d4f168dcfaeb5c1a457754ecf6 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java @@ -45,6 +45,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.stats.BrokerStatsManager; +import java.util.concurrent.ThreadLocalRandom; + public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); @@ -125,7 +127,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums(); } MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java index 2cd142fb3761095af21d1d0c08b3c58afc8edd27..6942c88d870812a4d49e5574d0a93ad6c3d3eb99 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ThreadLocalRandom; import io.netty.channel.ChannelHandlerContext; import org.apache.rocketmq.broker.BrokerController; @@ -141,7 +142,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); - int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); + int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums(); int topicSysFlag = 0; if (requestHeader.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); @@ -188,7 +189,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; + queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, @@ -353,7 +354,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); if (reconsumeTimes >= maxReconsumeTimes) { newTopic = MixAll.getDLQTopic(groupName); - int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; + int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0 @@ -410,7 +411,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (queueIdInt < 0) { - queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); + queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums(); } MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); @@ -666,7 +667,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement } private int randomQueueId(int writeQueueNums) { - return (this.random.nextInt() % 99999999) % writeQueueNums; + return ThreadLocalRandom.current().nextInt(99999999) % writeQueueNums; } private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request, diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java index 4cf5647084da6f537dd60769ee980805acb7cffa..0079fb5ce44e955587d20fabe1439d9f6c92b009 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.broker.transaction; import io.netty.channel.Channel; -import java.util.Random; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.message.MessageConst; @@ -40,7 +39,6 @@ public abstract class AbstractTransactionalMessageCheckListener { //queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC protected final static int TCMT_QUEUE_NUMS = 1; - protected final Random random = new Random(System.currentTimeMillis()); private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue(2000), new ThreadFactory() { @Override diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java index ee87bd375300ea0fb80c4315ddaaffd78463250d..a28e33249509a88128fa1b1120366c4bd30de320 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/DefaultTransactionalMessageCheckListener.java @@ -30,6 +30,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageStatus; +import java.util.concurrent.ThreadLocalRandom; + public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); @@ -58,7 +60,7 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) { TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE); - int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS; + int queueId = ThreadLocalRandom.current().nextInt(99999999) % TCMT_QUEUE_NUMS; MessageExtBrokerInner inner = new MessageExtBrokerInner(); inner.setTopic(topicConfig.getTopicName()); inner.setBody(msgExt.getBody());