提交 673fead5 编写于 作者: S ShannonDing

Merge branch 'snode' of github.com:apache/rocketmq into snode

......@@ -130,8 +130,8 @@ public class PlainPermissionLoader {
if (!ownedPermMap.containsKey(resource)) {
// Check the default perm
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm();
byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
ownedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
......
......@@ -469,8 +469,6 @@ public class BrokerController {
}
}
initialTransaction();
initialAcl();
// initialRpcHooks();
}
return result;
}
......@@ -528,16 +526,16 @@ public class BrokerController {
// }
}
private void initialRpcHooks() {
List<RPCHook> rpcHooks = ServiceProvider.loadServiceList(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
for (RPCHook rpcHook : rpcHooks) {
this.remotingServer.registerServerRPCHook(rpcHook);
}
}
// private void initialRpcHooks() {
//
// List<RPCHook> rpcHooks = ServiceProvider.loadServiceList(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
// if (rpcHooks == null || rpcHooks.isEmpty()) {
// return;
// }
// for (RPCHook rpcHook : rpcHooks) {
// this.remotingServer.registerServerRPCHook(rpcHook);
// }
// }
// registerInterceoptorGroup()
......
......@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
......@@ -185,6 +186,8 @@ public class DefaultMQConsumerWithTraceTest {
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish();
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
......@@ -195,7 +198,7 @@ public class DefaultMQConsumerWithTraceTest {
pushConsumer.shutdown();
}
// @Test
@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
......
......@@ -116,6 +116,7 @@ public class DefaultMQProducerWithTraceTest {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
}
......
......@@ -18,7 +18,7 @@ package org.apache.rocketmq.common;
public class MQVersion {
public static final int CURRENT_VERSION = Version.V4_4_0.ordinal();
public static final int CURRENT_VERSION = Version.V4_4_1.ordinal();
public static String getVersionDesc(int value) {
int length = Version.values().length;
......
......@@ -92,6 +92,10 @@ public class SnodeConfig {
private int listenPort = 11911;
private int metricsExportPort = 1234;
private boolean metricsEnable = true;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
private boolean enablePropertyFilter = true;
......@@ -221,7 +225,7 @@ public class SnodeConfig {
}
public void setSnodeHandleMqttThreadPoolQueueCapacity(
int snodeHandleMqttThreadPoolQueueCapacity) {
int snodeHandleMqttThreadPoolQueueCapacity) {
this.snodeHandleMqttThreadPoolQueueCapacity = snodeHandleMqttThreadPoolQueueCapacity;
}
......@@ -380,4 +384,20 @@ public class SnodeConfig {
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
public int getMetricsExportPort() {
return metricsExportPort;
}
public void setMetricsExportPort(int metricsExportPort) {
this.metricsExportPort = metricsExportPort;
}
public boolean isMetricsEnable() {
return metricsEnable;
}
public void setMetricsEnable(boolean metricsEnable) {
this.metricsEnable = metricsEnable;
}
}
......@@ -22,6 +22,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.utils.CallSnapshot;
import org.apache.rocketmq.logging.InternalLogger;
public class StatsItem {
......@@ -227,28 +228,3 @@ public class StatsItem {
}
}
class CallSnapshot {
private final long timestamp;
private final long times;
private final long value;
public CallSnapshot(long timestamp, long times, long value) {
super();
this.timestamp = timestamp;
this.times = times;
this.value = value;
}
public long getTimestamp() {
return timestamp;
}
public long getTimes() {
return times;
}
public long getValue() {
return value;
}
}
......@@ -162,14 +162,11 @@ public class StatsItemSet {
StatsItem statsItem = this.statsItemTable.get(statsKey);
if (null == statsItem) {
statsItem = new StatsItem(this.statsName, statsKey, this.scheduledExecutorService, this.log);
StatsItem prev = this.statsItemTable.put(statsKey, statsItem);
if (null == prev) {
// statsItem.init();
StatsItem prev = this.statsItemTable.putIfAbsent(statsKey, statsItem);
if (null != prev) {
statsItem = prev;
}
}
return statsItem;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.utils;
public class CallSnapshot {
private final long timestamp;
private final long times;
private final long value;
public CallSnapshot(long timestamp, long times, long value) {
super();
this.timestamp = timestamp;
this.times = times;
this.value = value;
}
public long getTimestamp() {
return timestamp;
}
public long getTimes() {
return times;
}
public long getValue() {
return value;
}
}
......@@ -58,5 +58,9 @@
<artifactId>rocketmq-acl</artifactId>
<version>4.4.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
</dependencies>
</project>
......@@ -61,6 +61,8 @@ public class Consumer {
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.setNamesrvAddr("47.102.149.193:9876");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
......
......@@ -47,6 +47,7 @@ public class Producer {
/*
* Launch the instance.
*/
// producer.setNamesrvAddr("47.102.149.193:9876");
producer.start();
for (int i = 0; i < 10; i++) {
......
......@@ -633,6 +633,21 @@
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
<version>0.6.0</version>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
<version>0.6.0</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
......@@ -82,13 +82,13 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
public RemotingClient init(ClientConfig clientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = clientConfig;
this.channelEventListener = channelEventListener;
this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyClientEpollIoThreads",
clientConfig.getClientWorkerThreads()));
this.publicExecutor = ThreadUtils.newFixedThreadPool(
clientConfig.getClientCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
10000, "Http2Remoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
ThreadUtils.newGenericThreadFactory("Http2NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
buildHttp2SslClientContext();
return this;
}
......
......@@ -97,23 +97,23 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
this.channelEventListener = channelEventListener;
this.publicExecutor = ThreadUtils.newFixedThreadPool(
serverConfig.getServerCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
10000, "Http2Remoting-PublicExecutor", true);
if (JvmUtils.isLinux() && this.serverConfig.isUseEpollNativeSelector()) {
this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyEpollIoThreads",
serverConfig.getServerSelectorThreads()));
this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.socketChannelClass = EpollServerSocketChannel.class;
} else {
this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyNioIoThreads",
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
ThreadUtils.newGenericThreadFactory("Http2NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
this.port = nettyServerConfig.getListenPort();
buildHttp2SslServerContext();
return this;
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......@@ -80,6 +80,18 @@
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_httpserver</artifactId>
</dependency>
<dependency>
<groupId>io.prometheus</groupId>
<artifactId>simpleclient_hotspot</artifactId>
</dependency>
</dependencies>
<build>
......
......@@ -74,11 +74,13 @@ import org.apache.rocketmq.snode.processor.mqtthandler.MqttSubscribeMessageHandl
import org.apache.rocketmq.snode.processor.mqtthandler.MqttUnsubscribeMessagHandler;
import org.apache.rocketmq.snode.service.ClientService;
import org.apache.rocketmq.snode.service.EnodeService;
import org.apache.rocketmq.snode.service.MetricsService;
import org.apache.rocketmq.snode.service.NnodeService;
import org.apache.rocketmq.snode.service.PushService;
import org.apache.rocketmq.snode.service.ScheduledService;
import org.apache.rocketmq.snode.service.impl.ClientServiceImpl;
import org.apache.rocketmq.snode.service.impl.EnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
import org.apache.rocketmq.snode.service.impl.NnodeServiceImpl;
import org.apache.rocketmq.snode.service.impl.PushServiceImpl;
import org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl;
......@@ -121,6 +123,7 @@ public class SnodeController {
private PushService pushService;
private ClientService clientService;
private SlowConsumerService slowConsumerService;
private MetricsService metricsService;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
......@@ -217,6 +220,7 @@ public class SnodeController {
this.clientHousekeepingService = new ClientHousekeepingService(this.producerManager,
this.consumerManager, this.iotClientManager);
this.slowConsumerService = new SlowConsumerServiceImpl(this);
this.metricsService = new MetricsServiceImpl();
}
public SnodeConfig getSnodeConfig() {
......@@ -384,6 +388,7 @@ public class SnodeController {
this.mqttRemotingClient.start();
this.scheduledService.startScheduleTask();
this.clientHousekeepingService.start(this.snodeConfig.getHouseKeepingInterval());
this.metricsService.start(this.snodeConfig.getMetricsExportPort());
}
public void shutdown() {
......@@ -399,9 +404,6 @@ public class SnodeController {
if (this.heartbeatExecutor != null) {
this.heartbeatExecutor.shutdown();
}
// if (this.consumerManagerExecutor != null) {
// this.consumerManagerExecutor.shutdown();
// }
if (this.scheduledExecutorService != null) {
this.scheduledExecutorService.shutdown();
}
......@@ -423,6 +425,9 @@ public class SnodeController {
if (this.pushService != null) {
this.pushService.shutdown();
}
if (this.metricsService != null) {
this.metricsService.shutdown();
}
}
public RemotingServer getSnodeServer() {
......@@ -553,4 +558,12 @@ public class SnodeController {
public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) {
this.consumerOffsetManager = consumerOffsetManager;
}
public MetricsService getMetricsService() {
return metricsService;
}
public void setMetricsService(MetricsService metricsService) {
this.metricsService = metricsService;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.mqtrace;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public class MsgTraceServiceImpl implements Interceptor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@Override
public String interceptorName() {
return "snodeMsgTraceInterceptor";
}
@Override
public void beforeRequest(RequestContext requestContext) {
}
@Override
public void afterRequest(ResponseContext responseContext) {
}
@Override
public void onException(ExceptionContext exceptionContext) {
}
}
......@@ -121,23 +121,10 @@ public class ConsumerOffsetManager {
return -1;
}
// public String encode(final boolean prettyFormat) {
// return RemotingSerializable.toJson(this, prettyFormat);
// }
public ConcurrentMap<String, ConcurrentMap<Integer, Long>> getOffsetTable() {
return offsetTable;
}
// public void setOffsetTable(ConcurrentHashMap<String, ConcurrentMap<Integer, Long>> offsetTable) {
// this.offsetTable = offsetTable;
// }
// public Map<Integer, Long> queryOffset(final String enodeName, final String group, final String topic) {
// // topic@group
// String key = buildKey(enodeName, topic, group);
// return this.offsetTable.get(key);
// }
public void commitOffset(final String enodeName, final String clientHost, final String group, final String topic,
final int queueId,
......
......@@ -17,6 +17,7 @@
package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
......@@ -27,11 +28,12 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.snode.SnodeController;
import org.apache.rocketmq.snode.service.MetricsService;
public class SendMessageProcessor implements RequestProcessor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
......@@ -45,25 +47,41 @@ public class SendMessageProcessor implements RequestProcessor {
@Override
public RemotingCommand processRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
this.snodeController.getMetricsService().incRequestCount(request.getCode(), true);
try {
processSendMessageRequest(remotingChannel, request);
} catch (Exception ex) {
this.snodeController.getMetricsService().incRequestCount(request.getCode(), false);
throw ex;
}
return null;
}
private void processSendMessageRequest(RemotingChannel remotingChannel,
RemotingCommand request) throws RemotingCommandException {
MetricsService.Timer timer = this.snodeController.getMetricsService().startTimer(request.getCode());
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
RequestContext requestContext = new RequestContext(request, remotingChannel);
this.snodeController.getSendMessageInterceptorGroup().beforeRequest(requestContext);
}
String enodeName;
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = null;
final StringBuffer stringBuffer = new StringBuffer();
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = null;
boolean isSendBack = false;
if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
if (request.getCode() == RequestCode.SEND_MESSAGE_V2 ||
request.getCode() == RequestCode.SEND_BATCH_MESSAGE) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
enodeName = sendMessageRequestHeaderV2.getN();
stringBuffer.append(sendMessageRequestHeaderV2.getB());
} else {
isSendBack = true;
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
stringBuffer.append(MixAll.getRetryTopic(consumerSendMsgBackRequestHeader.getGroup()));
}
CompletableFuture<RemotingCommand> responseFuture = snodeController.getEnodeService().sendMessage(enodeName, request);
final String topic = sendMessageRequestHeaderV2.getB();
final Integer queueId = sendMessageRequestHeaderV2.getE();
final byte[] message = request.getBody();
final boolean isNeedPush = !isSendBack;
......@@ -74,18 +92,20 @@ public class SendMessageProcessor implements RequestProcessor {
this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
}
remotingChannel.reply(data);
this.snodeController.getMetricsService().recordRequestSize(stringBuffer.toString(), request.getBody().length);
if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
this.snodeController.getPushService().pushMessage(enodeName, topic, queueId, message, data);
this.snodeController.getPushService().pushMessage(enodeName, stringBuffer.toString(), queueId, message, data);
}
} else {
this.snodeController.getMetricsService().incRequestCount(request.getCode(), false);
if (this.snodeController.getSendMessageInterceptorGroup() != null) {
ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, ex, null);
this.snodeController.getSendMessageInterceptorGroup().onException(exceptionContext);
}
log.error("Send Message error: {}", ex);
}
timer.observeDuration();
});
return null;
}
@Override
......
......@@ -59,7 +59,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
private boolean isConnected(RemotingChannel remotingChannel, String clientId) {
ClientManager iotClientManager = snodeController.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOTGROUP, remotingChannel);
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client != null && client.getClientId().equals(clientId) && client.isConnected()) {
return true;
}
......
......@@ -18,4 +18,21 @@ package org.apache.rocketmq.snode.service;
public interface MetricsService {
interface Timer {
Timer startTimer(int requestCode);
void observeDuration();
}
void incRequestCount(int requestCode, boolean success);
void recordRequestSize(String topic, double size);
Timer startTimer(int requestCode);
void recordRequestLatency(Timer timer);
void start(int port);
void shutdown();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service.impl;
import io.prometheus.client.Counter;
import io.prometheus.client.Summary;
import io.prometheus.client.exporter.HTTPServer;
import io.prometheus.client.hotspot.DefaultExports;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.snode.exception.SnodeException;
import org.apache.rocketmq.snode.service.MetricsService;
public class MetricsServiceImpl implements MetricsService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_STATS_LOGGER_NAME);
public MetricsServiceImpl() {
}
private HTTPServer server;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "SNodeStatsScheduledThread");
}
});
private StatsItemSet statsItemSet = new StatsItemSet("SnodeStats", scheduledExecutorService, log);
private final static Counter requestTotal = Counter.build().name("request_total").help("request total count").labelNames("requestCode").register();
private final static Counter requestFailedTotal = Counter.build().name("request_failed_total").help("request total count").labelNames("requestCode").register();
private final static Summary requestLatency = Summary.build()
.quantile(0.5, 0.05)
.quantile(0.9, 0.01)
.quantile(0.99, 0.001)
.name("requests_latency_seconds").labelNames("requestCode").help("Request latency in seconds.").register();
private final static Summary receivedBytes = Summary.build()
.quantile(0.5, 0.05)
.quantile(0.9, 0.01)
.quantile(0.99, 0.001)
.labelNames("topic")
.name("sent_topic_size_bytes").help("Request size in bytes.").register();
@Override
synchronized public void incRequestCount(int requestCode, boolean success) {
if (!success) {
this.requestFailedTotal.labels(requestCode + "").inc();
this.statsItemSet.addValue("TotalFailed@" + requestCode, 1, 1);
} else {
this.requestTotal.labels(requestCode + "").inc();
this.statsItemSet.addValue("Total@" + requestCode, 1, 1);
}
}
@Override
synchronized public void recordRequestSize(String topic, double size) {
this.receivedBytes.labels(topic).observe(size);
this.statsItemSet.addValue("TotalSize@" + topic, new Double(size).intValue(), 1);
}
@Override
public Timer startTimer(int requestCode) {
return new PrometheusTimer(this.requestLatency).startTimer(requestCode);
}
@Override
public void recordRequestLatency(Timer timer) {
timer.observeDuration();
}
@Override public void start(int port) {
try {
DefaultExports.initialize();
server = new HTTPServer(port);
} catch (Exception ex) {
log.error("Start metrics http server failed!", ex);
throw new SnodeException(ResponseCode.SYSTEM_ERROR, "Start metrics http server failed!");
}
}
@Override public void shutdown() {
this.server.stop();
}
class PrometheusTimer implements Timer {
private Summary.Timer timer;
private Summary summary;
public PrometheusTimer(Summary summary) {
this.summary = summary;
}
@Override
public Timer startTimer(int requestCode) {
this.timer = summary.labels(requestCode + "").startTimer();
return this;
}
@Override
public void observeDuration() {
if (this.timer != null) {
this.timer.observeDuration();
}
}
}
public StatsItemSet getStatsItemSet() {
return statsItemSet;
}
public void setStatsItemSet(StatsItemSet statsItemSet) {
this.statsItemSet = statsItemSet;
}
public Counter getRequestTotal() {
return requestTotal;
}
public Counter getRequestFailedTotal() {
return requestFailedTotal;
}
public Summary getRequestLatency() {
return requestLatency;
}
public Summary getReceivedBytes() {
return receivedBytes;
}
}
......@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.RegisterSnodeRequestHeader;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
......@@ -207,7 +208,10 @@ public class NnodeServiceImpl implements NnodeService {
clusterInfo = this.updateEnodeClusterInfo();
}
if (this.clusterInfo != null) {
return this.clusterInfo.getBrokerAddrTable().get(enodeName).getBrokerAddrs().get(MixAll.MASTER_ID);
BrokerData brokerData = this.clusterInfo.getBrokerAddrTable().get(enodeName);
if (brokerData != null) {
return brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
}
}
return null;
}
......
......@@ -71,12 +71,18 @@ public class SendMessageProcessorTest {
}
@Test
public void testProcessRequest() throws RemotingCommandException {
public void testSendMessageV2ProcessRequest() throws RemotingCommandException {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendMesssageV2Command();
when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
@Test
public void testSendBatchMessageProcessRequest() throws RemotingCommandException {
snodeController.setEnodeService(enodeService);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendMesssageCommand();
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
System.out.println("sendMessageRequestHeaderV2: " + sendMessageRequestHeaderV2);
RemotingCommand request = createSendBatchMesssageCommand();
when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
......@@ -95,7 +101,7 @@ public class SendMessageProcessorTest {
return requestHeader;
}
private RemotingCommand createSendMesssageCommand() {
private RemotingCommand createSendMesssageV2Command() {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = createSendMsgRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, sendMessageRequestHeaderV2);
request.setBody(new byte[] {'a'});
......@@ -103,6 +109,14 @@ public class SendMessageProcessorTest {
return request;
}
private RemotingCommand createSendBatchMesssageCommand() {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = createSendMsgRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, sendMessageRequestHeaderV2);
request.setBody(new byte[] {'b'});
CodecHelper.makeCustomHeaderToNet(request);
return request;
}
RemotingCommand createSendMessageResponse(int responseCode) {
return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.service;
import io.prometheus.client.CollectorRegistry;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.stats.StatsItemSet;
import org.apache.rocketmq.snode.service.impl.MetricsServiceImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(MockitoJUnitRunner.class)
public class MetricsServiceImplTest {
private MetricsServiceImpl metricsService = new MetricsServiceImpl();
private final String requestCode = "310";
private final String topic = "Test";
@Test
public void testIncRequestCountFalse() throws Exception {
metricsService.incRequestCount(310, false);
Double requestFailedTotal = getLabelsValue("request_failed_total", requestCode);
Double requestTotal = getLabelsValue("request_total", requestCode);
assertThat(requestFailedTotal).isEqualTo(1.0);
assertThat(requestTotal);
}
@Test
public void testIncRequestCountTrue() {
metricsService.incRequestCount(310, true);
Double requestTotal = getLabelsValue("request_total", requestCode);
assertThat(requestTotal).isEqualTo(1.0);
}
@Test
public void testRequestSize() throws Exception {
Field field = MetricsServiceImpl.class.getDeclaredField("statsItemSet");
field.setAccessible(true);
metricsService.recordRequestSize(topic, 100);
StatsItemSet statsItemSet = (StatsItemSet) field.get(metricsService);
AtomicLong requestSize = statsItemSet.getStatsItem("TotalSize@" + topic).getValue();
assertThat(requestSize.intValue()).isEqualTo(100);
}
public Double getLabelsValue(String name, String labelValue) {
return CollectorRegistry.defaultRegistry.getSampleValue(name, new String[] {"requestCode"}, new String[] {labelValue});
}
}
......@@ -43,6 +43,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
......@@ -222,6 +223,11 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
return defaultMQAdminExtImpl.examineBrokerClusterInfo();
}
@Override
public SnodeClusterInfo examineSnodeClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
return defaultMQAdminExtImpl.examineSnodeClusterInfo();
}
@Override
public TopicRouteData examineTopicRouteInfo(
String topic) throws RemotingException, MQClientException, InterruptedException {
......
......@@ -47,6 +47,7 @@ import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicOffset;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.help.FAQUrl;
import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageClientExt;
import org.apache.rocketmq.common.message.MessageConst;
......@@ -275,6 +276,12 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
return this.mqClientInstance.getMQClientAPIImpl().getBrokerClusterInfo(timeoutMillis);
}
@Override
public SnodeClusterInfo examineSnodeClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException {
return this.mqClientInstance.getMQClientAPIImpl().getSnodeClusterInfo(timeoutMillis);
}
@Override
public TopicRouteData examineTopicRouteInfo(
String topic) throws RemotingException, MQClientException, InterruptedException {
......
......@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueryConsumeQueueResponseBody;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
......@@ -100,6 +101,9 @@ public interface MQAdminExt extends MQAdmin {
ClusterInfo examineBrokerClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException;
SnodeClusterInfo examineSnodeClusterInfo() throws InterruptedException, MQBrokerException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException;
TopicRouteData examineTopicRouteInfo(
final String topic) throws RemotingException, MQClientException, InterruptedException;
......
......@@ -26,7 +26,9 @@ import org.apache.commons.cli.Options;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.SnodeClusterInfo;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.SnodeData;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
......@@ -96,20 +98,8 @@ public class ClusterListSubCommand implements SubCommand {
}
}
private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException,
RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException {
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
System.out.printf("%-16s %-32s %14s %14s %14s %14s%n",
"#Cluster Name",
"#Broker Name",
"#InTotalYest",
"#OutTotalYest",
"#InTotalToday",
"#OutTotalToday"
);
private void printBrokerMoreStats(final DefaultMQAdminExt defaultMQAdminExt,
ClusterInfo clusterInfoSerializeWrapper) {
Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
while (itCluster.hasNext()) {
Map.Entry<String, Set<String>> next = itCluster.next();
......@@ -165,25 +155,58 @@ public class ClusterListSubCommand implements SubCommand {
}
}
private void printClusterBaseInfo(
final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException,
RemotingSendRequestException, InterruptedException, MQBrokerException {
private void printSnodeMoreStats(final DefaultMQAdminExt defaultMQAdminExt, SnodeClusterInfo snodeClusterInfo) {
Iterator<Map.Entry<String, Set<String>>> itCluster = snodeClusterInfo.getSnodeCluster().entrySet().iterator();
while (itCluster.hasNext()) {
Map.Entry<String, Set<String>> next = itCluster.next();
String clusterName = next.getKey();
TreeSet<String> snodeNameSet = new TreeSet<>();
snodeNameSet.addAll(next.getValue());
for (String snodeeName : snodeNameSet) {
SnodeData snodeData = snodeClusterInfo.getSnodeTable().get(snodeeName);
if (snodeData != null) {
String address = snodeData.getAddress();
System.out.printf("%-16s %-32s %14d %14d %14d %14d%n",
clusterName,
snodeeName,
0,
0,
0,
0
);
}
}
if (itCluster.hasNext()) {
System.out.printf("");
}
}
}
private void printClusterMoreStats(final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException,
RemotingTimeoutException, RemotingSendRequestException, InterruptedException, MQBrokerException {
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
SnodeClusterInfo snodeClusterInfo = defaultMQAdminExt.examineSnodeClusterInfo();
System.out.printf("%-16s %-32s %14s %14s %14s %14s%n",
"#Cluster Name",
"#Broker Name",
"#BID",
"#Addr",
"#Version",
"#InTPS(LOAD)",
"#OutTPS(LOAD)",
"#PCWait(ms)",
"#Hour",
"#SPACE"
"#InTotalYest",
"#OutTotalYest",
"#InTotalToday",
"#OutTotalToday"
);
printBrokerMoreStats(defaultMQAdminExt, clusterInfoSerializeWrapper);
printSnodeMoreStats(defaultMQAdminExt, snodeClusterInfo);
}
private void printBrokerClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt,
ClusterInfo clusterInfoSerializeWrapper) {
Iterator<Map.Entry<String, Set<String>>> itCluster = clusterInfoSerializeWrapper.getClusterAddrTable().entrySet().iterator();
while (itCluster.hasNext()) {
Map.Entry<String, Set<String>> next = itCluster.next();
......@@ -275,4 +298,60 @@ public class ClusterListSubCommand implements SubCommand {
}
}
}
private void printSnodeClusterBaseInfo(final DefaultMQAdminExt defaultMQAdminExt,
SnodeClusterInfo snodeClusterInfo) {
Iterator<Map.Entry<String, Set<String>>> itCluster = snodeClusterInfo.getSnodeCluster().entrySet().iterator();
while (itCluster.hasNext()) {
Map.Entry<String, Set<String>> next = itCluster.next();
String clusterName = next.getKey();
TreeSet<String> snodeNameSet = new TreeSet<>();
snodeNameSet.addAll(next.getValue());
for (String snodeeName : snodeNameSet) {
SnodeData snodeData = snodeClusterInfo.getSnodeTable().get(snodeeName);
if (snodeData != null) {
String address = snodeData.getAddress();
System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
clusterName,
snodeeName,
0,
address,
0,
0,
0,
0,
0
);
}
}
if (itCluster.hasNext()) {
System.out.printf("");
}
}
}
private void printClusterBaseInfo(
final DefaultMQAdminExt defaultMQAdminExt) throws RemotingConnectException, RemotingTimeoutException,
RemotingSendRequestException, InterruptedException, MQBrokerException {
ClusterInfo clusterInfoSerializeWrapper = defaultMQAdminExt.examineBrokerClusterInfo();
SnodeClusterInfo snodeClusterInfo = defaultMQAdminExt.examineSnodeClusterInfo();
System.out.printf("%-16s %-22s %-4s %-22s %-16s %19s %19s %10s %5s %6s%n",
"#Cluster Name",
"#Broker Name",
"#BID",
"#Addr",
"#Version",
"#InTPS(LOAD)",
"#OutTPS(LOAD)",
"#PCWait(ms)",
"#Hour",
"#SPACE"
);
printBrokerClusterBaseInfo(defaultMQAdminExt, clusterInfoSerializeWrapper);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册