提交 9297d560 编写于 作者: D duhenglucky

Add local metrics sample and compute process

......@@ -130,8 +130,8 @@ public class PlainPermissionLoader {
if (!ownedPermMap.containsKey(resource)) {
// Check the default perm
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm();
byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
ownedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
......
......@@ -469,8 +469,6 @@ public class BrokerController {
}
}
initialTransaction();
initialAcl();
// initialRpcHooks();
}
return result;
}
......
......@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
......@@ -185,6 +186,8 @@ public class DefaultMQConsumerWithTraceTest {
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish();
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
......@@ -195,7 +198,7 @@ public class DefaultMQConsumerWithTraceTest {
pushConsumer.shutdown();
}
// @Test
@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
......
......@@ -116,6 +116,7 @@ public class DefaultMQProducerWithTraceTest {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
}
......
......@@ -82,13 +82,13 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
public RemotingClient init(ClientConfig clientConfig, ChannelEventListener channelEventListener) {
this.nettyClientConfig = clientConfig;
this.channelEventListener = channelEventListener;
this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("NettyClientEpollIoThreads",
this.ioGroup = new NioEventLoopGroup(clientConfig.getClientWorkerThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyClientEpollIoThreads",
clientConfig.getClientWorkerThreads()));
this.publicExecutor = ThreadUtils.newFixedThreadPool(
clientConfig.getClientCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
10000, "Http2Remoting-PublicExecutor", true);
this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(clientConfig.getClientWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
ThreadUtils.newGenericThreadFactory("Http2NettyClientWorkerThreads", clientConfig.getClientWorkerThreads()));
buildHttp2SslClientContext();
return this;
}
......
......@@ -97,23 +97,23 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
this.channelEventListener = channelEventListener;
this.publicExecutor = ThreadUtils.newFixedThreadPool(
serverConfig.getServerCallbackExecutorThreads(),
10000, "Remoting-PublicExecutor", true);
10000, "Http2Remoting-PublicExecutor", true);
if (JvmUtils.isLinux() && this.serverConfig.isUseEpollNativeSelector()) {
this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyEpollIoThreads",
this.ioGroup = new EpollEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyEpollIoThreads",
serverConfig.getServerSelectorThreads()));
this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
this.bossGroup = new EpollEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.socketChannelClass = EpollServerSocketChannel.class;
} else {
this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("NettyBossThreads",
this.bossGroup = new NioEventLoopGroup(serverConfig.getServerAcceptorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyBossThreads",
serverConfig.getServerAcceptorThreads()));
this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("NettyNioIoThreads",
this.ioGroup = new NioEventLoopGroup(serverConfig.getServerSelectorThreads(), ThreadUtils.newGenericThreadFactory("Http2NettyNioIoThreads",
serverConfig.getServerSelectorThreads()));
this.socketChannelClass = NioServerSocketChannel.class;
}
this.workerGroup = new DefaultEventExecutorGroup(serverConfig.getServerWorkerThreads(),
ThreadUtils.newGenericThreadFactory("NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
ThreadUtils.newGenericThreadFactory("Http2NettyWorkerThreads", serverConfig.getServerWorkerThreads()));
this.port = nettyServerConfig.getListenPort();
buildHttp2SslServerContext();
return this;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.mqtrace;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public class MsgTraceServiceImpl implements Interceptor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@Override
public String interceptorName() {
return "snodeMsgTraceInterceptor";
}
@Override
public void beforeRequest(RequestContext requestContext) {
}
@Override
public void afterRequest(ResponseContext responseContext) {
}
@Override
public void onException(ExceptionContext exceptionContext) {
}
}
......@@ -67,14 +67,16 @@ public class SendMessageProcessor implements RequestProcessor {
String enodeName;
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = null;
final StringBuffer stringBuffer = new StringBuffer();
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = null;
boolean isSendBack = false;
if (request.getCode() == RequestCode.SEND_MESSAGE_V2) {
if (request.getCode() == RequestCode.SEND_MESSAGE_V2 ||
request.getCode() == RequestCode.SEND_BATCH_MESSAGE) {
sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
enodeName = sendMessageRequestHeaderV2.getN();
stringBuffer.append(sendMessageRequestHeaderV2.getB());
} else {
isSendBack = true;
ConsumerSendMsgBackRequestHeader consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
consumerSendMsgBackRequestHeader = (ConsumerSendMsgBackRequestHeader) request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
enodeName = consumerSendMsgBackRequestHeader.getEnodeName();
stringBuffer.append(MixAll.getRetryTopic(consumerSendMsgBackRequestHeader.getGroup()));
}
......
......@@ -71,12 +71,18 @@ public class SendMessageProcessorTest {
}
@Test
public void testProcessRequest() throws RemotingCommandException {
public void testSendMessageV2ProcessRequest() throws RemotingCommandException {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendMesssageV2Command();
when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
@Test
public void testSendBatchMessageProcessRequest() throws RemotingCommandException {
snodeController.setEnodeService(enodeService);
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
RemotingCommand request = createSendMesssageCommand();
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = (SendMessageRequestHeaderV2) request.decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
System.out.println("sendMessageRequestHeaderV2: " + sendMessageRequestHeaderV2);
RemotingCommand request = createSendBatchMesssageCommand();
when(this.snodeController.getEnodeService().sendMessage(anyString(), any(RemotingCommand.class))).thenReturn(future);
sendMessageProcessor.processRequest(remotingChannel, request);
}
......@@ -95,7 +101,7 @@ public class SendMessageProcessorTest {
return requestHeader;
}
private RemotingCommand createSendMesssageCommand() {
private RemotingCommand createSendMesssageV2Command() {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = createSendMsgRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, sendMessageRequestHeaderV2);
request.setBody(new byte[] {'a'});
......@@ -103,6 +109,14 @@ public class SendMessageProcessorTest {
return request;
}
private RemotingCommand createSendBatchMesssageCommand() {
SendMessageRequestHeaderV2 sendMessageRequestHeaderV2 = createSendMsgRequestHeader();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEND_BATCH_MESSAGE, sendMessageRequestHeaderV2);
request.setBody(new byte[] {'b'});
CodecHelper.makeCustomHeaderToNet(request);
return request;
}
RemotingCommand createSendMessageResponse(int responseCode) {
return RemotingCommand.createResponseCommand(ResponseCode.SUCCESS, null);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册