提交 a559753f 编写于 作者: H huzongtang

[RIP-11]fix the issue of integrating msg trace to snode and acl in 4.4.0...

[RIP-11]fix the issue of integrating msg trace to snode and acl in 4.4.0 version.And also add hook for msg trace to snode.
上级 d96b8f2a
......@@ -130,8 +130,8 @@ public class PlainPermissionLoader {
if (!ownedPermMap.containsKey(resource)) {
// Check the default perm
byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() :
needCheckedAccess.getDefaultTopicPerm();
byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() :
ownedAccess.getDefaultTopicPerm();
if (!Permission.checkPermission(neededPerm, ownedPerm)) {
throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup)));
}
......
......@@ -32,7 +32,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.acl.AccessValidator;
import org.apache.rocketmq.broker.client.ClientHousekeepingService;
import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.broker.client.ConsumerManager;
......@@ -469,8 +468,6 @@ public class BrokerController {
}
}
initialTransaction();
initialAcl();
// initialRpcHooks();
}
return result;
}
......@@ -490,57 +487,6 @@ public class BrokerController {
this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);
}
private void initialAcl() {
if (!this.brokerConfig.isAclEnable()) {
log.info("The broker dose not enable acl");
return;
}
List<AccessValidator> accessValidators = ServiceProvider.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class);
if (accessValidators == null || accessValidators.isEmpty()) {
log.info("The broker dose not load the AccessValidator");
return;
}
// for (AccessValidator accessValidator : accessValidators) {
// final AccessValidator validator = accessValidator;
// this.registerServerRPCHook(new Interceptor() {
//
// @Override public String interceptorName() {
// return "aclInterceptor";
// }
//
// @Override public void beforeRequest(RequestContext requestContext) {
//
// validator.validate(validator.parse(requestContext.getRequest(), requestContext.getRemotingChannel().remoteAddress().toString()));
//
// }
//
// @Override public void afterRequest(ResponseContext responseContext) {
//
// }
//
// @Override public void onException(ExceptionContext exceptionContext) {
//
// }
//
// });
// }
}
private void initialRpcHooks() {
List<RPCHook> rpcHooks = ServiceProvider.loadServiceList(ServiceProvider.RPC_HOOK_ID, RPCHook.class);
if (rpcHooks == null || rpcHooks.isEmpty()) {
return;
}
for (RPCHook rpcHook : rpcHooks) {
this.remotingServer.registerServerRPCHook(rpcHook);
}
}
// registerInterceoptorGroup()
public void registerProcessor() {
/**
* SendMessageProcessor
......
......@@ -65,6 +65,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
......@@ -185,6 +186,8 @@ public class DefaultMQConsumerWithTraceTest {
});
doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean());
doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish();
Set<MessageQueue> messageQueueSet = new HashSet<MessageQueue>();
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet);
......@@ -195,7 +198,7 @@ public class DefaultMQConsumerWithTraceTest {
pushConsumer.shutdown();
}
// @Test
@Test
public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException {
traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl());
......
......@@ -116,6 +116,7 @@ public class DefaultMQProducerWithTraceTest {
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
.thenReturn(createSendResult(SendStatus.SEND_OK));
when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911");
}
......
......@@ -19,7 +19,7 @@
<parent>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.4.0-SNAPSHOT</version>
<version>4.4.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.mqtrace;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public class MsgTraceServiceImpl implements Interceptor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME);
@Override
public String interceptorName() {
return "snodeMsgTraceInterceptor";
}
@Override
public void beforeRequest(RequestContext requestContext) {
}
@Override
public void afterRequest(ResponseContext responseContext) {
}
@Override
public void onException(ExceptionContext exceptionContext) {
}
}
......@@ -59,7 +59,7 @@ public class MqttConnectMessageHandler implements MessageHandler {
private boolean isConnected(RemotingChannel remotingChannel, String clientId) {
ClientManager iotClientManager = snodeController.getIotClientManager();
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOTGROUP, remotingChannel);
Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel);
if (client != null && client.getClientId().equals(clientId) && client.isConnected()) {
return true;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册