diff --git a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java index 1da7380b6d9f53b8b0c0909e50093a806ac5676a..9148422ff101a6e4ed42e141fb9fbf3d2f77a0cf 100644 --- a/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java +++ b/acl/src/main/java/org/apache/rocketmq/acl/plain/PlainPermissionLoader.java @@ -130,8 +130,8 @@ public class PlainPermissionLoader { if (!ownedPermMap.containsKey(resource)) { // Check the default perm - byte ownedPerm = isGroup ? needCheckedAccess.getDefaultGroupPerm() : - needCheckedAccess.getDefaultTopicPerm(); + byte ownedPerm = isGroup ? ownedAccess.getDefaultGroupPerm() : + ownedAccess.getDefaultTopicPerm(); if (!Permission.checkPermission(neededPerm, ownedPerm)) { throw new AclException(String.format("No default permission for %s", PlainAccessResource.printStr(resource, isGroup))); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index 762c1921ab1c71d0f95047a2ec1e418842e41f43..5c2de641d70cb952a37446b32525b9fca7a3e158 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -32,7 +32,6 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.rocketmq.acl.AccessValidator; import org.apache.rocketmq.broker.client.ClientHousekeepingService; import org.apache.rocketmq.broker.client.ConsumerIdsChangeListener; import org.apache.rocketmq.broker.client.ConsumerManager; @@ -469,8 +468,6 @@ public class BrokerController { } } initialTransaction(); - initialAcl(); -// initialRpcHooks(); } return result; } @@ -490,57 +487,6 @@ public class BrokerController { this.transactionalMessageCheckService = new TransactionalMessageCheckService(this); } - private void initialAcl() { - if (!this.brokerConfig.isAclEnable()) { - log.info("The broker dose not enable acl"); - return; - } - - List accessValidators = ServiceProvider.loadServiceList(ServiceProvider.ACL_VALIDATOR_ID, AccessValidator.class); - if (accessValidators == null || accessValidators.isEmpty()) { - log.info("The broker dose not load the AccessValidator"); - return; - } - -// for (AccessValidator accessValidator : accessValidators) { -// final AccessValidator validator = accessValidator; -// this.registerServerRPCHook(new Interceptor() { -// -// @Override public String interceptorName() { -// return "aclInterceptor"; -// } -// -// @Override public void beforeRequest(RequestContext requestContext) { -// -// validator.validate(validator.parse(requestContext.getRequest(), requestContext.getRemotingChannel().remoteAddress().toString())); -// -// } -// -// @Override public void afterRequest(ResponseContext responseContext) { -// -// } -// -// @Override public void onException(ExceptionContext exceptionContext) { -// -// } -// -// }); -// } - } - - private void initialRpcHooks() { - - List rpcHooks = ServiceProvider.loadServiceList(ServiceProvider.RPC_HOOK_ID, RPCHook.class); - if (rpcHooks == null || rpcHooks.isEmpty()) { - return; - } - for (RPCHook rpcHook : rpcHooks) { - this.remotingServer.registerServerRPCHook(rpcHook); - } - } - -// registerInterceoptorGroup() - public void registerProcessor() { /** * SendMessageProcessor diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java index aedeff20aa3d1210ab91d7109eb7f28b2c4b0c3a..9665fc34a90f4933683ebb4d7215384a7f630e82 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQConsumerWithTraceTest.java @@ -65,6 +65,7 @@ import org.apache.rocketmq.common.protocol.route.TopicRouteData; import org.apache.rocketmq.remoting.exception.RemotingException; import org.junit.After; import org.junit.Before; +import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; @@ -185,6 +186,8 @@ public class DefaultMQConsumerWithTraceTest { }); doReturn(new FindBrokerResult("127.0.0.1:10911", false)).when(mQClientFactory).findBrokerAddressInSubscribe(anyString(), anyLong(), anyBoolean()); + doReturn("127.0.0.1:10911").when(mQClientFactory).findSnodeAddressInPublish(); + Set messageQueueSet = new HashSet(); messageQueueSet.add(createPullRequest().getMessageQueue()); pushConsumer.getDefaultMQPushConsumerImpl().updateTopicSubscribeInfo(topic, messageQueueSet); @@ -195,7 +198,7 @@ public class DefaultMQConsumerWithTraceTest { pushConsumer.shutdown(); } -// @Test + @Test public void testPullMessage_WithTrace_Success() throws InterruptedException, RemotingException, MQBrokerException, MQClientException { traceProducer.getDefaultMQProducerImpl().getmQClientFactory().registerProducer(producerGroupTraceTemp, traceProducer.getDefaultMQProducerImpl()); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index 903be01cddbf13031368db73daa9df8ab6198ebb..a2a7c758d83262b916f4db5c8e8a035980cef7b5 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -116,6 +116,7 @@ public class DefaultMQProducerWithTraceTest { when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); + when(mQClientFactory.findSnodeAddressInPublish()).thenReturn("127.0.0.1:10911"); } diff --git a/snode/pom.xml b/snode/pom.xml index 874b52653dd45ec707548969d14d57db0faaf59a..c19dded4dd51bfabfb24be4e47c1b3f7f9e10d21 100644 --- a/snode/pom.xml +++ b/snode/pom.xml @@ -19,7 +19,7 @@ org.apache.rocketmq rocketmq-all - 4.4.0-SNAPSHOT + 4.4.1-SNAPSHOT 4.0.0 diff --git a/snode/src/main/java/org/apache/rocketmq/snode/mqtrace/MsgTraceServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/mqtrace/MsgTraceServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..a61f9b3a9823def0c4a45a12018bbd90d8edfc80 --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/mqtrace/MsgTraceServiceImpl.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.snode.mqtrace; + +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.interceptor.ExceptionContext; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.RequestContext; +import org.apache.rocketmq.remoting.interceptor.ResponseContext; + +public class MsgTraceServiceImpl implements Interceptor { + + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.SNODE_LOGGER_NAME); + + @Override + public String interceptorName() { + return "snodeMsgTraceInterceptor"; + } + + @Override + public void beforeRequest(RequestContext requestContext) { + } + + @Override + public void afterRequest(ResponseContext responseContext) { + } + + @Override + public void onException(ExceptionContext exceptionContext) { + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java index c1b1633cabf5ea80e45e624f1f0d2f7d860a9fb7..fb8691e7192b38d53253a4cf12b66b4d7eef5c76 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttConnectMessageHandler.java @@ -59,7 +59,7 @@ public class MqttConnectMessageHandler implements MessageHandler { private boolean isConnected(RemotingChannel remotingChannel, String clientId) { ClientManager iotClientManager = snodeController.getIotClientManager(); - Client client = iotClientManager.getClient(IOTClientManagerImpl.IOTGROUP, remotingChannel); + Client client = iotClientManager.getClient(IOTClientManagerImpl.IOT_GROUP, remotingChannel); if (client != null && client.getClientId().equals(clientId) && client.isConnected()) { return true; } diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..847d190cbe06fd24cc5c3779be5d57793ec1e48c 100644 --- a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor +++ b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.SendMessageInterceptor @@ -0,0 +1 @@ +org.apache.rocketmq.snode.mqtrace.MsgTraceServiceImpl \ No newline at end of file