From cde6f404414b5aff72a88c0d80f6464068e8ce75 Mon Sep 17 00:00:00 2001 From: huzongtang Date: Tue, 11 Dec 2018 16:03:53 +0800 Subject: [PATCH] [ISSUE #525] Support the message track,these committed codes have supported trace_topic name value Configurable by users. --- .../broker/topic/TopicConfigManager.java | 2 +- .../consumer/DefaultMQPushConsumer.java | 13 +++++++++--- .../client/producer/DefaultMQProducer.java | 16 ++++++++++---- .../core/common/TrackTraceConstants.java | 4 +--- .../dispatch/impl/AsyncArrayDispatcher.java | 21 ++++++++++++------- .../core/hook/SendMessageTrackHookImpl.java | 7 +++---- .../trace/DefaultMQConsumerWithTraceTest.java | 10 +++++---- .../trace/DefaultMQProducerWithTraceTest.java | 10 ++++++--- .../apache/rocketmq/common/BrokerConfig.java | 10 +++++++++ .../example/tracemessage/TraceProducer.java | 2 +- .../tracemessage/TracePushConsumer.java | 3 ++- .../logappender/common/ProducerInstance.java | 2 +- .../logappender/AbstractTestCase.java | 2 +- .../test/client/rmq/RMQNormalConsumer.java | 2 +- .../test/factory/ProducerFactory.java | 2 +- .../message/QueryMsgByIdSubCommand.java | 2 +- .../tools/monitor/MonitorService.java | 2 +- 17 files changed, 73 insertions(+), 37 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 163897b1..bd5eaeea 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -126,7 +126,7 @@ public class TopicConfigManager extends ConfigManager { } { if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) { - String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; + String topic = this.brokerController.getBrokerConfig().getMsgTrackTopicName(); TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(1); diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 0b5ce052..179a80da 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -289,9 +289,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param rpcHook RPC hook to execute before each remoting command. * @param allocateMessageQueueStrategy message queue allocating algorithm. * @param msgTraceSwitch switch flag instance for message track trace. + * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, - AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) { + AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch, final String traceTopicName) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); @@ -303,6 +304,11 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100"); tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.CONSUMER.name()); + if (!UtilAll.isBlank(traceTopicName)) { + tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName); + } else { + tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); + } AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl()); traceDispatcher = dispatcher; @@ -329,9 +335,10 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * * @param consumerGroup Consumer group. * @param msgTraceSwitch switch flag instance for message track trace. + * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. */ - public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) { - this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch); + public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch, final String traceTopicName) { + this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch, traceTopicName); } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index 8bee795d..3c33d2ee 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -33,6 +33,7 @@ import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; import org.apache.rocketmq.client.trace.core.hook.SendMessageTrackHookImpl; import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageBatch; import org.apache.rocketmq.common.message.MessageClientIDSetter; @@ -138,7 +139,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * Default constructor. */ public DefaultMQProducer() { - this(MixAll.DEFAULT_PRODUCER_GROUP, null,false); + this(MixAll.DEFAULT_PRODUCER_GROUP, null); } /** @@ -158,8 +159,9 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. * @param msgTraceSwitch switch flag instance for message track trace. + * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. */ - public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) { + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch,final String traceTopicName) { this.producerGroup = producerGroup; defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); //if client open the message track trace feature @@ -171,6 +173,11 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { tempProperties.put(TrackTraceConstants.MAX_BATCH_NUM, "100"); tempProperties.put(TrackTraceConstants.INSTANCE_NAME, "PID_CLIENT_INNER_TRACE_PRODUCER"); tempProperties.put(TrackTraceConstants.TRACE_DISPATCHER_TYPE, TrackTraceDispatcherType.PRODUCER.name()); + if (!UtilAll.isBlank(traceTopicName)) { + tempProperties.put(TrackTraceConstants.TRACE_TOPIC, traceTopicName); + } else { + tempProperties.put(TrackTraceConstants.TRACE_TOPIC, MixAll.RMQ_SYS_TRACK_TRACE_TOPIC); + } AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties); dispatcher.setHostProducer(this.getDefaultMQProducerImpl()); traceDispatcher = dispatcher; @@ -197,9 +204,10 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * * @param producerGroup Producer group, see the name-sake field. * @param msgTraceSwitch switch flag instance for message track trace. + * @param traceTopicName the name value of message track trace topic.If you don't config,you can use the default trace topic name. */ - public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) { - this(producerGroup, null, msgTraceSwitch); + public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch, final String traceTopicName) { + this(producerGroup, null, msgTraceSwitch, traceTopicName); } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java index aa49d1c5..a8868614 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceConstants.java @@ -16,8 +16,6 @@ */ package org.apache.rocketmq.client.trace.core.common; -import org.apache.rocketmq.common.MixAll; - public class TrackTraceConstants { public static final String NAMESRV_ADDR = "NAMESRV_ADDR"; public static final String ADDRSRV_URL = "ADDRSRV_URL"; @@ -27,7 +25,7 @@ public class TrackTraceConstants { public static final String WAKE_UP_NUM = "WakeUpNum"; public static final String MAX_MSG_SIZE = "MaxMsgSize"; public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER"; - public static final String TRACE_TOPIC = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; + public static final String TRACE_TOPIC = "TRACK_TRACE_TOPIC_NAME"; public static final char CONTENT_SPLITOR = (char) 1; public static final char FIELD_SPLITOR = (char) 2; public static final String TRACE_DISPATCHER_TYPE = "DispatcherType"; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java index 1c1c4c38..90b00d41 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/dispatch/impl/AsyncArrayDispatcher.java @@ -72,6 +72,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { private DefaultMQPushConsumerImpl hostConsumer; private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); private String dispatcherId = UUID.randomUUID().toString(); + private String traceTopicName; public AsyncArrayDispatcher(Properties properties) throws MQClientException { dispatcherType = properties.getProperty(TrackTraceConstants.TRACE_DISPATCHER_TYPE); @@ -83,7 +84,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { this.discardCount = new AtomicLong(0L); traceContextQueue = new ArrayBlockingQueue(1024); appenderQueue = new ArrayBlockingQueue(queueSize); - + traceTopicName = properties.getProperty(TrackTraceConstants.TRACE_TOPIC); this.traceExecuter = new ThreadPoolExecutor(// 10, // 20, // @@ -94,6 +95,14 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { traceProducer = TrackTraceProducerFactory.getTraceDispatcherProducer(properties); } + public String getTraceTopicName() { + return traceTopicName; + } + + public void setTraceTopicName(String traceTopicName) { + this.traceTopicName = traceTopicName; + } + public DefaultMQProducer getTraceProducer() { return traceProducer; } @@ -115,7 +124,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { } public void start(Properties properties) throws MQClientException { - TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId,properties.getProperty(TrackTraceConstants.NAMESRV_ADDR)); + TrackTraceProducerFactory.registerTraceDispatcher(dispatcherId, properties.getProperty(TrackTraceConstants.NAMESRV_ADDR)); this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncArrayDispatcher-Thread-" + dispatcherId); this.worker.setDaemon(true); this.worker.start(); @@ -247,16 +256,14 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { transBeanList.add(traceData); } for (Map.Entry> entry : transBeanMap.entrySet()) { - //key -> dataTopic(Not trace Topic) - String dataTopic = entry.getKey(); - flushData(entry.getValue(), dataTopic); + flushData(entry.getValue()); } } /** * batch sending data actually */ - private void flushData(List transBeanList, String topic) { + private void flushData(List transBeanList) { if (transBeanList.size() == 0) { return; } @@ -292,7 +299,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher { * @param data the message track trace data in this batch */ private void sendTraceDataByMQ(Set keySet, final String data) { - String topic = TrackTraceConstants.TRACE_TOPIC; + String topic = traceTopicName; final Message message = new Message(topic, data.getBytes()); //keyset of message track trace includes msgId of or original message diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java index 0c38e220..c174f462 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/hook/SendMessageTrackHookImpl.java @@ -20,11 +20,10 @@ import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.trace.core.common.TrackTraceBean; -import org.apache.rocketmq.client.trace.core.common.TrackTraceConstants; import org.apache.rocketmq.client.trace.core.common.TrackTraceContext; import org.apache.rocketmq.client.trace.core.common.TrackTraceType; import org.apache.rocketmq.client.trace.core.dispatch.AsyncDispatcher; -import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; import java.util.ArrayList; public class SendMessageTrackHookImpl implements SendMessageHook { @@ -43,7 +42,7 @@ public class SendMessageTrackHookImpl implements SendMessageHook { @Override public void sendMessageBefore(SendMessageContext context) { //if it is message track trace data,then it doesn't recorded - if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) { + if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName())) { return; } //build the context content of TuxeTraceContext @@ -67,7 +66,7 @@ public class SendMessageTrackHookImpl implements SendMessageHook { @Override public void sendMessageAfter(SendMessageContext context) { //if it is message track trace data,then it doesn't recorded - if (context == null || context.getMessage().getTopic().startsWith(TrackTraceConstants.TRACE_TOPIC) + if (context == null || context.getMessage().getTopic().startsWith(((AsyncArrayDispatcher) localDispatcher).getTraceTopicName()) || context.getMqTraceContext() == null) { return; } diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index 27c10dad..08382dfe 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -100,21 +100,23 @@ public class DefaultMQConsumerWithTraceTest { private RebalancePushImpl rebalancePushImpl; private DefaultMQPushConsumer pushConsumer; private DefaultMQPushConsumer normalPushConsumer; + private DefaultMQPushConsumer customTraceTopicpushConsumer; + private AsyncArrayDispatcher asyncArrayDispatcher; private MQClientInstance mQClientTraceFactory; @Mock private MQClientAPIImpl mQClientTraceAPIImpl; private DefaultMQProducer traceProducer; - + private String customerTraceTopic = "rmq_track_trace_topic_12345"; @Before public void init() throws Exception { consumerGroup = "FooBarGroup" + System.currentTimeMillis(); - pushConsumer = new DefaultMQPushConsumer(consumerGroup,true); + pushConsumer = new DefaultMQPushConsumer(consumerGroup,true,""); consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis(); - normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false); - + normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false,""); + customTraceTopicpushConsumer = new DefaultMQPushConsumer(consumerGroup,true,customerTraceTopic); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index c3757a8a..3b482884 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -76,6 +76,7 @@ public class DefaultMQProducerWithTraceTest { private AsyncArrayDispatcher asyncArrayDispatcher; private DefaultMQProducer producer; + private DefaultMQProducer customTraceTopicproducer; private DefaultMQProducer traceProducer; private DefaultMQProducer normalProducer; @@ -84,14 +85,17 @@ public class DefaultMQProducerWithTraceTest { private String producerGroupPrefix = "FooBar_PID"; private String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); - + private String customerTraceTopic = "rmq_track_trace_topic_12345"; + @Before public void init() throws Exception { - normalProducer = new DefaultMQProducer(producerGroupTemp,false); - producer = new DefaultMQProducer(producerGroupTemp,true); + customTraceTopicproducer = new DefaultMQProducer(producerGroupTemp,false, customerTraceTopic); + normalProducer = new DefaultMQProducer(producerGroupTemp,false,""); + producer = new DefaultMQProducer(producerGroupTemp,true,""); producer.setNamesrvAddr("127.0.0.1:9876"); normalProducer.setNamesrvAddr("127.0.0.1:9877"); + customTraceTopicproducer.setNamesrvAddr("127.0.0.1:9878"); message = new Message(topic, new byte[] {'a', 'b' ,'c'}); asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher(); traceProducer = asyncArrayDispatcher.getTraceProducer(); diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 77d492ec..11c1fcb9 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -53,6 +53,8 @@ public class BrokerConfig { private String messageStorePlugIn = ""; @ImportantField private boolean autoTraceBrokerEnable = false; + @ImportantField + private String msgTrackTopicName = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; /** * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default * value is 1. @@ -741,4 +743,12 @@ public class BrokerConfig { public void setAutoTraceBrokerEnable(boolean autoTraceBrokerEnable) { this.autoTraceBrokerEnable = autoTraceBrokerEnable; } + + public String getMsgTrackTopicName() { + return msgTrackTopicName; + } + + public void setMsgTrackTopicName(String msgTrackTopicName) { + this.msgTrackTopicName = msgTrackTopicName; + } } diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java index fb8e37fd..5a513e41 100644 --- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class TraceProducer { public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true, ""); producer.start(); for (int i = 0; i < 128; i++) diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java index b9710d4a..e0e05a8c 100644 --- a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java @@ -28,7 +28,8 @@ import org.apache.rocketmq.common.message.MessageExt; public class TracePushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true); + //here,we use the default message track trace topic name + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true, ""); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 diff --git a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java index 7ba580df..d2adac53 100644 --- a/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java +++ b/logappender/src/main/java/org/apache/rocketmq/logappender/common/ProducerInstance.java @@ -63,7 +63,7 @@ public class ProducerInstance { return p; } - DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group,false); + DefaultMQProducer defaultMQProducer = new DefaultMQProducer(group); defaultMQProducer.setNamesrvAddr(nameServerAddress); MQProducer beforeProducer = null; beforeProducer = getProducerInstance().producerMap.putIfAbsent(genKey, defaultMQProducer); diff --git a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java index 12893938..38904c0b 100644 --- a/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java +++ b/logappender/src/test/java/org/apache/rocketmq/logappender/AbstractTestCase.java @@ -39,7 +39,7 @@ public class AbstractTestCase { @Before public void mockLoggerAppender() throws Exception { - DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender",false)); + DefaultMQProducer defaultMQProducer = spy(new DefaultMQProducer("loggerAppender")); doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocationOnMock) throws Throwable { diff --git a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java index a828e8dd..ce739be5 100644 --- a/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java +++ b/test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQNormalConsumer.java @@ -46,7 +46,7 @@ public class RMQNormalConsumer extends AbstractMQConsumer { } public void create(boolean useTLS) { - consumer = new DefaultMQPushConsumer(consumerGroup,false); + consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setInstanceName(RandomUtil.getStringByUUID()); consumer.setNamesrvAddr(nsAddr); try { diff --git a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java index abd1e3b6..66767cc9 100644 --- a/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java +++ b/test/src/main/java/org/apache/rocketmq/test/factory/ProducerFactory.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.test.util.RandomUtil; public class ProducerFactory { public static DefaultMQProducer getRMQProducer(String ns) { - DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID(),false); + DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID()); producer.setNamesrvAddr(ns); try { producer.start(); diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java index 675fc2a8..6bb8caad 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/message/QueryMsgByIdSubCommand.java @@ -213,7 +213,7 @@ public class QueryMsgByIdSubCommand implements SubCommand { public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook); defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis())); - DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById",false); + DefaultMQProducer defaultMQProducer = new DefaultMQProducer("ReSendMsgById"); defaultMQProducer.setInstanceName(Long.toString(System.currentTimeMillis())); try { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java index 3debb3d1..9bf09ad4 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/monitor/MonitorService.java @@ -65,7 +65,7 @@ public class MonitorService { private final DefaultMQPullConsumer defaultMQPullConsumer = new DefaultMQPullConsumer( MixAll.TOOLS_CONSUMER_GROUP); private final DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer( - MixAll.MONITOR_CONSUMER_GROUP,false); + MixAll.MONITOR_CONSUMER_GROUP); public MonitorService(MonitorConfig monitorConfig, MonitorListener monitorListener, RPCHook rpcHook) { this.monitorConfig = monitorConfig; -- GitLab