提交 db05f817 编写于 作者: Z zhangjidi2016

Merge remote-tracking branch 'upstream/develop' into develop

...@@ -198,6 +198,7 @@ public class TraceDataEncoder { ...@@ -198,6 +198,7 @@ public class TraceDataEncoder {
} }
} }
break;
case EndTransaction: { case EndTransaction: {
TraceBean bean = ctx.getTraceBeans().get(0); TraceBean bean = ctx.getTraceBeans().get(0);
sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)// sb.append(ctx.getTraceType()).append(TraceConstants.CONTENT_SPLITOR)//
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.consumer; package org.apache.rocketmq.client.consumer;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
...@@ -45,7 +44,6 @@ import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyServic ...@@ -45,7 +44,6 @@ import org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyServic
import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.consumer.ProcessQueue; import org.apache.rocketmq.client.impl.consumer.ProcessQueue;
import org.apache.rocketmq.client.impl.consumer.PullAPIWrapper;
import org.apache.rocketmq.client.impl.consumer.PullMessageService; import org.apache.rocketmq.client.impl.consumer.PullMessageService;
import org.apache.rocketmq.client.impl.consumer.PullRequest; import org.apache.rocketmq.client.impl.consumer.PullRequest;
import org.apache.rocketmq.client.impl.consumer.PullResultExt; import org.apache.rocketmq.client.impl.consumer.PullResultExt;
...@@ -88,7 +86,6 @@ public class DefaultMQPushConsumerTest { ...@@ -88,7 +86,6 @@ public class DefaultMQPushConsumerTest {
@Mock @Mock
private MQClientAPIImpl mQClientAPIImpl; private MQClientAPIImpl mQClientAPIImpl;
private PullAPIWrapper pullAPIWrapper;
private RebalanceImpl rebalanceImpl; private RebalanceImpl rebalanceImpl;
private DefaultMQPushConsumer pushConsumer; private DefaultMQPushConsumer pushConsumer;
...@@ -98,6 +95,27 @@ public class DefaultMQPushConsumerTest { ...@@ -98,6 +95,27 @@ public class DefaultMQPushConsumerTest {
factoryTable.forEach((s, instance) -> instance.shutdown()); factoryTable.forEach((s, instance) -> instance.shutdown());
factoryTable.clear(); factoryTable.clear();
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
consumerGroup = "FooBarGroup" + System.currentTimeMillis(); consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876"); pushConsumer.setNamesrvAddr("127.0.0.1:9876");
...@@ -115,58 +133,24 @@ public class DefaultMQPushConsumerTest { ...@@ -115,58 +133,24 @@ public class DefaultMQPushConsumerTest {
// suppress updateTopicRouteInfoFromNameServer // suppress updateTopicRouteInfoFromNameServer
pushConsumer.changeInstanceNameToPID(); pushConsumer.changeInstanceNameToPID();
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
FieldUtils.writeDeclaredField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
mQClientFactory = spy(mQClientFactory);
factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
rebalanceImpl = spy(pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalanceImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientFactory.getMQClientAPIImpl().pullMessage(anyString(), any(PullMessageRequestHeader.class), rebalanceImpl = spy(pushConsumerImpl.getRebalanceImpl());
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class))) doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class));
.thenAnswer(new Answer<PullResult>() { FieldUtils.writeDeclaredField(pushConsumerImpl, "rebalanceImpl", rebalanceImpl, true);
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[] {'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue()); messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
doReturn(123L).when(rebalanceImpl).computePullFromWhere(any(MessageQueue.class));
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
} }
@After @After
...@@ -292,7 +276,7 @@ public class DefaultMQPushConsumerTest { ...@@ -292,7 +276,7 @@ public class DefaultMQPushConsumerTest {
PullMessageService pullMessageService = mQClientFactory.getPullMessageService(); PullMessageService pullMessageService = mQClientFactory.getPullMessageService();
pullMessageService.executePullRequestImmediately(createPullRequest()); pullMessageService.executePullRequestImmediately(createPullRequest());
assertThat(countDownLatch.await(30, TimeUnit.SECONDS)).isTrue(); assertThat(countDownLatch.await(10, TimeUnit.SECONDS)).isTrue();
pushConsumer.shutdown(); pushConsumer.shutdown();
assertThat(messageConsumedFlag.get()).isTrue(); assertThat(messageConsumedFlag.get()).isTrue();
......
...@@ -21,7 +21,6 @@ import io.opentracing.mock.MockSpan; ...@@ -21,7 +21,6 @@ import io.opentracing.mock.MockSpan;
import io.opentracing.mock.MockTracer; import io.opentracing.mock.MockTracer;
import io.opentracing.tag.Tags; import io.opentracing.tag.Tags;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
...@@ -75,6 +74,7 @@ import org.mockito.junit.MockitoJUnitRunner; ...@@ -75,6 +74,7 @@ import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.waitAtMost;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyLong;
...@@ -105,6 +105,27 @@ public class DefaultMQConsumerWithOpenTracingTest { ...@@ -105,6 +105,27 @@ public class DefaultMQConsumerWithOpenTracingTest {
factoryTable.forEach((s, instance) -> instance.shutdown()); factoryTable.forEach((s, instance) -> instance.shutdown());
factoryTable.clear(); factoryTable.clear();
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[]{'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
consumerGroup = "FooBarGroup" + System.currentTimeMillis(); consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup); pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook( pushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
...@@ -128,58 +149,20 @@ public class DefaultMQConsumerWithOpenTracingTest { ...@@ -128,58 +149,20 @@ public class DefaultMQConsumerWithOpenTracingTest {
// suppress updateTopicRouteInfoFromNameServer // suppress updateTopicRouteInfoFromNameServer
pushConsumer.changeInstanceNameToPID(); pushConsumer.changeInstanceNameToPID();
mQClientFactory = spy(MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true))); mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(pushConsumer, (RPCHook) FieldUtils.readDeclaredField(pushConsumerImpl, "rpcHook", true));
FieldUtils.writeDeclaredField(mQClientFactory, "mQClientAPIImpl", mQClientAPIImpl, true);
mQClientFactory = spy(mQClientFactory);
factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory); factoryTable.put(pushConsumer.buildMQClientId(), mQClientFactory);
doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString()); doReturn(false).when(mQClientFactory).updateTopicRouteInfoFromNameServer(anyString());
rebalancePushImpl = spy(new RebalancePushImpl(pushConsumer.getDefaultMQPushConsumerImpl()));
Field field = DefaultMQPushConsumerImpl.class.getDeclaredField("rebalanceImpl");
field.setAccessible(true);
field.set(pushConsumerImpl, rebalancePushImpl);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
field = DefaultMQPushConsumerImpl.class.getDeclaredField("mQClientFactory");
field.setAccessible(true);
field.set(pushConsumerImpl, mQClientFactory);
field = MQClientInstance.class.getDeclaredField("mQClientAPIImpl");
field.setAccessible(true);
field.set(mQClientFactory, mQClientAPIImpl);
pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
field = DefaultMQPushConsumerImpl.class.getDeclaredField("pullAPIWrapper");
field.setAccessible(true);
field.set(pushConsumerImpl, pullAPIWrapper);
pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().setmQClientFactory(mQClientFactory);
mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
.thenAnswer(new Answer<PullResult>() {
@Override
public PullResult answer(InvocationOnMock mock) throws Throwable {
PullMessageRequestHeader requestHeader = mock.getArgument(1);
MessageClientExt messageClientExt = new MessageClientExt();
messageClientExt.setTopic(topic);
messageClientExt.setQueueId(0);
messageClientExt.setMsgId("123");
messageClientExt.setBody(new byte[]{'a'});
messageClientExt.setOffsetMsgId("234");
messageClientExt.setBornHost(new InetSocketAddress(8080));
messageClientExt.setStoreHost(new InetSocketAddress(8080));
PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
return pullResult;
}
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>(); Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue()); messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
} }
@After @After
...@@ -209,7 +192,8 @@ public class DefaultMQConsumerWithOpenTracingTest { ...@@ -209,7 +192,8 @@ public class DefaultMQConsumerWithOpenTracingTest {
assertThat(msg.getTopic()).isEqualTo(topic); assertThat(msg.getTopic()).isEqualTo(topic);
assertThat(msg.getBody()).isEqualTo(new byte[]{'a'}); assertThat(msg.getBody()).isEqualTo(new byte[]{'a'});
assertThat(tracer.finishedSpans().size()).isEqualTo(1); // wait until consumeMessageAfter hook of tracer is done surely.
waitAtMost(1, TimeUnit.SECONDS).until(() -> tracer.finishedSpans().size() == 1);
MockSpan span = tracer.finishedSpans().get(0); MockSpan span = tracer.finishedSpans().get(0);
assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic); assertThat(span.tags().get(Tags.MESSAGE_BUS_DESTINATION.getKey())).isEqualTo(topic);
assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER); assertThat(span.tags().get(Tags.SPAN_KIND.getKey())).isEqualTo(Tags.SPAN_KIND_CONSUMER);
......
...@@ -446,7 +446,7 @@ public class ScheduledMessageConsumer { ...@@ -446,7 +446,7 @@ public class ScheduledMessageConsumer {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) { for (MessageExt message : messages) {
// Print approximate delay time period // Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later"); System.out.println("Receive message[msgId=" + message.getMsgId() + "] " + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later");
} }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} }
......
...@@ -54,7 +54,7 @@ public class Consumer { ...@@ -54,7 +54,7 @@ public class Consumer {
final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest"; final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 20; final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 20;
final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer"; final String groupPrefix = commandLine.hasOption('g') ? commandLine.getOptionValue('g').trim() : "benchmark_consumer";
final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "true"; final String isSuffixEnable = commandLine.hasOption('p') ? commandLine.getOptionValue('p').trim() : "false";
final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null; final String filterType = commandLine.hasOption('f') ? commandLine.getOptionValue('f').trim() : null;
final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null; final String expression = commandLine.hasOption('e') ? commandLine.getOptionValue('e').trim() : null;
final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0; final double failRate = commandLine.hasOption('r') ? Double.parseDouble(commandLine.getOptionValue('r').trim()) : 0.0;
...@@ -190,7 +190,7 @@ public class Consumer { ...@@ -190,7 +190,7 @@ public class Consumer {
opt.setRequired(false); opt.setRequired(false);
options.addOption(opt); options.addOption(opt);
opt = new Option("p", "group prefix enable", true, "Consumer group name, Default: false"); opt = new Option("p", "group suffix enable", true, "Consumer group suffix enable, Default: false");
opt.setRequired(false); opt.setRequired(false);
options.addOption(opt); options.addOption(opt);
......
...@@ -17,12 +17,14 @@ ...@@ -17,12 +17,14 @@
package org.apache.rocketmq.example.benchmark; package org.apache.rocketmq.example.benchmark;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Random; import java.util.Random;
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
...@@ -58,9 +60,10 @@ public class Producer { ...@@ -58,9 +60,10 @@ public class Producer {
final int tagCount = commandLine.hasOption('l') ? Integer.parseInt(commandLine.getOptionValue('l')) : 0; final int tagCount = commandLine.hasOption('l') ? Integer.parseInt(commandLine.getOptionValue('l')) : 0;
final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m')); final boolean msgTraceEnable = commandLine.hasOption('m') && Boolean.parseBoolean(commandLine.getOptionValue('m'));
final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a')); final boolean aclEnable = commandLine.hasOption('a') && Boolean.parseBoolean(commandLine.getOptionValue('a'));
final long messageNum = commandLine.hasOption('q') ? Long.parseLong(commandLine.getOptionValue('q')) : 0;
System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n", System.out.printf("topic: %s threadCount: %d messageSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s messageQuantity: %d%n",
topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable); topic, threadCount, messageSize, keyEnable, propertySize, tagCount, msgTraceEnable, aclEnable, messageNum);
final InternalLogger log = ClientLogger.getLog(); final InternalLogger log = ClientLogger.getLog();
...@@ -72,6 +75,16 @@ public class Producer { ...@@ -72,6 +75,16 @@ public class Producer {
final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>(); final LinkedList<Long[]> snapshotList = new LinkedList<Long[]>();
final long[] msgNums = new long[threadCount];
if (messageNum > 0) {
Arrays.fill(msgNums, messageNum / threadCount);
long mod = messageNum % threadCount;
if (mod > 0) {
msgNums[0] += mod;
}
}
timer.scheduleAtFixedRate(new TimerTask() { timer.scheduleAtFixedRate(new TimerTask() {
@Override @Override
public void run() { public void run() {
...@@ -85,14 +98,7 @@ public class Producer { ...@@ -85,14 +98,7 @@ public class Producer {
timer.scheduleAtFixedRate(new TimerTask() { timer.scheduleAtFixedRate(new TimerTask() {
private void printStats() { private void printStats() {
if (snapshotList.size() >= 10) { if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst(); doPrintStats(snapshotList, statsBenchmark, false);
Long[] end = snapshotList.getLast();
final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
} }
} }
...@@ -120,9 +126,14 @@ public class Producer { ...@@ -120,9 +126,14 @@ public class Producer {
producer.start(); producer.start();
for (int i = 0; i < threadCount; i++) { for (int i = 0; i < threadCount; i++) {
final long msgNumLimit = msgNums[i];
if (messageNum > 0 && msgNumLimit == 0) {
break;
}
sendThreadPool.execute(new Runnable() { sendThreadPool.execute(new Runnable() {
@Override @Override
public void run() { public void run() {
int num = 0;
while (true) { while (true) {
try { try {
final Message msg; final Message msg;
...@@ -198,10 +209,28 @@ public class Producer { ...@@ -198,10 +209,28 @@ public class Producer {
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
} }
if (messageNum > 0 && ++num >= msgNumLimit) {
break;
}
} }
} }
}); });
} }
try {
sendThreadPool.shutdown();
sendThreadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
timer.cancel();
if (snapshotList.size() > 1) {
doPrintStats(snapshotList, statsBenchmark, true);
} else {
System.out.printf("[Complete] Send Total: %d Send Failed: %d Response Failed: %d%n",
statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
statsBenchmark.getSendRequestFailedCount().get(), statsBenchmark.getReceiveResponseFailedCount().get());
}
producer.shutdown();
} catch (InterruptedException e) {
log.error("[Exit] Thread Interrupted Exception", e);
}
} }
public static Options buildCommandlineOptions(final Options options) { public static Options buildCommandlineOptions(final Options options) {
...@@ -233,6 +262,10 @@ public class Producer { ...@@ -233,6 +262,10 @@ public class Producer {
opt.setRequired(false); opt.setRequired(false);
options.addOption(opt); options.addOption(opt);
opt = new Option("q", "messageQuantity", true, "Send message quantity, Default: 0, running forever");
opt.setRequired(false);
options.addOption(opt);
return options; return options;
} }
...@@ -249,6 +282,23 @@ public class Producer { ...@@ -249,6 +282,23 @@ public class Producer {
return msg; return msg;
} }
private static void doPrintStats(final LinkedList<Long[]> snapshotList, final StatsBenchmarkProducer statsBenchmark, boolean done) {
Long[] begin = snapshotList.getFirst();
Long[] end = snapshotList.getLast();
final long sendTps = (long) (((end[3] - begin[3]) / (double) (end[0] - begin[0])) * 1000L);
final double averageRT = (end[5] - begin[5]) / (double) (end[3] - begin[3]);
if (done) {
System.out.printf("[Complete] Send Total: %d Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
statsBenchmark.getSendRequestSuccessCount().get() + statsBenchmark.getSendRequestFailedCount().get(),
sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
} else {
System.out.printf("Current Time: %s Send TPS: %d Max RT(ms): %d Average RT(ms): %7.3f Send Failed: %d Response Failed: %d%n",
System.currentTimeMillis(), sendTps, statsBenchmark.getSendMessageMaxRT().get(), averageRT, end[2], end[4]);
}
}
} }
class StatsBenchmarkProducer { class StatsBenchmarkProducer {
......
...@@ -439,6 +439,12 @@ ...@@ -439,6 +439,12 @@
<version>3.10.0</version> <version>3.10.0</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
</dependencies> </dependencies>
<dependencyManagement> <dependencyManagement>
...@@ -536,7 +542,7 @@ ...@@ -536,7 +542,7 @@
<dependency> <dependency>
<groupId>com.alibaba</groupId> <groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId> <artifactId>fastjson</artifactId>
<version>1.2.69</version> <version>1.2.76</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.javassist</groupId> <groupId>org.javassist</groupId>
......
...@@ -43,7 +43,7 @@ public class ListDataCollectorImpl implements DataCollector { ...@@ -43,7 +43,7 @@ public class ListDataCollectorImpl implements DataCollector {
return datas; return datas;
} }
public void resetData() { public synchronized void resetData() {
datas.clear(); datas.clear();
unlockIncrement(); unlockIncrement();
} }
...@@ -67,7 +67,7 @@ public class ListDataCollectorImpl implements DataCollector { ...@@ -67,7 +67,7 @@ public class ListDataCollectorImpl implements DataCollector {
return Collections.frequency(datas, data) == 1; return Collections.frequency(datas, data) == 1;
} }
public Collection<Object> getAllDataWithoutDuplicate() { public synchronized Collection<Object> getAllDataWithoutDuplicate() {
return new HashSet<Object>(datas); return new HashSet<Object>(datas);
} }
...@@ -81,7 +81,7 @@ public class ListDataCollectorImpl implements DataCollector { ...@@ -81,7 +81,7 @@ public class ListDataCollectorImpl implements DataCollector {
return res; return res;
} }
public void removeData(Object data) { public synchronized void removeData(Object data) {
datas.remove(data); datas.remove(data);
} }
......
...@@ -19,14 +19,23 @@ package org.apache.rocketmq.tools.command.topic; ...@@ -19,14 +19,23 @@ package org.apache.rocketmq.tools.command.topic;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option; import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options; import org.apache.commons.cli.Options;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.SubCommand; import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException; import org.apache.rocketmq.tools.command.SubCommandException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class TopicRouteSubCommand implements SubCommand { public class TopicRouteSubCommand implements SubCommand {
private static final String FORMAT = "%-45s %-32s %-50s %-10s %-11s %-5s%n";
@Override @Override
public String commandName() { public String commandName() {
return "topicRoute"; return "topicRoute";
...@@ -43,6 +52,9 @@ public class TopicRouteSubCommand implements SubCommand { ...@@ -43,6 +52,9 @@ public class TopicRouteSubCommand implements SubCommand {
opt.setRequired(true); opt.setRequired(true);
options.addOption(opt); options.addOption(opt);
opt = new Option("l", "list", false, "Use list format to print data");
opt.setRequired(false);
options.addOption(opt);
return options; return options;
} }
...@@ -58,12 +70,46 @@ public class TopicRouteSubCommand implements SubCommand { ...@@ -58,12 +70,46 @@ public class TopicRouteSubCommand implements SubCommand {
String topic = commandLine.getOptionValue('t').trim(); String topic = commandLine.getOptionValue('t').trim();
TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic); TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
String json = topicRouteData.toJson(true); printData(topicRouteData, commandLine.hasOption('l'));
System.out.printf("%s%n", json);
} catch (Exception e) { } catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally { } finally {
defaultMQAdminExt.shutdown(); defaultMQAdminExt.shutdown();
} }
} }
}
private void printData(TopicRouteData topicRouteData, boolean useListFormat) {
if (!useListFormat) {
System.out.printf("%s%n", topicRouteData.toJson(true));
return;
}
int totalReadQueue = 0, totalWriteQueue = 0;
List<QueueData> queueDataList = topicRouteData.getQueueDatas();
Map<String /*brokerName*/, QueueData> map = new HashMap<>();
for (QueueData queueData : queueDataList) {
map.put(queueData.getBrokerName(), queueData);
}
queueDataList.sort(Comparator.comparing(QueueData::getBrokerName));
List<BrokerData> brokerDataList = topicRouteData.getBrokerDatas();
brokerDataList.sort(Comparator.comparing(BrokerData::getBrokerName));
System.out.printf(FORMAT, "#ClusterName", "#BrokerName", "#BrokerAddrs", "#ReadQueue", "#WriteQueue", "#Perm");
for (BrokerData brokerData : brokerDataList) {
String brokerName = brokerData.getBrokerName();
QueueData queueData = map.get(brokerName);
totalReadQueue += queueData.getReadQueueNums();
totalWriteQueue += queueData.getWriteQueueNums();
System.out.printf(FORMAT, brokerData.getCluster(), brokerName, brokerData.getBrokerAddrs(),
queueData.getReadQueueNums(), queueData.getWriteQueueNums(), queueData.getPerm());
}
for (int i = 0; i < 158; i++) {
System.out.print("-");
}
System.out.printf("%n");
System.out.printf(FORMAT, "Total:", map.keySet().size(), "", totalReadQueue, totalWriteQueue, "");
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册