diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java index b2b06452f62e136d29e26f00baa5febac44db115..2625d353f84447109055d63c034707ff89da6535 100644 --- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java +++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDataEncoder.java @@ -198,6 +198,7 @@ public class TraceDataEncoder { } } + break; case EndTransaction: { TraceBean bean = ctx.getTraceBeans().get(0); sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java index 48d9b3a91a72af27ae1724b6bd88750aa72f332d..c2c60824ec80e9f4f3e2eac99c95704739349ecd 100644 --- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java @@ -17,7 +17,6 @@ package org.apache.rocketmq.client.consumer; import java.io.ByteArrayOutputStream; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashSet; @@ -45,7 +44,6 @@ import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyServic import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.ProcessQueue; -import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper; import org.apache.rocketmq.client.impl.consumer.PullMessageService; import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullResultExt; @@ -88,7 +86,6 @@ public class DefaultMQPushConsumerTest { @Mock private MQClientAPIImpl mQClientAPIImpl; - private PullAPIWrapper pullAPIWrapper; private RebalanceImpl rebalanceImpl; private DefaultMQPushConsumer pushConsumer; @@ -98,6 +95,27 @@ public class DefaultMQPushConsumerTest { factoryTable.forEach((s, instance) -> instance.shutdown()); factoryTable.clear(); + when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer() { + @Override + public PullResult answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[] {'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + ((PullCallback) mock.getArgument(4)).onSuccess(pullResult); + return pullResult; + } + }); + + consumerGroup = "FooBarGroup" + System.currentTimeMillis(); pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.setNamesrvAddr("127.0.0.1:9876"); @@ -115,58 +133,24 @@ public class DefaultMQPushConsumerTest { // suppress updateTopicRouteInfoFromNameServer pushConsumer.changeInstanceNameToPID(); - mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); + mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)); + FieldUtils.writeDeclaredField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true); + mQClientFactory = spy(mQClientFactory); factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); - rebalanceImpl = spy(pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl()); - Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); - field.setAccessible(true); - field.set(pushConsumerImpl, rebalanceImpl); - - pushConsumer.subscribe(topic, "*"); - pushConsumer.start(); - - field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory"); - field.setAccessible(true); - field.set(pushConsumerImpl, mQClientFactory); - - field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); - field.setAccessible(true); - field.set(mQClientFactory, mQClientAPIImpl); - - pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false)); - field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper"); - field.setAccessible(true); - field.set(pushConsumerImpl, pullAPIWrapper); - - mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); + doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); - when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), - anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) - .thenAnswer(new Answer() { - @Override - public PullResult answer(InvocationOnMock mock) throws Throwable { - PullMessageRequestHeader requestHeader = mock.getArgument(1); - MessageClientExt messageClientExt = new MessageClientExt(); - messageClientExt.setTopic(topic); - messageClientExt.setQueueId(0); - messageClientExt.setMsgId("123"); - messageClientExt.setBody(new byte[] {'a'}); - messageClientExt.setOffsetMsgId("234"); - messageClientExt.setBornHost(new InetSocketAddress(8080)); - messageClientExt.setStoreHost(new InetSocketAddress(8080)); - PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); - ((PullCallback) mock.getArgument(4)).onSuccess(pullResult); - return pullResult; - } - }); + rebalanceImpl = spy(pushConsumerImpl.getRebalanceImpl()); + doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class)); + FieldUtils.writeDeclaredField(pushConsumerImpl, "rebalanceImpl", rebalanceImpl, true); - doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); Set messageQueueSet = new HashSet(); messageQueueSet.add(createPullRequest().getMessageQueue()); - pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); - doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class)); + pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); + + pushConsumer.subscribe(topic, "*"); + pushConsumer.start(); } @After @@ -292,7 +276,7 @@ public class DefaultMQPushConsumerTest { PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); pullMessageService.executePullRequestImmediately(createPullRequest()); - assertThat(countDownLatch.await(30, TimeUnit.SECONDS)).isTrue(); + assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue(); pushConsumer.shutdown(); assertThat(messageConsumedFlag.get()).isTrue(); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java index c173b8ef790b1711e906fad8a2c8f3ff26e7b70f..ecf72ae44cfefdb682346d366b557b0a322f4077 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithOpenTracingTest.java @@ -21,7 +21,6 @@ import io.opentracing.mock.MockSpan; import io.opentracing.mock.MockTracer; import io.opentracing.tag.Tags; import java.io.ByteArrayOutputStream; -import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashSet; @@ -75,6 +74,7 @@ import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.waitAtMost; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; @@ -105,6 +105,27 @@ public class DefaultMQConsumerWithOpenTracingTest { factoryTable.forEach((s, instance) -> instance.shutdown()); factoryTable.clear(); + when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class), + anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) + .thenAnswer(new Answer() { + @Override + public PullResult answer(InvocationOnMock mock) throws Throwable { + PullMessageRequestHeader requestHeader = mock.getArgument(1); + MessageClientExt messageClientExt = new MessageClientExt(); + messageClientExt.setTopic(topic); + messageClientExt.setQueueId(0); + messageClientExt.setMsgId("123"); + messageClientExt.setBody(new byte[]{'a'}); + messageClientExt.setOffsetMsgId("234"); + messageClientExt.setBornHost(new InetSocketAddress(8080)); + messageClientExt.setStoreHost(new InetSocketAddress(8080)); + PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); + ((PullCallback) mock.getArgument(4)).onSuccess(pullResult); + return pullResult; + } + }); + + consumerGroup = "FooBarGroup" + System.currentTimeMillis(); pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( @@ -128,58 +149,20 @@ public class DefaultMQConsumerWithOpenTracingTest { // suppress updateTopicRouteInfoFromNameServer pushConsumer.changeInstanceNameToPID(); - mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); + mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true)); + FieldUtils.writeDeclaredField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true); + mQClientFactory = spy(mQClientFactory); factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); - rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl())); - Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl"); - field.setAccessible(true); - field.set(pushConsumerImpl, rebalancePushImpl); - pushConsumer.subscribe(topic, "*"); - - pushConsumer.start(); - - field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory"); - field.setAccessible(true); - field.set(pushConsumerImpl, mQClientFactory); - - field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl"); - field.setAccessible(true); - field.set(mQClientFactory, mQClientAPIImpl); - - pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false)); - field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper"); - field.setAccessible(true); - field.set(pushConsumerImpl, pullAPIWrapper); - - pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory); - mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl); - - when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class), - anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) - .thenAnswer(new Answer() { - @Override - public PullResult answer(InvocationOnMock mock) throws Throwable { - PullMessageRequestHeader requestHeader = mock.getArgument(1); - MessageClientExt messageClientExt = new MessageClientExt(); - messageClientExt.setTopic(topic); - messageClientExt.setQueueId(0); - messageClientExt.setMsgId("123"); - messageClientExt.setBody(new byte[]{'a'}); - messageClientExt.setOffsetMsgId("234"); - messageClientExt.setBornHost(new InetSocketAddress(8080)); - messageClientExt.setStoreHost(new InetSocketAddress(8080)); - PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.singletonList(messageClientExt)); - ((PullCallback) mock.getArgument(4)).onSuccess(pullResult); - return pullResult; - } - }); - doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); + Set messageQueueSet = new HashSet(); messageQueueSet.add(createPullRequest().getMessageQueue()); - pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); + pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet); + + pushConsumer.subscribe(topic, "*"); + pushConsumer.start(); } @After @@ -209,7 +192,8 @@ public class DefaultMQConsumerWithOpenTracingTest { assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getBody()).isEqualTo(new byte[]{'a'}); - assertThat(tracer.finishedSpans().size()).isEqualTo(1); + // wait until consumeMessageAfter hook of tracer is done surely. + waitAtMost(1, TimeUnit.SECONDS).until(() -> tracer.finishedSpans().size() == 1); MockSpan span = tracer.finishedSpans().get(0); assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic); assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER); diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md index fd36fd1c312beb2b225f67ffd0b24a5b0c28fb3e..d924ce1d7d1f54e8c68ed63ddaa040ad5445f13d 100644 --- a/docs/cn/RocketMQ_Example.md +++ b/docs/cn/RocketMQ_Example.md @@ -446,7 +446,7 @@ public class ScheduledMessageConsumer { public ConsumeConcurrentlyStatus consumeMessage(List messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { // Print approximate delay time period - System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); + System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later"); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java index b6c6ae410f61afc4c9dd249ec5b56a0e74068c10..8a6429b2aa20ea00d5e7e51eb8824163e042d141 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java @@ -54,7 +54,7 @@ public class Consumer { final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 20; final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer"; - final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; + final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "false"; final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null; final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null; final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0; @@ -190,7 +190,7 @@ public class Consumer { opt.setRequired(false); options.addOption(opt); - opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false"); + opt = new Option("p", "group suffix enable", true, "Consumer group suffix enable, Default: false"); opt.setRequired(false); options.addOption(opt); diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java index d1fa5743ae52164cc096122009f5ee26a8fe5f0e..6975ab58b0d63532ceac8bc8534908dd44af7478 100644 --- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Producer.java @@ -17,12 +17,14 @@ package org.apache.rocketmq.example.benchmark; import java.io.UnsupportedEncodingException; +import java.util.Arrays; import java.util.LinkedList; import java.util.Random; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.cli.CommandLine; @@ -58,9 +60,10 @@ public class Producer { final int tagCount = commandLine.hasOption('l') ? Integer.parseInt(commandLine.getOptionValue('l')) : 0; final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); + final long messageNum = commandLine.hasOption('q') ? Long.parseLong(commandLine.getOptionValue('q')) : 0; - System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n", - topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable); + System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n", + topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum); final InternalLogger log = ClientLogger.getLog(); @@ -72,6 +75,16 @@ public class Producer { final LinkedList snapshotList = new LinkedList(); + final long[] msgNums = new long[threadCount]; + + if (messageNum > 0) { + Arrays.fill(msgNums, messageNum / threadCount); + long mod = messageNum % threadCount; + if (mod > 0) { + msgNums[0] += mod; + } + } + timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { @@ -85,14 +98,7 @@ public class Producer { timer.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { - Long[] begin = snapshotList.getFirst(); - Long[] end = snapshotList.getLast(); - - final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); - final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); - - System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", - System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + doPrintStats(snapshotList, statsBenchmark, false); } } @@ -120,9 +126,14 @@ public class Producer { producer.start(); for (int i = 0; i < threadCount; i++) { + final long msgNumLimit = msgNums[i]; + if (messageNum > 0 && msgNumLimit == 0) { + break; + } sendThreadPool.execute(new Runnable() { @Override public void run() { + int num = 0; while (true) { try { final Message msg; @@ -198,10 +209,28 @@ public class Producer { } catch (InterruptedException ignored) { } } + if (messageNum > 0 && ++num >= msgNumLimit) { + break; + } } } }); } + try { + sendThreadPool.shutdown(); + sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); + timer.cancel(); + if (snapshotList.size() > 1) { + doPrintStats(snapshotList, statsBenchmark, true); + } else { + System.out.printf("[Complete] Send Total: %d Send Failed: %d Response Failed: %d%n", + statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(), + statsBenchmark.getSendRequestFailedCount().get(), statsBenchmark.getReceiveResponseFailedCount().get()); + } + producer.shutdown(); + } catch (InterruptedException e) { + log.error("[Exit] Thread Interrupted Exception", e); + } } public static Options buildCommandlineOptions(final Options options) { @@ -233,6 +262,10 @@ public class Producer { opt.setRequired(false); options.addOption(opt); + opt = new Option("q", "messageQuantity", true, "Send message quantity, Default: 0, running forever"); + opt.setRequired(false); + options.addOption(opt); + return options; } @@ -249,6 +282,23 @@ public class Producer { return msg; } + + private static void doPrintStats(final LinkedList snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) { + Long[] begin = snapshotList.getFirst(); + Long[] end = snapshotList.getLast(); + + final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L); + final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]); + + if (done) { + System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", + statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(), + sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + } else { + System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n", + System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]); + } + } } class StatsBenchmarkProducer { diff --git a/pom.xml b/pom.xml index 3714eb344ee21b5c180be979e46b7c11fe83a49c..39f368538ea46d17911e3102750d65d28564d53b 100644 --- a/pom.xml +++ b/pom.xml @@ -439,6 +439,12 @@ 3.10.0 test + + org.awaitility + awaitility + 4.1.0 + test + @@ -536,7 +542,7 @@ com.alibaba fastjson - 1.2.69 + 1.2.76 org.javassist diff --git a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/ListDataCollectorImpl.java b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/ListDataCollectorImpl.java index 82ab461aa53607c475f1aff7f6adda1118be728f..bdd991a335fcd170da3c604dfa1101c5c326ee86 100644 --- a/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/ListDataCollectorImpl.java +++ b/test/src/main/java/org/apache/rocketmq/test/util/data/collect/impl/ListDataCollectorImpl.java @@ -43,7 +43,7 @@ public class ListDataCollectorImpl implements DataCollector { return datas; } - public void resetData() { + public synchronized void resetData() { datas.clear(); unlockIncrement(); } @@ -67,7 +67,7 @@ public class ListDataCollectorImpl implements DataCollector { return Collections.frequency(datas, data) == 1; } - public Collection getAllDataWithoutDuplicate() { + public synchronized Collection getAllDataWithoutDuplicate() { return new HashSet(datas); } @@ -81,7 +81,7 @@ public class ListDataCollectorImpl implements DataCollector { return res; } - public void removeData(Object data) { + public synchronized void removeData(Object data) { datas.remove(data); } diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java index c770db01d427c9e33c17a8a6cc20a76ed6760dfc..a78a4a63ab0eb6495561eaf734f06cffd8a627c7 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicRouteSubCommand.java @@ -19,14 +19,23 @@ package org.apache.rocketmq.tools.command.topic; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; +import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.QueueData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommandException; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class TopicRouteSubCommand implements SubCommand { + private static final String FORMAT = "%-45s %-32s %-50s %-10s %-11s %-5s%n"; + @Override public String commandName() { return "topicRoute"; @@ -43,6 +52,9 @@ public class TopicRouteSubCommand implements SubCommand { opt.setRequired(true); options.addOption(opt); + opt = new Option("l", "list", false, "Use list format to print data"); + opt.setRequired(false); + options.addOption(opt); return options; } @@ -58,12 +70,46 @@ public class TopicRouteSubCommand implements SubCommand { String topic = commandLine.getOptionValue('t').trim(); TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); - String json = topicRouteData.toJson(true); - System.out.printf("%s%n", json); + printData(topicRouteData, commandLine.hasOption('l')); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { defaultMQAdminExt.shutdown(); } } -} + + private void printData(TopicRouteData topicRouteData, boolean useListFormat) { + if (!useListFormat) { + System.out.printf("%s%n", topicRouteData.toJson(true)); + return; + } + + int totalReadQueue = 0, totalWriteQueue = 0; + List queueDataList = topicRouteData.getQueueDatas(); + Map map = new HashMap<>(); + for (QueueData queueData : queueDataList) { + map.put(queueData.getBrokerName(), queueData); + } + queueDataList.sort(Comparator.comparing(QueueData::getBrokerName)); + + List brokerDataList = topicRouteData.getBrokerDatas(); + brokerDataList.sort(Comparator.comparing(BrokerData::getBrokerName)); + + System.out.printf(FORMAT, "#ClusterName", "#BrokerName", "#BrokerAddrs", "#ReadQueue", "#WriteQueue", "#Perm"); + + for (BrokerData brokerData : brokerDataList) { + String brokerName = brokerData.getBrokerName(); + QueueData queueData = map.get(brokerName); + totalReadQueue += queueData.getReadQueueNums(); + totalWriteQueue += queueData.getWriteQueueNums(); + System.out.printf(FORMAT, brokerData.getCluster(), brokerName, brokerData.getBrokerAddrs(), + queueData.getReadQueueNums(), queueData.getWriteQueueNums(), queueData.getPerm()); + } + + for (int i = 0; i < 158; i++) { + System.out.print("-"); + } + System.out.printf("%n"); + System.out.printf(FORMAT, "Total:", map.keySet().size(), "", totalReadQueue, totalWriteQueue, ""); + } +} \ No newline at end of file