diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java index 4ce996b2cfc494d5be03d7bb51312a70b358bd4e..163897b1ccdc06e174c48f729b08b1be79ddcec0 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java @@ -125,7 +125,6 @@ public class TopicConfigManager extends ConfigManager { this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } { - // MixAll.RMQ_SYS_TRACK_TRACE_TOPIC if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) { String topic = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); @@ -165,6 +164,10 @@ public class TopicConfigManager extends ConfigManager { if (topicConfig != null) return topicConfig; + if (this.brokerController.getBrokerConfig().isAutoTraceBrokerEnable()) { + return topicConfig; + } + TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic); if (defaultTopicConfig != null) { if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) { diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java index 5ee2e33ffbeb5cb35908a1bbc5e25f47f5e3f0c8..0b5ce052176a57c8acb3bb2f430bba262f6d2e92 100644 --- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java +++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java @@ -265,7 +265,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * Default constructor. */ public DefaultMQPushConsumer() { - this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely(), false); + this(MixAll.DEFAULT_CONSUMER_GROUP, null, new AllocateMessageQueueAveragely()); } /** @@ -275,12 +275,26 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param rpcHook RPC hook to execute before each remoting command. * @param allocateMessageQueueStrategy message queue allocating algorithm. */ + public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, + AllocateMessageQueueStrategy allocateMessageQueueStrategy) { + this.consumerGroup = consumerGroup; + this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; + defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); + } + + /** + * Constructor specifying consumer group, RPC hook and message queue allocating algorithm. + * + * @param consumerGroup Consume queue. + * @param rpcHook RPC hook to execute before each remoting command. + * @param allocateMessageQueueStrategy message queue allocating algorithm. + * @param msgTraceSwitch switch flag instance for message track trace. + */ public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean msgTraceSwitch) { this.consumerGroup = consumerGroup; this.allocateMessageQueueStrategy = allocateMessageQueueStrategy; defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook); - //if client open the message track trace feature if (msgTraceSwitch) { try { Properties tempProperties = new Properties(); @@ -307,16 +321,26 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume * @param rpcHook RPC hook to execute before each remoting command. */ public DefaultMQPushConsumer(RPCHook rpcHook) { - this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely(),false); + this(MixAll.DEFAULT_CONSUMER_GROUP, rpcHook, new AllocateMessageQueueAveragely()); } /** * Constructor specifying consumer group. * * @param consumerGroup Consumer group. + * @param msgTraceSwitch switch flag instance for message track trace. */ public DefaultMQPushConsumer(final String consumerGroup, boolean msgTraceSwitch) { - this(consumerGroup, null, new AllocateMessageQueueAveragely(),msgTraceSwitch); + this(consumerGroup, null, new AllocateMessageQueueAveragely(), msgTraceSwitch); + } + + /** + * Constructor specifying consumer group. + * + * @param consumerGroup Consumer group. + */ + public DefaultMQPushConsumer(final String consumerGroup) { + this(consumerGroup, null, new AllocateMessageQueueAveragely()); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index c35d9468def2c3bbe20639dcfcf815329418d930..9ffaed0a4f9086a87d88603f00f0335330af560d 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -144,7 +144,7 @@ public class MQClientInstance { this.rebalanceService = new RebalanceService(this); - this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP,false); + this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); this.defaultMQProducer.resetClientConfig(clientConfig); this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java index a2bacfb9071880dd2b4eafe2dc0f6783b54a9be6..a654d7631f7ff8dc83f106faee61734f412ba72a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java @@ -146,7 +146,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * * @param producerGroup Producer group, see the name-sake field. * @param rpcHook RPC hook to execute per each remoting command execution. - * @param msgTraceSwitch switch flag instance for message track trace + */ + public DefaultMQProducer(final String producerGroup, RPCHook rpcHook) { + this.producerGroup = producerGroup; + defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook); + } + + /** + * Constructor specifying both producer group and RPC hook. + * + * @param producerGroup Producer group, see the name-sake field. + * @param rpcHook RPC hook to execute per each remoting command execution. + * @param msgTraceSwitch switch flag instance for message track trace. */ public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean msgTraceSwitch) { this.producerGroup = producerGroup; @@ -177,8 +188,18 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * * @param producerGroup Producer group, see the name-sake field. */ + public DefaultMQProducer(final String producerGroup) { + this(producerGroup, null); + } + + /** + * Constructor specifying producer group. + * + * @param producerGroup Producer group, see the name-sake field. + * @param msgTraceSwitch switch flag instance for message track trace. + */ public DefaultMQProducer(final String producerGroup, boolean msgTraceSwitch) { - this(producerGroup, null,msgTraceSwitch); + this(producerGroup, null, msgTraceSwitch); } /** @@ -187,7 +208,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer { * @param rpcHook RPC hook to execute per each remoting command execution. */ public DefaultMQProducer(RPCHook rpcHook) { - this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook,false); + this(MixAll.DEFAULT_PRODUCER_GROUP, rpcHook); } /** diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java index f6030c9d26bf19fafdd8997fa981d898a87d2124..8f6428b29c2ae700547d0b082a8d13d608e35bd1 100644 --- a/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java +++ b/client/src/main/java/org/apache/rocketmq/client/producer/TransactionMQProducer.java @@ -35,11 +35,11 @@ public class TransactionMQProducer extends DefaultMQProducer { } public TransactionMQProducer(final String producerGroup) { - super(producerGroup,false); + super(producerGroup); } public TransactionMQProducer(final String producerGroup, RPCHook rpcHook) { - super(producerGroup, rpcHook,false); + super(producerGroup, rpcHook); } @Override diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/Utils/TrackTraceUtils.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/Utils/TrackTraceUtils.java deleted file mode 100644 index 7d1d75b0dd57035685432a7411f5fb1850deb065..0000000000000000000000000000000000000000 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/Utils/TrackTraceUtils.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.client.trace.core.Utils; - -import org.apache.rocketmq.remoting.protocol.RemotingSerializable; - -import java.net.Inet6Address; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Enumeration; - -/** - * Track Trace Util - */ -public class TrackTraceUtils { - - public static String getLocalAddress() { - try { - //Traverse the network bind card to find a valid IP address and return it. - Enumeration enumeration = NetworkInterface.getNetworkInterfaces(); - ArrayList ipv4Result = new ArrayList(); - ArrayList ipv6Result = new ArrayList(); - while (enumeration.hasMoreElements()) { - final NetworkInterface networkInterface = enumeration.nextElement(); - final Enumeration en = networkInterface.getInetAddresses(); - while (en.hasMoreElements()) { - final InetAddress address = en.nextElement(); - if (!address.isLoopbackAddress()) { - if (address instanceof Inet6Address) { - ipv6Result.add(normalizeHostAddress(address)); - } else { - ipv4Result.add(normalizeHostAddress(address)); - } - } - } - } - - // get priority to IPv4 - if (!ipv4Result.isEmpty()) { - for (String ip : ipv4Result) { - if (ip.startsWith("127.0") || ip.startsWith("192.168")) { - continue; - } - - return ip; - } - - //get the last one - return ipv4Result.get(ipv4Result.size() - 1); - } - //then use the ipv6 address - else if (!ipv6Result.isEmpty()) { - return ipv6Result.get(0); - } - //the use local ip address - final InetAddress localHost = InetAddress.getLocalHost(); - return normalizeHostAddress(localHost); - } catch (SocketException e) { - e.printStackTrace(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } finally { - - } - - return null; - } - - public static String normalizeHostAddress(final InetAddress localHost) { - if (localHost instanceof Inet6Address) { - return "[" + localHost.getHostAddress() + "]"; - } else { - return localHost.getHostAddress(); - } - } - - public static String toJson(final Object obj, boolean prettyFormat) { - return RemotingSerializable.toJson(obj, prettyFormat); - } - - public static T fromJson(String json, Class classOfT) { - return RemotingSerializable.fromJson(json, classOfT); - } - - public static String replaceNull(String ori) { - return ori == null ? "" : ori; - } -} diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java index 4981759eba76a81705cea594845dcf50ca23320e..220a128115c5ee66c11a6b63c6767e55c08f73ff 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceBean.java @@ -16,11 +16,11 @@ */ package org.apache.rocketmq.client.trace.core.common; -import org.apache.rocketmq.client.trace.core.Utils.TrackTraceUtils; +import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageType; public class TrackTraceBean { - private static final String LOCAL_ADDRESS = TrackTraceUtils.getLocalAddress(); + private static final String LOCAL_ADDRESS = UtilAll.ipToIPv4Str(UtilAll.getIP()); private String topic = ""; private String msgId = ""; private String offsetMsgId = ""; diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java index 31a89ef655f20afec1e0c05a0ec4df567f04cac4..72fa630554ee702c8bb6fb9e657c2230c870c535 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/core/common/TrackTraceType.java @@ -16,9 +16,6 @@ */ package org.apache.rocketmq.client.trace.core.common; -/** - * Created by zongtanghu on 2018/11/6. - */ public enum TrackTraceType { Pub, SubBefore, diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 019b44da4fe68f40a9f5f3ac8676121c33e2ab03..ff2fb78bbfb9df89596abc2ad7b35fcc33a2df3e 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -89,7 +89,7 @@ public class DefaultMQPushConsumerTest { @Before public void init() throws Exception { consumerGroup = "FooBarGroup" + System.currentTimeMillis(); - pushConsumer = new DefaultMQPushConsumer(consumerGroup,false); + pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setPullInterval(60 * 1000); @@ -252,7 +252,7 @@ public class DefaultMQPushConsumerTest { } private DefaultMQPushConsumer createPushConsumer() { - DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup,false); + DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java index e6f26f20e8fe56e1da7f968ce4ce78555e246d64..d4f581231f0d4d25eb7a403f9200911ce90549da 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImplTest.java @@ -42,7 +42,7 @@ public class DefaultMQPushConsumerImplTest { //test message thrown.expectMessage("consumeThreadMin (10) is larger than consumeThreadMax (9)"); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group",false); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_group"); consumer.setConsumeThreadMin(10); consumer.setConsumeThreadMax(9); diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java index f6ca3aca9f77d4b4b7b547b2a185ff3f5d95a5ed..796a3943087247c9e5fc75aa5b8fc1cc47c35fb0 100644 --- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java @@ -44,7 +44,7 @@ import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class RebalancePushImplTest { @Spy - private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest",false), null); + private DefaultMQPushConsumerImpl defaultMQPushConsumer = new DefaultMQPushConsumerImpl(new DefaultMQPushConsumer("RebalancePushImplTest"), null); @Mock private MQClientInstance mqClientInstance; @Mock diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 391743614f57e21b715e998d09ac7e241768f2a2..c225afd6842b24adc49818575d54a2d5ca5e016d 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -82,7 +82,7 @@ public class DefaultMQProducerTest { @Before public void init() throws Exception { String producerGroupTemp = producerGroupPrefix + System.currentTimeMillis(); - producer = new DefaultMQProducer(producerGroupTemp,false); + producer = new DefaultMQProducer(producerGroupTemp); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setCompressMsgBodyOverHowmuch(16); message = new Message(topic, new byte[] {'a'}); @@ -309,7 +309,7 @@ public class DefaultMQProducerTest { @Test public void testSetCallbackExecutor() throws MQClientException { String producerGroupTemp = "testSetCallbackExecutor_" + System.currentTimeMillis(); - producer = new DefaultMQProducer(producerGroupTemp,false); + producer = new DefaultMQProducer(producerGroupTemp); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java new file mode 100644 index 0000000000000000000000000000000000000000..27c10dad5aa6efa85e8e0f49b19e731811b1bcda --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.trace; + +import java.io.ByteArrayOutputStream; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.PullCallback; +import org.apache.rocketmq.client.consumer.PullResult; +import org.apache.rocketmq.client.consumer.PullStatus; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQBrokerException; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.impl.CommunicationMode; +import org.apache.rocketmq.client.impl.FindBrokerResult; +import org.apache.rocketmq.client.impl.MQClientAPIImpl; +import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.consumer.ProcessQueue; +import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; +import org.apache.rocketmq.client.impl.consumer.PullMessageService; +import org.apache.rocketmq.client.impl.consumer.PullRequest; +import org.apache.rocketmq.client.impl.consumer.PullResultExt; +import org.apache.rocketmq.client.impl.consumer.RebalancePushImpl; +import org.apache.rocketmq.client.impl.factory.MQClientInstance; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.client.producer.SendStatus; +import org.apache.rocketmq.client.trace.core.dispatch.impl.AsyncArrayDispatcher; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.message.MessageClientExt; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader; + +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; +import org.apache.rocketmq.common.protocol.route.TopicRouteData; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.junit.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class DefaultMQConsumerWithTraceTest { + private String consumerGroup; + private String consumerGroupNormal; + private String producerGroupTraceTemp = MixAll.RMQ_SYS_TRACK_TRACE_TOPIC + System.currentTimeMillis(); + + private String topic = "FooBar"; + private String brokerName = "BrokerA"; + private MQClientInstance mQClientFactory; + + @Mock + private MQClientAPIImpl mQClientAPIImpl; + private PullAPIWrapper pullAPIWrapper; + private RebalancePushImpl rebalancePushImpl; + private DefaultMQPushConsumer pushConsumer; + private DefaultMQPushConsumer normalPushConsumer; + + private AsyncArrayDispatcher asyncArrayDispatcher; + private MQClientInstance mQClientTraceFactory; + @Mock + private MQClientAPIImpl mQClientTraceAPIImpl; + private DefaultMQProducer traceProducer; + + + @Before + public void init() throws Exception { + consumerGroup = "FooBarGroup" + System.currentTimeMillis(); + pushConsumer = new DefaultMQPushConsumer(consumerGroup,true); + consumerGroupNormal = "FooBarGroup" + System.currentTimeMillis(); + normalPushConsumer = new DefaultMQPushConsumer(consumerGroupNormal,false); + + pushConsumer.setNamesrvAddr("127.0.0.1:9876"); + pushConsumer.setPullInterval(60 * 1000); + + asyncArrayDispatcher = (AsyncArrayDispatcher)pushConsumer.getTraceDispatcher(); + traceProducer = asyncArrayDispatcher.getTraceProducer(); + + + pushConsumer.registerMessageListener(new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + return null; + } + }); + + DefaultMQPushConsumerImpl pushConsumerImpl = pushConsumer.getDefaultMQPushConsumerImpl(); + rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); + Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); + field.setAccessible(true); + field.set(pushConsumerImpl, rebalancePushImpl); + pushConsumer.subscribe(topic, "*"); + pushConsumer.start(); + + mQClientFactory = spy(pushConsumerImpl.getmQClientFactory()); + mQClientTraceFactory = spy(pushConsumerImpl.getmQClientFactory()); + + field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory"); + field.setAccessible(true); + field.set(pushConsumerImpl, mQClientFactory); + + field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + field.setAccessible(true); + field.set(mQClientFactory, mQClientAPIImpl); + + Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); + fieldTrace.setAccessible(true); + fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory); + + fieldTrace = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); + fieldTrace.setAccessible(true); + fieldTrace.set(mQClientTraceFactory, mQClientTraceAPIImpl); + + pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false)); + field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper"); + field.setAccessible(true); + field.set(pushConsumerImpl, pullAPIWrapper); + + pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory); + mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); + + when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[] {'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + ((PullCallback) mock.getArgument(4)).onSuccess(pullResult); + return pullResult; + } + }); + + doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); + Set messageQueueSet = new HashSet(); + messageQueueSet.add(createPullRequest().getMessageQueue()); + pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); + } + + @After + public void terminate() { + pushConsumer.shutdown(); + } + + @Test + public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { + traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); + //when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute()); + //when(mQClientTraceAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTraceTopicRoute()); + + + final CountDownLatch countDownLatch = new CountDownLatch(1); + final MessageExt[] messageExts = new MessageExt[1]; + pushConsumer.getDefaultMQPushConsumerImpl().setConsumeMessageService(new ConsumeMessageConcurrentlyService(pushConsumer.getDefaultMQPushConsumerImpl(), new MessageListenerConcurrently() { + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, + ConsumeConcurrentlyContext context) { + messageExts[0] = msgs.get(0); + countDownLatch.countDown(); + return null; + } + })); + + PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); + pullMessageService.executePullRequestImmediately(createPullRequest()); + countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + assertThat(messageExts[0].getTopic()).isEqualTo(topic); + assertThat(messageExts[0].getBody()).isEqualTo(new byte[] {'a'}); + } + + private PullRequest createPullRequest() { + PullRequest pullRequest = new PullRequest(); + pullRequest.setConsumerGroup(consumerGroup); + pullRequest.setNextOffset(1024); + + MessageQueue messageQueue = new MessageQueue(); + messageQueue.setBrokerName(brokerName); + messageQueue.setQueueId(0); + messageQueue.setTopic(topic); + pullRequest.setMessageQueue(messageQueue); + ProcessQueue processQueue = new ProcessQueue(); + processQueue.setLocked(true); + processQueue.setLastLockTimestamp(System.currentTimeMillis()); + pullRequest.setProcessQueue(processQueue); + + return pullRequest; + } + + private PullResultExt createPullResult(PullMessageRequestHeader requestHeader, PullStatus pullStatus, + List messageExtList) throws Exception { + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + for (MessageExt messageExt : messageExtList) { + outputStream.write(MessageDecoder.encode(messageExt, false)); + } + return new PullResultExt(pullStatus, requestHeader.getQueueOffset() + messageExtList.size(), 123, 2048, messageExtList, 0, outputStream.toByteArray()); + } + + public static TopicRouteData createTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap>()); + List brokerDataList = new ArrayList(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("BrokerA"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap(); + brokerAddrs.put(0L, "127.0.0.1:10911"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("BrokerA"); + queueData.setPerm(6); + queueData.setReadQueueNums(3); + queueData.setWriteQueueNums(4); + queueData.setTopicSynFlag(0); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } + + private SendResult createSendResult(SendStatus sendStatus) { + SendResult sendResult = new SendResult(); + sendResult.setMsgId("123"); + sendResult.setOffsetMsgId("123"); + sendResult.setQueueOffset(456); + sendResult.setSendStatus(sendStatus); + sendResult.setRegionId("HZ"); + return sendResult; + } + + public static TopicRouteData createTraceTopicRoute() { + TopicRouteData topicRouteData = new TopicRouteData(); + + topicRouteData.setFilterServerTable(new HashMap>()); + List brokerDataList = new ArrayList(); + BrokerData brokerData = new BrokerData(); + brokerData.setBrokerName("broker-trace"); + brokerData.setCluster("DefaultCluster"); + HashMap brokerAddrs = new HashMap(); + brokerAddrs.put(0L, "127.0.0.1:10912"); + brokerData.setBrokerAddrs(brokerAddrs); + brokerDataList.add(brokerData); + topicRouteData.setBrokerDatas(brokerDataList); + + List queueDataList = new ArrayList(); + QueueData queueData = new QueueData(); + queueData.setBrokerName("broker-trace"); + queueData.setPerm(6); + queueData.setReadQueueNums(1); + queueData.setWriteQueueNums(1); + queueData.setTopicSynFlag(1); + queueDataList.add(queueData); + topicRouteData.setQueueDatas(queueDataList); + return topicRouteData; + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 7d8fb0a6ae6c860d7657b14040b153b96a0f64dd..c3757a8afad91fc0c4653bf4bb5b534d9a027262 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -22,10 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import net.bytebuddy.asm.Advice; import org.apache.rocketmq.client.ClientConfig; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; @@ -48,7 +45,6 @@ import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; -import org.apache.rocketmq.remoting.netty.NettyRemotingClient; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -57,11 +53,9 @@ import org.mockito.Mock; import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; -import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.when; @@ -83,6 +77,7 @@ public class DefaultMQProducerWithTraceTest { private DefaultMQProducer producer; private DefaultMQProducer traceProducer; + private DefaultMQProducer normalProducer; private Message message; private String topic = "FooBar"; @@ -93,21 +88,23 @@ public class DefaultMQProducerWithTraceTest { @Before public void init() throws Exception { + normalProducer = new DefaultMQProducer(producerGroupTemp,false); producer = new DefaultMQProducer(producerGroupTemp,true); producer.setNamesrvAddr("127.0.0.1:9876"); + normalProducer.setNamesrvAddr("127.0.0.1:9877"); message = new Message(topic, new byte[] {'a', 'b' ,'c'}); asyncArrayDispatcher = (AsyncArrayDispatcher)producer.getTraceDispatcher(); traceProducer = asyncArrayDispatcher.getTraceProducer(); producer.start(); - + Field field = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); field.setAccessible(true); field.set(producer.getDefaultMQProducerImpl(), mQClientFactory); Field fieldTrace = DefaultMQProducerImpl.class.getDeclaredField("mQClientFactory"); fieldTrace.setAccessible(true); - field.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory); + fieldTrace.set(traceProducer.getDefaultMQProducerImpl(), mQClientTraceFactory); field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); field.setAccessible(true); @@ -220,15 +217,4 @@ public class DefaultMQProducerWithTraceTest { topicRouteData.setQueueDatas(queueDataList); return topicRouteData; } - - private SendResult createSendTraceResult(SendStatus sendStatus) { - SendResult sendResult = new SendResult(); - sendResult.setMsgId("456"); - sendResult.setOffsetMsgId("456"); - sendResult.setQueueOffset(789); - sendResult.setSendStatus(sendStatus); - sendResult.setRegionId("HZ"); - return sendResult; - } - } diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java index e1372ccd7e31068df217cd8849230a8f98ca8528..cf566aa15bd0d58f629ff879a96921acd71c9ad0 100644 --- a/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/batch/SimpleBatchProducer.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.common.message.Message; public class SimpleBatchProducer { public static void main(String[] args) throws Exception { - DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName",false); + DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); producer.start(); //If you just send messages of no more than 1MiB at a time, it is easy to use batch diff --git a/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java index eae0800812de037f8ebce0fa9fe00dd85e23d87a..f9495c4171481bc324f10bd36f00cb9b756de4c4 100644 --- a/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/batch/SplitBatchProducer.java @@ -28,7 +28,7 @@ public class SplitBatchProducer { public static void main(String[] args) throws Exception { - DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName",false); + DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName"); producer.start(); //large batch diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index 4b6edf0537b11bcb7fd302a7c1be55ccedc60e2c..d431d3ecc6ad6c05dae4ebe8b7796d930c4905fb 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -102,7 +102,7 @@ public class Consumer { } }, 10000, 10000); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group,false); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); consumer.setInstanceName(Long.toString(System.currentTimeMillis())); if (filterType == null || expression == null) { diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index 16ad133f4980a29c328efbdb5b05dd667e1ea31c..ce2b83f97755c51a3fabda1f22ef13d276245929 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -100,7 +100,7 @@ public class Producer { } }, 10000, 10000); - final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer",false); + final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer"); producer.setInstanceName(Long.toString(System.currentTimeMillis())); if (commandLine.hasOption('n')) { diff --git a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java index f1a55c6d61cda836fa2d04c2a99228922e61d9fe..fb1f9bbde7110fe0b9a500e771cf70a8fe4f0c95 100644 --- a/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/broadcast/PushConsumer.java @@ -29,7 +29,7 @@ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1",false); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_1"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java index 262f57a9d25d55a90b230b6eb641dc0e2655d2e3..bb491ac40af5a55b9667a4d4fb72c023ce91156c 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java @@ -30,7 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException, IOException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4",false); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4"); ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile()); diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java index 2d575f99fdd244f3016c7fd3a4e0fbc007c1bf62..2a0da6546c853ee569e786062923f646b0de59d8 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/Producer.java @@ -24,7 +24,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",false); + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); try { diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java index 43409c67def62b1368560925552d03e2022ddf67..c41c9c14c3c15b5a6e67f0c689cbc2bf1206c80a 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java @@ -30,7 +30,7 @@ import java.util.List; public class SqlConsumer { public static void main(String[] args) { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4",false); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); try { consumer.subscribe("TopicTest", MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" + diff --git a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java index 4f319f5c3e6acc9f6d0d8c47ee12d3f384961819..3f3a0e65215b0d1f53720c26a25f9f53bdeac68b 100644 --- a/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class SqlProducer { public static void main(String[] args) { - DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name",false); + DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); try { producer.start(); } catch (MQClientException e) { diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java index c536a14c40b99650174af1c3fd10160527677a03..6936f1dcea374ad281e5ae7e34f0e0120d055399 100644 --- a/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/operation/Consumer.java @@ -41,7 +41,7 @@ public class Consumer { String subscription = commandLine.getOptionValue('s'); final String returnFailedHalf = commandLine.getOptionValue('f'); - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group,false); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); consumer.setInstanceName(Long.toString(System.currentTimeMillis())); consumer.subscribe(topic, subscription); diff --git a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java index 20fed39b9f952bbbd927d453391c42410d91ccfd..1d4336d7fa9b5cdeb14e271dffac092ff53ebe68 100644 --- a/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/operation/Producer.java @@ -39,7 +39,7 @@ public class Producer { String keys = commandLine.getOptionValue('k'); String msgCount = commandLine.getOptionValue('c'); - DefaultMQProducer producer = new DefaultMQProducer(group,false); + DefaultMQProducer producer = new DefaultMQProducer(group); producer.setInstanceName(Long.toString(System.currentTimeMillis())); producer.start(); diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java index d7b6cd9591f7d2fd53d08a64aadd25b820f229b0..abb274d6129278cb6e045ef503e7cdc88a79f912 100644 --- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Consumer.java @@ -29,7 +29,7 @@ import org.apache.rocketmq.common.message.MessageExt; public class Consumer { public static void main(String[] args) throws MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3",false); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); diff --git a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java index c1e26910a214923655ba81d9ce01ce0dfef2c7ca..6a6bdc7d710e750f5738ff9a705e3680967e4e24 100644 --- a/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/ordermessage/Producer.java @@ -32,7 +32,7 @@ import org.apache.rocketmq.remoting.exception.RemotingException; public class Producer { public static void main(String[] args) throws UnsupportedEncodingException { try { - MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name",false); + MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index da70a78ae841fc07b67143bcc772c705bd257032..6d3b936507e9c667fdc816207f81d0f9502ecae2 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -35,7 +35,7 @@ public class Consumer { /* * Instantiate with specified consumer group name. */ - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4",false); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); /* * Specify name server addresses. diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index 3d2d1c6e9ba9312ce15331ff73c35fc7551c06d6..53a1d4dd64ac027d52389520c05b7cad08228120 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -31,7 +31,7 @@ public class Producer { /* * Instantiate with a producer group name. */ - DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name",false); + DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); /* * Specify name server addresses. diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java index 14ed57ff9330d0c97c2abac440522e38f78fc536..d40739c813d4ad2a7925a76d2f3b6164e025fb8e 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/AsyncProducer.java @@ -30,7 +30,7 @@ public class AsyncProducer { public static void main( String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { - DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test",false); + DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java index 5601e667757873b2cd701efc61492da61fbdb122..448f8ee9f45a93350712bb4e1ef03d915a33a481 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/Producer.java @@ -25,7 +25,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 128; i++) diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java index 16271bb4bc765379931a6d90e837e0e19a66cab1..abbfbdffcdda01b60ff3dc3dd1aee3b628849510 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java @@ -28,7 +28,7 @@ import org.apache.rocketmq.common.message.MessageExt; public class PushConsumer { public static void main(String[] args) throws InterruptedException, MQClientException { - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1"); consumer.subscribe("TopicTest", "*"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //wrong time format 2017_0422_221800 diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java index f16953e34cb0c12cb18aeae5ccb94bd9c83a42d6..576f9cb6fbf23728ae4040cb5c8ea6c22fef2cee 100644 --- a/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java +++ b/example/src/main/java/org/apache/rocketmq/example/simple/TestProducer.java @@ -26,7 +26,7 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; public class TestProducer { public static void main(String[] args) throws MQClientException, InterruptedException { - DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",false); + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 1; i++) diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..fb8e37fd2b7cea6e121341883f0caa45852ce509 --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TraceProducer.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.tracemessage; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.client.producer.SendResult; +import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.remoting.common.RemotingHelper; + +public class TraceProducer { + public static void main(String[] args) throws MQClientException, InterruptedException { + + DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName",true); + producer.start(); + + for (int i = 0; i < 128; i++) + try { + { + Message msg = new Message("TopicTest", + "TagA", + "OrderID188", + "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); + SendResult sendResult = producer.send(msg); + System.out.printf("%s%n", sendResult); + } + + } catch (Exception e) { + e.printStackTrace(); + } + + producer.shutdown(); + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..b9710d4a7346eaea8c5b0930884e66714ef70b8b --- /dev/null +++ b/example/src/main/java/org/apache/rocketmq/example/tracemessage/TracePushConsumer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.example.tracemessage; + +import java.util.List; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; +import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; +import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; +import org.apache.rocketmq.common.message.MessageExt; + +public class TracePushConsumer { + public static void main(String[] args) throws InterruptedException, MQClientException { + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1",true); + consumer.subscribe("TopicTest", "*"); + consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + //wrong time format 2017_0422_221800 + consumer.setConsumeTimestamp("20181109221800"); + consumer.registerMessageListener(new MessageListenerConcurrently() { + + @Override + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + } + }); + consumer.start(); + System.out.printf("Consumer Started.%n"); + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java index 4296a53ad059891d53f75e4f6ff4d36dd88b65c4..69d3b24ca737b5b0c687281614addb63f9127554 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/broker/SendMsgStatusCommand.java @@ -71,7 +71,7 @@ public class SendMsgStatusCommand implements SubCommand { @Override public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException { - final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook,false); + final DefaultMQProducer producer = new DefaultMQProducer("PID_SMSC", rpcHook); producer.setInstanceName("PID_SMSC_" + System.currentTimeMillis()); try {