未验证 提交 bf343a31 编写于 作者: R racoon 提交者: GitHub

[ISSUE #3225]change Random to ThreadLocalRandom in broker

上级 83b04d03
...@@ -22,7 +22,8 @@ import java.net.SocketAddress; ...@@ -22,7 +22,8 @@ import java.net.SocketAddress;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.concurrent.ThreadLocalRandom;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext; import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook; import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
...@@ -60,7 +61,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc ...@@ -60,7 +61,6 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
protected final static int DLQ_NUMS_PER_GROUP = 1; protected final static int DLQ_NUMS_PER_GROUP = 1;
protected final BrokerController brokerController; protected final BrokerController brokerController;
protected final Random random = new Random(System.currentTimeMillis());
protected final SocketAddress storeHost; protected final SocketAddress storeHost;
private List<SendMessageHook> sendMessageHookList; private List<SendMessageHook> sendMessageHookList;
...@@ -109,7 +109,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc ...@@ -109,7 +109,7 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) { final SendMessageRequestHeader requestHeader, final byte[] body, TopicConfig topicConfig) {
int queueIdInt = requestHeader.getQueueId(); int queueIdInt = requestHeader.getQueueId();
if (queueIdInt < 0) { if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums();
} }
int sysFlag = requestHeader.getSysFlag(); int sysFlag = requestHeader.getSysFlag();
......
...@@ -45,6 +45,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; ...@@ -45,6 +45,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.stats.BrokerStatsManager; import org.apache.rocketmq.store.stats.BrokerStatsManager;
import java.util.concurrent.ThreadLocalRandom;
public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor { public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
...@@ -125,7 +127,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen ...@@ -125,7 +127,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor implemen
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) { if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums();
} }
MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
......
...@@ -21,6 +21,7 @@ import java.util.List; ...@@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
...@@ -141,7 +142,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -141,7 +142,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} }
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % subscriptionGroupConfig.getRetryQueueNums();
int topicSysFlag = 0; int topicSysFlag = 0;
if (requestHeader.isUnitMode()) { if (requestHeader.isUnitMode()) {
topicSysFlag = TopicSysFlag.buildSysFlag(false, true); topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
...@@ -188,7 +189,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -188,7 +189,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
|| delayLevel < 0) { || delayLevel < 0) {
newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP, DLQ_NUMS_PER_GROUP,
...@@ -353,7 +354,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -353,7 +354,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
if (reconsumeTimes >= maxReconsumeTimes) { if (reconsumeTimes >= maxReconsumeTimes) {
newTopic = MixAll.getDLQTopic(groupName); newTopic = MixAll.getDLQTopic(groupName);
int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; int queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % DLQ_NUMS_PER_GROUP;
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP, DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0 PermName.PERM_WRITE, 0
...@@ -410,7 +411,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -410,7 +411,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0) { if (queueIdInt < 0) {
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums(); queueIdInt = ThreadLocalRandom.current().nextInt(99999999) % topicConfig.getWriteQueueNums();
} }
MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
...@@ -666,7 +667,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement ...@@ -666,7 +667,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
} }
private int randomQueueId(int writeQueueNums) { private int randomQueueId(int writeQueueNums) {
return (this.random.nextInt() % 99999999) % writeQueueNums; return ThreadLocalRandom.current().nextInt(99999999) % writeQueueNums;
} }
private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request, private RemotingCommand preSend(ChannelHandlerContext ctx, RemotingCommand request,
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.rocketmq.broker.transaction; package org.apache.rocketmq.broker.transaction;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import java.util.Random;
import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst; import org.apache.rocketmq.common.message.MessageConst;
...@@ -40,7 +39,6 @@ public abstract class AbstractTransactionalMessageCheckListener { ...@@ -40,7 +39,6 @@ public abstract class AbstractTransactionalMessageCheckListener {
//queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC //queue nums of topic TRANS_CHECK_MAX_TIME_TOPIC
protected final static int TCMT_QUEUE_NUMS = 1; protected final static int TCMT_QUEUE_NUMS = 1;
protected final Random random = new Random(System.currentTimeMillis());
private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { private static ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override @Override
......
...@@ -30,6 +30,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner; ...@@ -30,6 +30,8 @@ import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.PutMessageResult; import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus; import org.apache.rocketmq.store.PutMessageStatus;
import java.util.concurrent.ThreadLocalRandom;
public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener { public class DefaultTransactionalMessageCheckListener extends AbstractTransactionalMessageCheckListener {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.TRANSACTION_LOGGER_NAME);
...@@ -58,7 +60,7 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio ...@@ -58,7 +60,7 @@ public class DefaultTransactionalMessageCheckListener extends AbstractTransactio
private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) { private MessageExtBrokerInner toMessageExtBrokerInner(MessageExt msgExt) {
TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE); TopicConfig topicConfig = this.getBrokerController().getTopicConfigManager().createTopicOfTranCheckMaxTime(TCMT_QUEUE_NUMS, PermName.PERM_READ | PermName.PERM_WRITE);
int queueId = Math.abs(random.nextInt() % 99999999) % TCMT_QUEUE_NUMS; int queueId = ThreadLocalRandom.current().nextInt(99999999) % TCMT_QUEUE_NUMS;
MessageExtBrokerInner inner = new MessageExtBrokerInner(); MessageExtBrokerInner inner = new MessageExtBrokerInner();
inner.setTopic(topicConfig.getTopicName()); inner.setTopic(topicConfig.getTopicName());
inner.setBody(msgExt.getBody()); inner.setBody(msgExt.getBody());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册