diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java index 1da7380b6d9f53b8b0c0909e50093a806ac5676a..9148422ff101a6e4ed42e141fb9fbf3d2f77a0cf 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java @@ -130,8 +130,8 @@ public class PlainPermissionLoader { if (!ownedPermMap.containsKey(resource)) { // Check the default perm - byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() : - needCheckedAccess.getDefaultTopicPerm(); + byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : + ownedAccess.getDefaultTopicPerm(); if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 762c1921ab1c71d0f95047a2ec1e418842e41f43..c898366e981b1efb5610dfba50fe258be47e3087 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -469,8 +469,6 @@ public class BrokerController { } } initialTransaction(); - initialAcl(); -// initialRpcHooks(); } return result; } @@ -528,16 +526,16 @@ public class BrokerController { // } } - private void initialRpcHooks() { - - List rpcHooks = ServiceProvider.loadServiceList(ServiceProvider.RPC_HOOK_ID, RPCHook.class); - if (rpcHooks == null || rpcHooks.isEmpty()) { - return; - } - for (RPCHook rpcHook : rpcHooks) { - this.remotingServer.registerServerRPCHook(rpcHook); - } - } +// private void initialRpcHooks() { +// +// List rpcHooks = ServiceProvider.loadServiceList(ServiceProvider.RPC_HOOK_ID, RPCHook.class); +// if (rpcHooks == null || rpcHooks.isEmpty()) { +// return; +// } +// for (RPCHook rpcHook : rpcHooks) { +// this.remotingServer.registerServerRPCHook(rpcHook); +// } +// } // registerInterceoptorGroup() diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index aedeff20aa3d1210ab91d7109eb7f28b2c4b0c3a..9665fc34a90f4933683ebb4d7215384a7f630e82 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -65,6 +65,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; +import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; @@ -185,6 +186,8 @@ public class DefaultMQConsumerWithTraceTest { }); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); + doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish(); + Set messageQueueSet = new HashSet(); messageQueueSet.add(createPullRequest().getMessageQueue()); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); @@ -195,7 +198,7 @@ public class DefaultMQConsumerWithTraceTest { pushConsumer.shutdown(); } -// @Test + @Test public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 903be01cddbf13031368db73daa9df8ab6198ebb..a2a7c758d83262b916f4db5c8e8a035980cef7b5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -116,6 +116,7 @@ public class DefaultMQProducerWithTraceTest { when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); + when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911"); } diff --git a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java index e67b6e7d1348046db5fe3506995fa6bdd322dc65..4a89e2c5492d7ae8ee3cbbfb9e1b2aae03878c39 100644 --- a/common/src/main/java/org/apache/rocketmq/common/MQVersion.java +++ b/common/src/main/java/org/apache/rocketmq/common/MQVersion.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.common; public class MQVersion { - public static final int CURRENT_VERSION = Version.V4_4_0.ordinal(); + public static final int CURRENT_VERSION = Version.V4_4_1.ordinal(); public static String getVersionDesc(int value) { int length = Version.values().length; diff --git a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java index fd3b299b13e9ca3ac44f280ed0f0ca5e6978bcbf..e866d9c5682be7be195cf878ff78d8618af440aa 100644 --- a/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java @@ -92,6 +92,10 @@ public class SnodeConfig { private int listenPort = 11911; + private int metricsExportPort = 1234; + + private boolean metricsEnable = true; + private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); private boolean enablePropertyFilter = true; @@ -221,7 +225,7 @@ public class SnodeConfig { } public void setSnodeHandleMqttThreadPoolQueueCapacity( - int snodeHandleMqttThreadPoolQueueCapacity) { + int snodeHandleMqttThreadPoolQueueCapacity) { this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity; } @@ -380,4 +384,20 @@ public class SnodeConfig { public void setAclEnable(boolean aclEnable) { this.aclEnable = aclEnable; } + + public int getMetricsExportPort() { + return metricsExportPort; + } + + public void setMetricsExportPort(int metricsExportPort) { + this.metricsExportPort = metricsExportPort; + } + + public boolean isMetricsEnable() { + return metricsEnable; + } + + public void setMetricsEnable(boolean metricsEnable) { + this.metricsEnable = metricsEnable; + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java index 210e57823dbdf3a843301adbb5e61176f4fc88c1..3a5ccb6f31ea254e2bb6aa79e9f20acb4b3c5bef 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java @@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.utils.CallSnapshot; import org.apache.rocketmq.logging.InternalLogger; public class StatsItem { @@ -227,28 +228,3 @@ public class StatsItem { } } -class CallSnapshot { - private final long timestamp; - private final long times; - - private final long value; - - public CallSnapshot(long timestamp, long times, long value) { - super(); - this.timestamp = timestamp; - this.times = times; - this.value = value; - } - - public long getTimestamp() { - return timestamp; - } - - public long getTimes() { - return times; - } - - public long getValue() { - return value; - } -} diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java index 9a0caaa112d038e4325ca6d427aaee9df0058328..19d515169a6acd1d108161645e392488859d5814 100644 --- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java +++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java @@ -162,14 +162,11 @@ public class StatsItemSet { StatsItem statsItem = this.statsItemTable.get(statsKey); if (null == statsItem) { statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log); - StatsItem prev = this.statsItemTable.put(statsKey, statsItem); - - if (null == prev) { - - // statsItem.init(); + StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem); + if (null != prev) { + statsItem = prev; } } - return statsItem; } diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/CallSnapshot.java b/common/src/main/java/org/apache/rocketmq/common/utils/CallSnapshot.java new file mode 100644 index 0000000000000000000000000000000000000000..1b005be5736e74502aa7a1522e2250e3f8e4cf60 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/utils/CallSnapshot.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.utils; +public class CallSnapshot { + private final long timestamp; + private final long times; + + private final long value; + + public CallSnapshot(long timestamp, long times, long value) { + super(); + this.timestamp = timestamp; + this.times = times; + this.value = value; + } + + public long getTimestamp() { + return timestamp; + } + + public long getTimes() { + return times; + } + + public long getValue() { + return value; + } +} + diff --git a/example/pom.xml b/example/pom.xml index c43ff785e1ef68be1070d302a299e2799a449a30..b192e03f2c0590b354e1b0f057073a023bff1ed5 100644 --- a/example/pom.xml +++ b/example/pom.xml @@ -58,5 +58,9 @@ rocketmq-acl 4.4.1-SNAPSHOT + + io.prometheus + simpleclient_hotspot + diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index 8523df82335586858af92e56407c1146b00e08cf..c85ed9c84a8ca6cc2ad1553011d44e0bbdc59b36 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -61,6 +61,8 @@ public class Consumer { /* * Register callback to execute on arrival of messages fetched from brokers. */ + consumer.setNamesrvAddr("47.102.149.193:9876"); + consumer.registerMessageListener(new MessageListenerConcurrently() { @Override diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java index 546ddc7695a096a1769eeb0cea7ed4e4cd69bd09..98dcd61ca1afd0ab6f4f2074d8612ab47feea1df 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java @@ -47,6 +47,7 @@ public class Producer { /* * Launch the instance. */ +// producer.setNamesrvAddr("47.102.149.193:9876"); producer.start(); for (int i = 0; i < 10; i++) { diff --git a/pom.xml b/pom.xml index 482ab4ff2e87f2f2424055b154f3aebacf45a07f..e18627d28af45b06d4458a6be5e4f23459658105 100644 --- a/pom.xml +++ b/pom.xml @@ -633,6 +633,21 @@ commons-codec 1.9 + + io.prometheus + simpleclient + 0.6.0 + + + io.prometheus + simpleclient_httpserver + 0.6.0 + + + io.prometheus + simpleclient_hotspot + 0.6.0 + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java index 2946d8513f1ea87dcb2d502ad6ae874bb8fe2f07..764b5f5ed695da7f10d77f448856461a981f03d5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java @@ -82,13 +82,13 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo public RemotingClient init(ClientConfig clientConfig, ChannelEventListener channelEventListener) { this.nettyClientConfig = clientConfig; this.channelEventListener = channelEventListener; - this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads", + this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyClientEpollIoThreads", clientConfig.getClientWorkerThreads())); this.publicExecutor = ThreadUtils.newFixedThreadPool( clientConfig.getClientCallbackExecutorThreads(), - 10000, "Remoting-PublicExecutor", true); + 10000, "Http2Remoting-PublicExecutor", true); this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(), - ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); + ThreadUtils.newGenericThreadFactory("Http2NettyClientWorkerThreads", clientConfig.getClientWorkerThreads())); buildHttp2SslClientContext(); return this; } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java index fbf60d575af95629083d3e8eee10b15adf793fbc..041425df1628c645b3aa7613f5ce0dcecfcdabc7 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java @@ -97,23 +97,23 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo this.channelEventListener = channelEventListener; this.publicExecutor = ThreadUtils.newFixedThreadPool( serverConfig.getServerCallbackExecutorThreads(), - 10000, "Remoting-PublicExecutor", true); + 10000, "Http2Remoting-PublicExecutor", true); if (JvmUtils.isLinux() && this.serverConfig.isUseEpollNativeSelector()) { - this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads", + this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyEpollIoThreads", serverConfig.getServerSelectorThreads())); - this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads", + this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyBossThreads", serverConfig.getServerAcceptorThreads())); this.socketChannelClass = EpollServerSocketChannel.class; } else { - this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads", + this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyBossThreads", serverConfig.getServerAcceptorThreads())); - this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads", + this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyNioIoThreads", serverConfig.getServerSelectorThreads())); this.socketChannelClass = NioServerSocketChannel.class; } this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(), - ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads())); + ThreadUtils.newGenericThreadFactory("Http2NettyWorkerThreads", serverConfig.getServerWorkerThreads())); this.port = nettyServerConfig.getListenPort(); buildHttp2SslServerContext(); return this; diff --git a/snode/pom.xml b/snode/pom.xml index 874b52653dd45ec707548969d14d57db0faaf59a..bb28f6406c0837577fe71118baf5d4c75746d94e 100644 --- a/snode/pom.xml +++ b/snode/pom.xml @@ -19,7 +19,7 @@ org.apache.rocketmq rocketmq-all - 4.4.0-SNAPSHOT + 4.4.1-SNAPSHOT 4.0.0 @@ -80,6 +80,18 @@ org.yaml snakeyaml + + io.prometheus + simpleclient + + + io.prometheus + simpleclient_httpserver + + + io.prometheus + simpleclient_hotspot + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 1e1b97d888ebd5c14a479987d3bf4bb162940cbc..e3ed0e778919fba49f4af32cc1216f08665341e3 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -74,11 +74,13 @@ import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandl import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler; import org.apache.rocketmq.snode.service.ClientService; import org.apache.rocketmq.snode.service.EnodeService; +import org.apache.rocketmq.snode.service.MetricsService; import org.apache.rocketmq.snode.service.NnodeService; import org.apache.rocketmq.snode.service.PushService; import org.apache.rocketmq.snode.service.ScheduledService; import org.apache.rocketmq.snode.service.impl.ClientServiceImpl; import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl; +import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl; import org.apache.rocketmq.snode.service.impl.PushServiceImpl; import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl; @@ -121,6 +123,7 @@ public class SnodeController { private PushService pushService; private ClientService clientService; private SlowConsumerService slowConsumerService; + private MetricsService metricsService; private final ScheduledExecutorService scheduledExecutorService = Executors .newSingleThreadScheduledExecutor(new ThreadFactoryImpl( @@ -217,6 +220,7 @@ public class SnodeController { this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager, this.consumerManager, this.iotClientManager); this.slowConsumerService = new SlowConsumerServiceImpl(this); + this.metricsService = new MetricsServiceImpl(); } public SnodeConfig getSnodeConfig() { @@ -384,6 +388,7 @@ public class SnodeController { this.mqttRemotingClient.start(); this.scheduledService.startScheduleTask(); this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval()); + this.metricsService.start(this.snodeConfig.getMetricsExportPort()); } public void shutdown() { @@ -399,9 +404,6 @@ public class SnodeController { if (this.heartbeatExecutor != null) { this.heartbeatExecutor.shutdown(); } -// if (this.consumerManagerExecutor != null) { -// this.consumerManagerExecutor.shutdown(); -// } if (this.scheduledExecutorService != null) { this.scheduledExecutorService.shutdown(); } @@ -423,6 +425,9 @@ public class SnodeController { if (this.pushService != null) { this.pushService.shutdown(); } + if (this.metricsService != null) { + this.metricsService.shutdown(); + } } public RemotingServer getSnodeServer() { @@ -553,4 +558,12 @@ public class SnodeController { public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) { this.consumerOffsetManager = consumerOffsetManager; } + + public MetricsService getMetricsService() { + return metricsService; + } + + public void setMetricsService(MetricsService metricsService) { + this.metricsService = metricsService; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/mqtrace/MsgTraceServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/mqtrace/MsgTraceServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..a61f9b3a9823def0c4a45a12018bbd90d8edfc80 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/mqtrace/MsgTraceServiceImpl.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.snode.mqtrace; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.interceptor.ExceptionContext; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.RequestContext; +import org.apache.rocketmq.remoting.interceptor.ResponseContext; + +public class MsgTraceServiceImpl implements Interceptor { + + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + @Override + public String interceptorName() { + return "snodeMsgTraceInterceptor"; + } + + @Override + public void beforeRequest(RequestContext requestContext) { + } + + @Override + public void afterRequest(ResponseContext responseContext) { + } + + @Override + public void onException(ExceptionContext exceptionContext) { + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java index ef49f289b49c27e66329a8fe9b152aa54172a8a3..e4ed77e6baf191543f83cdf5f8bb8aa2944d4954 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java @@ -121,23 +121,10 @@ public class ConsumerOffsetManager { return -1; } -// public String encode(final boolean prettyFormat) { -// return RemotingSerializable.toJson(this, prettyFormat); -// } - public ConcurrentMap> getOffsetTable() { return offsetTable; } -// public void setOffsetTable(ConcurrentHashMap> offsetTable) { -// this.offsetTable = offsetTable; -// } - -// public Map queryOffset(final String enodeName, final String group, final String topic) { -// // topic@group -// String key = buildKey(enodeName, topic, group); -// return this.offsetTable.get(key); -// } public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic, final int queueId, diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java index b498fa551dea655d923e0fd77c6ee5a63a684f89..ff13aeaa743fde98ec760cd13a70b0d77ec7b21e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.snode.processor; import java.util.concurrent.CompletableFuture; +import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.ResponseCode; @@ -27,11 +28,12 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.exception.RemotingCommandException; -import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.snode.SnodeController; import org.apache.rocketmq.remoting.interceptor.ExceptionContext; import org.apache.rocketmq.remoting.interceptor.RequestContext; import org.apache.rocketmq.remoting.interceptor.ResponseContext; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.snode.SnodeController; +import org.apache.rocketmq.snode.service.MetricsService; public class SendMessageProcessor implements RequestProcessor { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); @@ -45,25 +47,41 @@ public class SendMessageProcessor implements RequestProcessor { @Override public RemotingCommand processRequest(RemotingChannel remotingChannel, RemotingCommand request) throws RemotingCommandException { + this.snodeController.getMetricsService().incRequestCount(request.getCode(), true); + try { + processSendMessageRequest(remotingChannel, request); + } catch (Exception ex) { + this.snodeController.getMetricsService().incRequestCount(request.getCode(), false); + throw ex; + } + return null; + } + + private void processSendMessageRequest(RemotingChannel remotingChannel, + RemotingCommand request) throws RemotingCommandException { + MetricsService.Timer timer = this.snodeController.getMetricsService().startTimer(request.getCode()); if (this.snodeController.getSendMessageInterceptorGroup() != null) { RequestContext requestContext = new RequestContext(request, remotingChannel); this.snodeController.getSendMessageInterceptorGroup().beforeRequest(requestContext); } String enodeName; SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = null; + final StringBuffer stringBuffer = new StringBuffer(); + ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = null; boolean isSendBack = false; - if (request.getCode() == RequestCode.SEND_MESSAGE_V2) { + if (request.getCode() == RequestCode.SEND_MESSAGE_V2 || + request.getCode() == RequestCode.SEND_BATCH_MESSAGE) { sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); enodeName = sendMessageRequestHeaderV2.getN(); + stringBuffer.append(sendMessageRequestHeaderV2.getB()); } else { isSendBack = true; - ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); + consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class); enodeName = consumerSendMsgBackRequestHeader.getEnodeName(); + stringBuffer.append(MixAll.getRetryTopic(consumerSendMsgBackRequestHeader.getGroup())); } CompletableFuture responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request); - - final String topic = sendMessageRequestHeaderV2.getB(); final Integer queueId = sendMessageRequestHeaderV2.getE(); final byte[] message = request.getBody(); final boolean isNeedPush = !isSendBack; @@ -74,18 +92,20 @@ public class SendMessageProcessor implements RequestProcessor { this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext); } remotingChannel.reply(data); + this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length); if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) { - this.snodeController.getPushService().pushMessage(enodeName, topic, queueId, message, data); + this.snodeController.getPushService().pushMessage(enodeName, stringBuffer.toString(), queueId, message, data); } } else { + this.snodeController.getMetricsService().incRequestCount(request.getCode(), false); if (this.snodeController.getSendMessageInterceptorGroup() != null) { ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null); this.snodeController.getSendMessageInterceptorGroup().onException(exceptionContext); } log.error("Send Message error: {}", ex); } + timer.observeDuration(); }); - return null; } @Override diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java index c1b1633cabf5ea80e45e624f1f0d2f7d860a9fb7..fb8691e7192b38d53253a4cf12b66b4d7eef5c76 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java @@ -59,7 +59,7 @@ public class MqttConnectMessageHandler implements MessageHandler { private boolean isConnected(RemotingChannel remotingChannel, String clientId) { ClientManager iotClientManager = snodeController.getIotClientManager(); - Client client = iotClientManager.getClient(IOTClientManagerImpl.IOTGROUP, remotingChannel); + Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null && client.getClientId().equals(clientId) && client.isConnected()) { return true; } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/MetricsService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/MetricsService.java index 22f5fe2c563a69f56efe708665594ca17517130e..f7dedef4696afa6d5f09f15ab5cfbd7f45cf9d59 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/MetricsService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/MetricsService.java @@ -18,4 +18,21 @@ package org.apache.rocketmq.snode.service; public interface MetricsService { + interface Timer { + Timer startTimer(int requestCode); + + void observeDuration(); + } + + void incRequestCount(int requestCode, boolean success); + + void recordRequestSize(String topic, double size); + + Timer startTimer(int requestCode); + + void recordRequestLatency(Timer timer); + + void start(int port); + + void shutdown(); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MetricsServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MetricsServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..84b06beb0ea9e8c7c7e528caa1dca9cfffe4f183 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/MetricsServiceImpl.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.service.impl; + +import io.prometheus.client.Counter; +import io.prometheus.client.Summary; +import io.prometheus.client.exporter.HTTPServer; +import io.prometheus.client.hotspot.DefaultExports; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.common.protocol.ResponseCode; +import org.apache.rocketmq.common.stats.StatsItemSet; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.snode.exception.SnodeException; +import org.apache.rocketmq.snode.service.MetricsService; + +public class MetricsServiceImpl implements MetricsService { + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME); + + public MetricsServiceImpl() { + } + + private HTTPServer server; + + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "SNodeStatsScheduledThread"); + } + }); + + private StatsItemSet statsItemSet = new StatsItemSet("SnodeStats", scheduledExecutorService, log); + + private final static Counter requestTotal = Counter.build().name("request_total").help("request total count").labelNames("requestCode").register(); + + private final static Counter requestFailedTotal = Counter.build().name("request_failed_total").help("request total count").labelNames("requestCode").register(); + + private final static Summary requestLatency = Summary.build() + .quantile(0.5, 0.05) + .quantile(0.9, 0.01) + .quantile(0.99, 0.001) + .name("requests_latency_seconds").labelNames("requestCode").help("Request latency in seconds.").register(); + + private final static Summary receivedBytes = Summary.build() + .quantile(0.5, 0.05) + .quantile(0.9, 0.01) + .quantile(0.99, 0.001) + .labelNames("topic") + .name("sent_topic_size_bytes").help("Request size in bytes.").register(); + + @Override + synchronized public void incRequestCount(int requestCode, boolean success) { + if (!success) { + this.requestFailedTotal.labels(requestCode + "").inc(); + this.statsItemSet.addValue("TotalFailed@" + requestCode, 1, 1); + } else { + this.requestTotal.labels(requestCode + "").inc(); + this.statsItemSet.addValue("Total@" + requestCode, 1, 1); + } + } + + @Override + synchronized public void recordRequestSize(String topic, double size) { + this.receivedBytes.labels(topic).observe(size); + this.statsItemSet.addValue("TotalSize@" + topic, new Double(size).intValue(), 1); + } + + @Override + public Timer startTimer(int requestCode) { + return new PrometheusTimer(this.requestLatency).startTimer(requestCode); + } + + @Override + public void recordRequestLatency(Timer timer) { + timer.observeDuration(); + } + + @Override public void start(int port) { + try { + DefaultExports.initialize(); + server = new HTTPServer(port); + } catch (Exception ex) { + log.error("Start metrics http server failed!", ex); + throw new SnodeException(ResponseCode.SYSTEM_ERROR, "Start metrics http server failed!"); + } + } + + @Override public void shutdown() { + this.server.stop(); + } + + class PrometheusTimer implements Timer { + + private Summary.Timer timer; + + private Summary summary; + + public PrometheusTimer(Summary summary) { + this.summary = summary; + } + + @Override + public Timer startTimer(int requestCode) { + this.timer = summary.labels(requestCode + "").startTimer(); + return this; + } + + @Override + public void observeDuration() { + if (this.timer != null) { + this.timer.observeDuration(); + } + } + } + + public StatsItemSet getStatsItemSet() { + return statsItemSet; + } + + public void setStatsItemSet(StatsItemSet statsItemSet) { + this.statsItemSet = statsItemSet; + } + + public Counter getRequestTotal() { + return requestTotal; + } + + public Counter getRequestFailedTotal() { + return requestFailedTotal; + } + + public Summary getRequestLatency() { + return requestLatency; + } + + public Summary getReceivedBytes() { + return receivedBytes; + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index 575855523c87eedc64b9a181c40f7a33f70b483f..908d094cf5afdd6e3c99e26517e55d32ec5c0d64 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader; import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader; +import org.apache.rocketmq.common.protocol.route.BrokerData; import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -207,7 +208,10 @@ public class NnodeServiceImpl implements NnodeService { clusterInfo = this.updateEnodeClusterInfo(); } if (this.clusterInfo != null) { - return this.clusterInfo.getBrokerAddrTable().get(enodeName).getBrokerAddrs().get(MixAll.MASTER_ID); + BrokerData brokerData = this.clusterInfo.getBrokerAddrTable().get(enodeName); + if (brokerData != null) { + return brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); + } } return null; } diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..847d190cbe06fd24cc5c3779be5d57793ec1e48c 100644 --- a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor +++ b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor @@ -0,0 +1 @@ +org.apache.rocketmq.snode.mqtrace.MsgTraceServiceImpl \ No newline at end of file diff --git a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java index 2573f0272f24b5317b4a2d5c1afa95fac9707737..e6e56a826b31f34ab8f07b402e5c55c320c435f0 100644 --- a/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java +++ b/snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java @@ -71,12 +71,18 @@ public class SendMessageProcessorTest { } @Test - public void testProcessRequest() throws RemotingCommandException { + public void testSendMessageV2ProcessRequest() throws RemotingCommandException { + CompletableFuture future = new CompletableFuture<>(); + RemotingCommand request = createSendMesssageV2Command(); + when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future); + sendMessageProcessor.processRequest(remotingChannel, request); + } + + @Test + public void testSendBatchMessageProcessRequest() throws RemotingCommandException { snodeController.setEnodeService(enodeService); CompletableFuture future = new CompletableFuture<>(); - RemotingCommand request = createSendMesssageCommand(); - SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class); - System.out.println("sendMessageRequestHeaderV2: " + sendMessageRequestHeaderV2); + RemotingCommand request = createSendBatchMesssageCommand(); when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future); sendMessageProcessor.processRequest(remotingChannel, request); } @@ -95,7 +101,7 @@ public class SendMessageProcessorTest { return requestHeader; } - private RemotingCommand createSendMesssageCommand() { + private RemotingCommand createSendMesssageV2Command() { SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = createSendMsgRequestHeader(); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, sendMessageRequestHeaderV2); request.setBody(new byte[] {'a'}); @@ -103,6 +109,14 @@ public class SendMessageProcessorTest { return request; } + private RemotingCommand createSendBatchMesssageCommand() { + SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = createSendMsgRequestHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, sendMessageRequestHeaderV2); + request.setBody(new byte[] {'b'}); + CodecHelper.makeCustomHeaderToNet(request); + return request; + } + RemotingCommand createSendMessageResponse(int responseCode) { return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null); } diff --git a/snode/src/test/java/org/apache/rocketmq/snode/service/MetricsServiceImplTest.java b/snode/src/test/java/org/apache/rocketmq/snode/service/MetricsServiceImplTest.java new file mode 100644 index 0000000000000000000000000000000000000000..cd74e81f9c6886a09ac354cb2f3e0af7ffdaa637 --- /dev/null +++ b/snode/src/test/java/org/apache/rocketmq/snode/service/MetricsServiceImplTest.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.service; + +import io.prometheus.client.CollectorRegistry; +import java.lang.reflect.Field; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.stats.StatsItemSet; +import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.assertj.core.api.Assertions.assertThat; + +@RunWith(MockitoJUnitRunner.class) +public class MetricsServiceImplTest { + private MetricsServiceImpl metricsService = new MetricsServiceImpl(); + + private final String requestCode = "310"; + + private final String topic = "Test"; + + @Test + public void testIncRequestCountFalse() throws Exception { + metricsService.incRequestCount(310, false); + Double requestFailedTotal = getLabelsValue("request_failed_total", requestCode); + Double requestTotal = getLabelsValue("request_total", requestCode); + assertThat(requestFailedTotal).isEqualTo(1.0); + assertThat(requestTotal); + } + + @Test + public void testIncRequestCountTrue() { + metricsService.incRequestCount(310, true); + Double requestTotal = getLabelsValue("request_total", requestCode); + assertThat(requestTotal).isEqualTo(1.0); + } + + @Test + public void testRequestSize() throws Exception { + Field field = MetricsServiceImpl.class.getDeclaredField("statsItemSet"); + field.setAccessible(true); + metricsService.recordRequestSize(topic, 100); + StatsItemSet statsItemSet = (StatsItemSet) field.get(metricsService); + AtomicLong requestSize = statsItemSet.getStatsItem("TotalSize@" + topic).getValue(); + assertThat(requestSize.intValue()).isEqualTo(100); + } + + public Double getLabelsValue(String name, String labelValue) { + return CollectorRegistry.defaultRegistry.getSampleValue(name, new String[] {"requestCode"}, new String[] {labelValue}); + } +} diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java index dc829c1c1b516768724932a39f15ceca82fd7676..1fd2cb0c4daa1c20685ac0073ca32ba5f41e9baf 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java @@ -43,6 +43,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; @@ -222,6 +223,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt { return defaultMQAdminExtImpl.examineBrokerClusterInfo(); } + @Override + public SnodeClusterInfo examineSnodeClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException { + return defaultMQAdminExtImpl.examineSnodeClusterInfo(); + } + @Override public TopicRouteData examineTopicRouteInfo( String topic) throws RemotingException, MQClientException, InterruptedException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java index 2a7815b61709c802d7881dd821919af83019aaba..b2e176a6dc2267b83280b9ba1ecc289204538d36 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java @@ -47,6 +47,7 @@ import org.apache.rocketmq.common.admin.RollbackStats; import org.apache.rocketmq.common.admin.TopicOffset; import org.apache.rocketmq.common.admin.TopicStatsTable; import org.apache.rocketmq.common.help.FAQUrl; +import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.common.message.MessageClientExt; import org.apache.rocketmq.common.message.MessageConst; @@ -275,6 +276,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner { return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis); } + @Override + public SnodeClusterInfo examineSnodeClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException { + return this.mqClientInstance.getMQClientAPIImpl().getSnodeClusterInfo(timeoutMillis); + } + @Override public TopicRouteData examineTopicRouteInfo( String topic) throws RemotingException, MQClientException, InterruptedException { diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java index 16b4427575faaa021ba97ea1828ebdab95f7bc8c..d16947827dbfeef958186f9566a869aa3ee760ee 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java @@ -41,6 +41,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable; import org.apache.rocketmq.common.protocol.body.ProducerConnection; import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody; import org.apache.rocketmq.common.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo; import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper; import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper; import org.apache.rocketmq.common.protocol.body.TopicList; @@ -100,6 +101,9 @@ public interface MQAdminExt extends MQAdmin { ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException; + SnodeClusterInfo examineSnodeClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, + RemotingSendRequestException, RemotingConnectException; + TopicRouteData examineTopicRouteInfo( final String topic) throws RemotingException, MQClientException, InterruptedException; diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java index 6a0cd71c177b5f2ebffef15d6c0402bbaedca59d..a5a91eadc04b796ad245f0560301dc3e00baeff2 100644 --- a/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java +++ b/tools/src/main/java/org/apache/rocketmq/tools/command/cluster/ClusterListSubCommand.java @@ -26,7 +26,9 @@ import org.apache.commons.cli.Options; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.KVTable; +import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo; import org.apache.rocketmq.common.protocol.route.BrokerData; +import org.apache.rocketmq.common.protocol.route.SnodeData; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; @@ -96,20 +98,8 @@ public class ClusterListSubCommand implements SubCommand { } } - private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, - RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException { - - ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); - - System.out.printf("%-16s %-32s %14s %14s %14s %14s%n", - "#Cluster Name", - "#Broker Name", - "#InTotalYest", - "#OutTotalYest", - "#InTotalToday", - "#OutTotalToday" - ); - + private void printBrokerMoreStats(final DefaultMQAdminExt defaultMQAdminExt, + ClusterInfo clusterInfoSerializeWrapper) { Iterator>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator(); while (itCluster.hasNext()) { Map.Entry> next = itCluster.next(); @@ -165,25 +155,58 @@ public class ClusterListSubCommand implements SubCommand { } } - private void printClusterBaseInfo( - final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException, - RemotingSendRequestException, InterruptedException, MQBrokerException { + private void printSnodeMoreStats(final DefaultMQAdminExt defaultMQAdminExt, SnodeClusterInfo snodeClusterInfo) { + Iterator>> itCluster = snodeClusterInfo.getSnodeCluster().entrySet().iterator(); + while (itCluster.hasNext()) { + Map.Entry> next = itCluster.next(); + String clusterName = next.getKey(); + TreeSet snodeNameSet = new TreeSet<>(); + snodeNameSet.addAll(next.getValue()); + + for (String snodeeName : snodeNameSet) { + SnodeData snodeData = snodeClusterInfo.getSnodeTable().get(snodeeName); + if (snodeData != null) { + + String address = snodeData.getAddress(); + System.out.printf("%-16s %-32s %14d %14d %14d %14d%n", + clusterName, + snodeeName, + 0, + 0, + 0, + 0 + ); + } + } + + if (itCluster.hasNext()) { + System.out.printf(""); + } + } + } + + private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, + RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException { ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); - System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", + SnodeClusterInfo snodeClusterInfo = defaultMQAdminExt.examineSnodeClusterInfo(); + + System.out.printf("%-16s %-32s %14s %14s %14s %14s%n", "#Cluster Name", "#Broker Name", - "#BID", - "#Addr", - "#Version", - "#InTPS(LOAD)", - "#OutTPS(LOAD)", - "#PCWait(ms)", - "#Hour", - "#SPACE" + "#InTotalYest", + "#OutTotalYest", + "#InTotalToday", + "#OutTotalToday" ); + printBrokerMoreStats(defaultMQAdminExt, clusterInfoSerializeWrapper); + printSnodeMoreStats(defaultMQAdminExt, snodeClusterInfo); + } + + private void printBrokerClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt, + ClusterInfo clusterInfoSerializeWrapper) { Iterator>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator(); while (itCluster.hasNext()) { Map.Entry> next = itCluster.next(); @@ -275,4 +298,60 @@ public class ClusterListSubCommand implements SubCommand { } } } + + private void printSnodeClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt, + SnodeClusterInfo snodeClusterInfo) { + Iterator>> itCluster = snodeClusterInfo.getSnodeCluster().entrySet().iterator(); + while (itCluster.hasNext()) { + Map.Entry> next = itCluster.next(); + String clusterName = next.getKey(); + TreeSet snodeNameSet = new TreeSet<>(); + snodeNameSet.addAll(next.getValue()); + + for (String snodeeName : snodeNameSet) { + SnodeData snodeData = snodeClusterInfo.getSnodeTable().get(snodeeName); + if (snodeData != null) { + String address = snodeData.getAddress(); + System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", + clusterName, + snodeeName, + 0, + address, + 0, + 0, + 0, + 0, + 0 + ); + } + } + + if (itCluster.hasNext()) { + System.out.printf(""); + } + } + } + + private void printClusterBaseInfo( + final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException, + RemotingSendRequestException, InterruptedException, MQBrokerException { + + ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo(); + SnodeClusterInfo snodeClusterInfo = defaultMQAdminExt.examineSnodeClusterInfo(); + + System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n", + "#Cluster Name", + "#Broker Name", + "#BID", + "#Addr", + "#Version", + "#InTPS(LOAD)", + "#OutTPS(LOAD)", + "#PCWait(ms)", + "#Hour", + "#SPACE" + ); + printBrokerClusterBaseInfo(defaultMQAdminExt, clusterInfoSerializeWrapper); + + } }