diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java index c12ebe1d65637f027a95f6908c8ae63a29539544..12189562c49db4757e37c67c340a5199d754f379 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java @@ -999,7 +999,7 @@ public class BrokerController { } public void registerServerRPCHook(RPCHook rpcHook) { - getRemotingServer().registerRPCHook(rpcHook); +// getRemotingServer().registerRPCHook(rpcHook); } public RemotingServer getRemotingServer() { diff --git a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java index 4be411a08147f20eb0bde769ba9c6ea86cafd973..d8e96a81c3afcb27fe363fe9e88e1c133d33442d 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java @@ -72,7 +72,7 @@ public class BrokerOuterAPI { public BrokerOuterAPI(final ClientConfig nettyClientConfig, RPCHook rpcHook) { this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(nettyClientConfig, null); - this.remotingClient.registerRPCHook(rpcHook); +// this.remotingClient.registerRPCHook(rpcHook); } public void start() { @@ -193,6 +193,9 @@ public class BrokerOuterAPI { RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; + if (response == null){ + System.out.println("ssssssssssssss"); + } switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = @@ -393,6 +396,6 @@ public class BrokerOuterAPI { } public void registerRPCHook(RPCHook rpcHook) { - remotingClient.registerRPCHook(rpcHook); +// remotingClient.registerRPCHook(rpcHook); } } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index 5e009fd1893d9f7945efb167a4125325ec4620c3..98a626c0b9eec05ebaaabf3cbd6342a3cb816e2a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -145,6 +145,8 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -169,13 +171,13 @@ public class MQClientAPIImpl { public MQClientAPIImpl(final ClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor, - RPCHook rpcHook, final org.apache.rocketmq.client.ClientConfig clientConfig) { + InterceptorGroup interceptorGroup, final org.apache.rocketmq.client.ClientConfig clientConfig) { this.clientConfig = clientConfig; topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName()); this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); this.clientRemotingProcessor = clientRemotingProcessor; - this.remotingClient.registerRPCHook(rpcHook); + this.remotingClient.registerInterceptorGroup(interceptorGroup); this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null); @@ -556,7 +558,7 @@ public class MQClientAPIImpl { } public PullResult pullMessage( - String addr, + String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java index 31553a67ae5fd28cfc2605ecc633f7aaf5edbeb4..e23b0c93f696750d4877206646a5f648de3c1f4a 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientManager.java @@ -50,7 +50,7 @@ public class MQClientManager { if (null == instance) { instance = new MQClientInstance(clientConfig.cloneClientConfig(), - this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); + this.factoryIndexGenerator.getAndIncrement(), clientId, null); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); if (prev != null) { instance = prev; diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index 843d4da9526a289af735c41840dcb50da2def4ad..769164bf14087d5ee1f833ae75406aa91584a994 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -79,6 +79,8 @@ import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public class MQClientInstance { @@ -121,14 +123,15 @@ public class MQClientInstance { this(clientConfig, instanceIndex, clientId, null); } - public MQClientInstance(org.apache.rocketmq.client.ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) { + public MQClientInstance(org.apache.rocketmq.client.ClientConfig clientConfig, int instanceIndex, String clientId, + InterceptorGroup interceptorGroup) { this.clientConfig = clientConfig; this.instanceIndex = instanceIndex; this.nettyClientConfig = new ClientConfig(); this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads()); this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); this.clientRemotingProcessor = new ClientRemotingProcessor(this); - this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); + this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, interceptorGroup, clientConfig); if (this.clientConfig.getNamesrvAddr() != null) { this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr()); diff --git a/common/pom.xml b/common/pom.xml index 62c6d7eb5c2b2580eeb53f10d31ea9c3579ba78f..5b24f8176ab47a89b9f763614ffb1f9df7037c3d 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -37,5 +37,9 @@ ${project.groupId} rocketmq-remoting + + org.yaml + snakeyaml + diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java index a846755d8db47f198e02d05c1da5b573474ed4e9..3b1866e4d49447b1644d2f6cc19eae9bbe6c02be 100644 --- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java +++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java @@ -16,9 +16,12 @@ */ package org.apache.rocketmq.common; +import com.alibaba.fastjson.parser.ParserConfig; +import com.alibaba.fastjson.util.TypeUtils; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; @@ -28,19 +31,21 @@ import java.net.NetworkInterface; import java.text.NumberFormat; import java.text.ParseException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.Calendar; import java.util.Date; import java.util.Enumeration; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.zip.CRC32; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; - import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.yaml.snakeyaml.Yaml; public class UtilAll { private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); @@ -517,4 +522,23 @@ public class UtilAll { file.delete(); } } + + public static T getYamlDataObject(String path, Class clazz) { + Yaml ymal = new Yaml(); + FileInputStream fis = null; + try { + fis = new FileInputStream(new File(path)); + return ymal.loadAs(fis, clazz); + } catch (Exception e) { + throw new RuntimeException(String.format("The file for Plain mode was not found , paths %s", path), e); + } finally { + if (fis != null) { + try { + fis.close(); + } catch (IOException e) { + throw new RuntimeException("close transport fileInputStream Exception", e); + } + } + } + } } diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java new file mode 100644 index 0000000000000000000000000000000000000000..fc4ef40b3aca9fa695f5d2c6b78e04a785ae122a --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/AbstractFlowControlService.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.flowcontrol; + +import com.alibaba.csp.sentinel.SphO; +import com.alibaba.csp.sentinel.slots.block.RuleConstant; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRule; +import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.interceptor.ExceptionContext; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.RequestContext; +import org.apache.rocketmq.remoting.interceptor.ResponseContext; + +public abstract class AbstractFlowControlService implements Interceptor { + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + private final ThreadLocal acquiredThreadLocal = new ThreadLocal(); + private final FlowControlConfig flowControlConfig; + + public AbstractFlowControlService() { + this.flowControlConfig = new FlowControlConfig(); + loadRules(this.flowControlConfig); + } + + public abstract String getResourceKey(RequestContext requestContext); + + public abstract int getResourceCount(RequestContext requestContext); + + public abstract String getFlowControlType(); + + public abstract void rejectRequest(RequestContext requestContext); + + @Override + public void beforeRequest(RequestContext requestContext) { + String resourceKey = getResourceKey(requestContext); + int resourceCount = getResourceCount(requestContext); + resourceCount = resourceCount == 0 ? 1 : resourceCount; + if (resourceKey != null) { + boolean acquired = SphO.entry(resourceKey, resourceCount); + if (acquired) { + this.acquiredThreadLocal.set(true); + } else { + rejectRequest(requestContext); + } + } + } + + @Override + public void afterRequest(ResponseContext responseContext) { + Boolean acquired = this.acquiredThreadLocal.get(); + if (acquired != null && acquired == true) { + SphO.exit(); + } + } + + @Override + public void onException(ExceptionContext exceptionContext) { + Boolean acquired = this.acquiredThreadLocal.get(); + if (acquired != null && acquired == true) { + SphO.exit(); + } + } + + public List getRules(String moduleName, String flowControlType) { + if (this.flowControlConfig != null) { + Map>> rules = this.flowControlConfig.getPlainFlowControlRules(); + Map> flowControlMap = rules.get(moduleName); + if (flowControlMap != null) { + if (flowControlMap.get(flowControlType) != null) { + return flowControlMap.get(flowControlType); + } else { + log.warn("Get flow control config null by flowControlType: {} ", flowControlType); + } + } else { + log.warn("Get flow control config null by moduleName: {} ", moduleName); + } + } else { + log.warn("flowControlConfig is null "); + } + return null; + } + + private void loadRules(FlowControlConfig flowControlConfig) { + Map>> rules = flowControlConfig.getPlainFlowControlRules(); + for (Map> flowControlTypeMap : rules.values()) { + for (List list : flowControlTypeMap.values()) { + for (FlowControlRule flowControlRule : list) { + List sentinelRules = new ArrayList(); + FlowRule rule1 = new FlowRule(); + rule1.setResource(flowControlRule.getFlowControlResourceName()); + rule1.setCount(flowControlRule.getFlowControlResourceCount()); + rule1.setGrade(flowControlRule.getFlowControlGrade()); + rule1.setLimitApp("default"); + sentinelRules.add(rule1); + FlowRuleManager.loadRules(sentinelRules); + } + } + } + } + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlConfig.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..58bbade999e74405ba226fa0a1859e005b599ddd --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlConfig.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.flowcontrol; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import java.io.File; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.rocketmq.common.MixAll; +import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.constant.LoggerName; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; + +public class FlowControlConfig { + + private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); + + private String flowControlFileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV)); + private static final String DEFAULT_FLOW_CONTROL_FILE = "conf/flowControl.yml"; + + private String flowControlFileName = System.getProperty("rocketmq.flow.control.file", DEFAULT_FLOW_CONTROL_FILE); + + private List rules; + + public static final String defaultResourceName = "overallFlowControl"; + + private Map>> plainFlowControlRules; + + public FlowControlConfig() { + loadFlowControlConfig(); + } + + public void loadFlowControlConfig() { + JSONObject jsonObject = UtilAll.getYamlDataObject(flowControlFileHome + File.separator + flowControlFileName, + JSONObject.class); + if (jsonObject != null && jsonObject.size() > 0) { + plainFlowControlRules = new HashMap>>(); + Set> entries = jsonObject.entrySet(); + for (Map.Entry entry : entries) { + String serverName = entry.getKey(); + Map> flowControlTypeMap = plainFlowControlRules.get(serverName); + if (flowControlTypeMap == null) { + flowControlTypeMap = new HashMap>(); + plainFlowControlRules.put(serverName, flowControlTypeMap); + } + + LinkedHashMap flowControlMap = (LinkedHashMap) entry.getValue(); + Iterator iter = flowControlMap.entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry tmp = (Map.Entry) iter.next(); + String flowControlType = (String) tmp.getKey(); + List jsonList = (ArrayList) tmp.getValue(); + if (jsonList != null) { + for (Object json : jsonList) { + Map map = (LinkedHashMap) json; + FlowControlRule flowControlRule = JSON.parseObject(JSON.toJSONString(map), FlowControlRule.class); + List flowControlRules = flowControlTypeMap.get(flowControlType); + if (flowControlRules == null) { + flowControlRules = new ArrayList(); + flowControlTypeMap.put(flowControlType, flowControlRules); + } + flowControlRules.add(flowControlRule); + } + } + } + } + } + log.warn("Load topic config: {}", this.plainFlowControlRules); + } + + public Map>> getPlainFlowControlRules() { + return plainFlowControlRules; + } + +} diff --git a/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlRule.java b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlRule.java new file mode 100644 index 0000000000000000000000000000000000000000..8735ff96546519740a44650fb26eb07ab52f41a9 --- /dev/null +++ b/common/src/main/java/org/apache/rocketmq/common/flowcontrol/FlowControlRule.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.common.flowcontrol; + +import org.apache.rocketmq.remoting.interceptor.Interceptor; + +public class FlowControlRule { + private String flowControlResourceName; + private Integer flowControlGrade; + private Integer flowControlBehavior; + private double flowControlResourceCount; + + public String getFlowControlResourceName() { + return flowControlResourceName; + } + + public void setFlowControlResourceName(String flowControlResourceName) { + this.flowControlResourceName = flowControlResourceName; + } + + public Integer getFlowControlGrade() { + return flowControlGrade; + } + + public void setFlowControlGrade(Integer flowControlGrade) { + this.flowControlGrade = flowControlGrade; + } + + public Integer getFlowControlBehavior() { + return flowControlBehavior; + } + + public void setFlowControlBehavior(Integer flowControlBehavior) { + this.flowControlBehavior = flowControlBehavior; + } + + public double getFlowControlResourceCount() { + return flowControlResourceCount; + } + + public void setFlowControlResourceCount(double flowControlResourceCount) { + this.flowControlResourceCount = flowControlResourceCount; + } + + @Override + public String toString() { + return "FlowControlRule{" + + "flowControlResourceName='" + flowControlResourceName + '\'' + + ", flowControlGrade=" + flowControlGrade + + ", flowControlBehavior=" + flowControlBehavior + + ", flowControlResourceCount=" + flowControlResourceCount + + '}'; + } +} diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java index 8d667a4b4e1db48ca121e6d07c5732e04cf38d0c..f9798d0ea44f27fda305af6623fe822f2756fec8 100644 --- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java +++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Consumer.java @@ -75,7 +75,6 @@ public class Consumer { * Launch the consumer instance. */ consumer.start(); - System.out.printf("Consumer Started.%n"); } } diff --git a/pom.xml b/pom.xml index 60ebe41faa82ce4c89b859fd580deeac46b0d291..306d769b7eeeb9e69b01fd846d5a12bd29b8fadc 100644 --- a/pom.xml +++ b/pom.xml @@ -598,6 +598,16 @@ log4j-slf4j-impl 2.7 + + com.alibaba.csp + sentinel-core + 1.4.1 + + + org.yaml + snakeyaml + 1.19 + diff --git a/remoting/pom.xml b/remoting/pom.xml index 26e5bfc322e62b737f5c8c6ab79858aba26c66cd..e5f6623c0de8cdd6c1f65d69d13ad7d57d260c64 100644 --- a/remoting/pom.xml +++ b/remoting/pom.xml @@ -28,8 +28,8 @@ rocketmq-remoting ${project.version} - 1.6 - 1.6 + 1.8 + 1.8 @@ -54,5 +54,13 @@ netty-tcnative-boringssl-static 1.1.33.Fork26 + + com.alibaba.csp + sentinel-core + + + com.google.guava + guava + diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java index a04206569bf79ff78d6351486e35eca12247891a..c0b8558dc0f6b12991754af2c8554b7b1984317a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingServer.java @@ -21,6 +21,7 @@ import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.protocol.RemotingCommand; public interface RemotingServer extends RemotingService { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java index 2f887972172df5b70b0e6f35948b19cf1aa1e814..02b39375b5152ab503ec27d3b76d546869992537 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/RemotingService.java @@ -17,10 +17,12 @@ package org.apache.rocketmq.remoting; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; + public interface RemotingService { void start(); void shutdown(); - void registerRPCHook(RPCHook rpcHook); + void registerInterceptorGroup(InterceptorGroup interceptorGroup); } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingRuntimeException.java b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingRuntimeException.java new file mode 100644 index 0000000000000000000000000000000000000000..7fcfb41e3eb6a49b808f48e28777df617169f680 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/exception/RemotingRuntimeException.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.exception; + +public class RemotingRuntimeException extends RuntimeException { + private final int responseCode; + private final String responseMessage; + + public RemotingRuntimeException(int responseCode, String errorMessage) { + this.responseCode = responseCode; + this.responseMessage = errorMessage; + } + + public int getResponseCode() { + return responseCode; + } + + public String getResponseMessage() { + return responseMessage; + } + +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java index bfb9d236a509635421fa4cb5f66795dd657788af..1a834ec4d215de9289aff4c6d58bcf99f800543a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/ExceptionContext.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.rocketmq.remoting.interceptor; + import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java index 5baee83c91de6286d1dd5421b9541e02e18efcab..518d36a7ab17494d2409cf02c0099927727a88e5 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/Interceptor.java @@ -15,7 +15,11 @@ * limitations under the License. */ package org.apache.rocketmq.remoting.interceptor; + public interface Interceptor { + + String interceptorName(); + void beforeRequest(RequestContext requestContext); void afterRequest(ResponseContext responseContext); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java index de80021305079697577d9135308a7b6a43dacfbb..8945208af54d838ea425a55ec1e5c831ed2f085f 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorGroup.java @@ -15,14 +15,15 @@ * limitations under the License. */ package org.apache.rocketmq.remoting.interceptor; + import java.util.ArrayList; import java.util.List; public class InterceptorGroup { - private List interceptors = new ArrayList(); + private List interceptors = new ArrayList<>(); - public void registerInterceptor(Interceptor interceptor) { - if (interceptor != null) { + public synchronized void registerInterceptor(Interceptor interceptor) { + if (interceptor != null && !interceptors.contains(interceptor)) { interceptors.add(interceptor); } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorInvoker.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorInvoker.java new file mode 100644 index 0000000000000000000000000000000000000000..5fa4b7ad0d1e41c914a7d7582d6046e008a92d19 --- /dev/null +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/InterceptorInvoker.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.remoting.interceptor; + +import org.apache.rocketmq.remoting.RemotingChannel; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; + +public class InterceptorInvoker { + public static void invokeBeforeRequest(InterceptorGroup interceptorGroup, RemotingChannel remotingChannel, + RemotingCommand request) { + if (interceptorGroup != null) { + RequestContext requestContext = new RequestContext(request, remotingChannel); + interceptorGroup.beforeRequest(requestContext); + } + } + + public static void invokeAfterRequest(InterceptorGroup interceptorGroup, RemotingChannel remotingChannel, + RemotingCommand request, RemotingCommand response) { + if (interceptorGroup != null) { + ResponseContext responseContext = new ResponseContext(request, remotingChannel, response); + interceptorGroup.afterRequest(responseContext); + } + } + + public static void invokeOnException(InterceptorGroup interceptorGroup, RemotingChannel remotingChannel, + RemotingCommand request, Throwable throwable, String remark) { + if (interceptorGroup != null) { + ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, throwable, remark); + interceptorGroup.onException(exceptionContext); + } + } + +} diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java index a0fffbbfa50c2b66a3e51b405ecd7657639cdf58..ecf3166ab27f2df38ec7fae6ade2810b28198695 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/interceptor/RequestContext.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.rocketmq.remoting.interceptor; + import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.protocol.RemotingCommand; @@ -23,8 +24,8 @@ public class RequestContext { protected RemotingChannel remotingChannel; public RequestContext(RemotingCommand request, RemotingChannel remotingChannel) { - this.remotingChannel = remotingChannel; this.request = request; + this.remotingChannel = remotingChannel; } public RemotingCommand getRequest() { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java index 1555959b0d3fc91818778ae4a851bc18a8be6c7f..14e1969cad086a139daf505bbadb27d5306e4e3a 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java @@ -37,20 +37,22 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RequestProcessor; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; import org.apache.rocketmq.remoting.common.ServiceThread; +import org.apache.rocketmq.remoting.exception.RemotingRuntimeException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; -import org.apache.rocketmq.logging.InternalLogger; -import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; +import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.apache.rocketmq.remoting.util.ThreadUtils; @@ -194,21 +196,15 @@ public abstract class NettyRemotingAbstract { final Pair matched = this.processorTable.get(cmd.getCode()); final Pair pair = null == matched ? this.defaultRequestProcessor : matched; final int opaque = cmd.getOpaque(); - + final InterceptorGroup interceptorGroup = NettyRemotingAbstract.this.getInterceptorGroup(); if (pair != null) { Runnable run = new Runnable() { @Override public void run() { try { - RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); - if (rpcHook != null) { - rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd); - } - + InterceptorInvoker.invokeBeforeRequest(interceptorGroup, remotingChannel, cmd); final RemotingCommand response = pair.getObject1().processRequest(remotingChannel, cmd); - if (rpcHook != null) { - rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response); - } + InterceptorInvoker.invokeAfterRequest(interceptorGroup, remotingChannel, cmd, response); if (!cmd.isOnewayRPC()) { if (response != null) { @@ -221,17 +217,22 @@ public abstract class NettyRemotingAbstract { log.error(cmd.toString()); log.error(response.toString()); } - } else { - } } - } catch (Throwable e) { - log.error("process request exception", e); + } catch (Throwable throwable) { + log.error("Process request exception", throwable); log.error(cmd.toString()); - + InterceptorInvoker.invokeOnException(interceptorGroup, remotingChannel, cmd, throwable, null); + int responseCode = RemotingSysResponseCode.SYSTEM_ERROR; + String responseMessage = RemotingHelper.exceptionSimpleDesc(throwable); if (!cmd.isOnewayRPC()) { - final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, - RemotingHelper.exceptionSimpleDesc(e)); + if (throwable instanceof RemotingRuntimeException) { + RemotingRuntimeException remotingRuntimeException = (RemotingRuntimeException) throwable; + responseCode = remotingRuntimeException.getResponseCode(); + responseMessage = remotingRuntimeException.getResponseMessage(); + } + final RemotingCommand response = RemotingCommand.createResponseCommand(responseCode, + responseMessage); response.setOpaque(opaque); ctx.writeAndFlush(response); } @@ -340,11 +341,11 @@ public abstract class NettyRemotingAbstract { } /** - * Custom RPC hook. + * Custom interceptor hook. * * @return RPC hook if specified; null otherwise. */ - public abstract RPCHook getRPCHook(); + public abstract InterceptorGroup getInterceptorGroup(); /** * This method specifies thread pool to use while invoking callback methods. @@ -407,6 +408,26 @@ public abstract class NettyRemotingAbstract { } } + public RemotingCommand invokeSyncWithInterceptor(final RemotingChannel remotingChannel, + final RemotingCommand request, + final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { + InterceptorGroup interceptorGroup = getInterceptorGroup(); + InterceptorInvoker.invokeBeforeRequest(interceptorGroup, remotingChannel, request); + Channel channel = null; + if (remotingChannel instanceof NettyChannelImpl) { + channel = ((NettyChannelImpl) remotingChannel).getChannel(); + } + try { + RemotingCommand response = invokeSyncImpl(channel, request, timeoutMillis); + InterceptorInvoker.invokeAfterRequest(interceptorGroup, remotingChannel, request, response); + return response; + } catch (InterruptedException | RemotingSendRequestException | RemotingTimeoutException ex) { + InterceptorInvoker.invokeOnException(interceptorGroup, remotingChannel, request, ex, null); + log.error("Sync invoke error ", ex); + throw ex; + } + } + public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { @@ -429,11 +450,12 @@ public abstract class NettyRemotingAbstract { responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); - log.warn("send a request command to channel <" + addr + "> failed."); + log.warn("Send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); + if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, @@ -449,7 +471,26 @@ public abstract class NettyRemotingAbstract { } } - abstract protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException; + abstract protected RemotingChannel getAndCreateChannel(final String addr, long timeout) throws InterruptedException; + + public void invokeAsyncWithInterceptor( + final RemotingChannel remotingChannel, + final RemotingCommand request, + final long timeoutMillis, + final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + InterceptorGroup interceptorGroup = this.getInterceptorGroup(); + InterceptorInvoker.invokeBeforeRequest(interceptorGroup, remotingChannel, request); + Channel channel = null; + if (remotingChannel instanceof NettyChannelImpl) { + channel = ((NettyChannelImpl) remotingChannel).getChannel(); + } + try { + invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); + } catch (InterruptedException | RemotingTooMuchRequestException | RemotingTimeoutException | RemotingSendRequestException ex) { + InterceptorInvoker.invokeOnException(interceptorGroup, remotingChannel, request, ex, null); + throw ex; + } + } public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis, @@ -488,7 +529,10 @@ public abstract class NettyRemotingAbstract { final String remotingAddr = RemotingHelper.parseChannelRemoteAddr(channel); try { if (channel == null) { - channel = getAndCreateChannel(addr, timeoutMillis); + RemotingChannel remotingChannel = getAndCreateChannel(addr, timeoutMillis); + if (remotingChannel != null && remotingChannel instanceof NettyChannelImpl) { + channel = ((NettyChannelImpl) remotingChannel).getChannel(); + } responseFuture.setProcessChannel(channel); } channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @@ -558,6 +602,26 @@ public abstract class NettyRemotingAbstract { } } + public void invokeOnewayWithInterceptor(final RemotingChannel remotingChannel, final RemotingCommand request, + final long timeoutMillis) + throws + InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { + Channel channel = null; + + InterceptorGroup interceptorGroup = this.getInterceptorGroup(); + InterceptorInvoker.invokeBeforeRequest(interceptorGroup, remotingChannel, request); + + if (remotingChannel instanceof NettyChannelImpl) { + channel = ((NettyChannelImpl) remotingChannel).getChannel(); + } + try { + invokeOnewayImpl(channel, request, timeoutMillis); + } catch (InterruptedException | RemotingTooMuchRequestException | RemotingTimeoutException | RemotingSendRequestException ex) { + InterceptorInvoker.invokeOnException(interceptorGroup, remotingChannel, request, ex, null); + throw ex; + } + } + public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { @@ -650,6 +714,11 @@ public abstract class NettyRemotingAbstract { } } + public void registerNettyProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { + Pair pair = new Pair<>(processor, executor); + this.processorTable.put(requestCode, pair); + } + public class NettyServerHandler extends SimpleChannelInboundHandler { @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java index b61fc630a1228d8a978714613c4c969b4dbaf778..306253b4cc0133182caa197f83fe6b80b6310b46 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingClientAbstract.java @@ -38,8 +38,11 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingUtil; +import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyEvent; import org.apache.rocketmq.remoting.netty.NettyEventType; import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract; @@ -72,11 +75,21 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract return Math.abs(r.nextInt() % 999) % 999; } + public void closeRemotingChannel(final String addr, final RemotingChannel remotingChannel) { + Channel channel = null; + if (remotingChannel instanceof NettyChannelImpl) { + channel = ((NettyChannelImpl) remotingChannel).getChannel(); + } + if (remotingChannel instanceof NettyChannelHandlerContextImpl) { + channel = ((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel(); + } + closeChannel(addr, channel); + } + public void closeChannel(final String addr, final Channel channel) { if (null == channel) { return; } - final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr; try { @@ -122,7 +135,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract this.channelTables.clear(); } - public void closeChannel(final Channel channel) { + private void closeChannel(final Channel channel) { if (null == channel) { return; } @@ -169,17 +182,23 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract } @Override - protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException { + protected RemotingChannel getAndCreateChannel(final String addr, long timeout) throws InterruptedException { + Channel channel = null; if (null == addr) { - return getAndCreateNameServerChannel(timeout); + channel = getAndCreateNameServerChannel(timeout); + } else { + ChannelWrapper cw = this.channelTables.get(addr); + if (cw != null && cw.isOK()) { + channel = cw.getChannel(); + } else { + channel = this.createChannel(addr, timeout); + } } - - ChannelWrapper cw = this.channelTables.get(addr); - if (cw != null && cw.isOK()) { - return cw.getChannel(); + if (channel != null) { + RemotingChannel remotingChannel = new NettyChannelImpl(channel); + return remotingChannel; } - - return this.createChannel(addr, timeout); + return null; } private Channel getAndCreateNameServerChannel(long timeout) throws InterruptedException { diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java index 5ef97839802c00ce34c3540f55b18ff63ef0c994..033da393b41ba7925f5c7402d075835d08811732 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/NettyRemotingServerAbstract.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.remoting.transport; -import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; @@ -108,7 +107,4 @@ public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract } } - @Override protected Channel getAndCreateChannel(String addr, long timeout) throws InterruptedException { - return null; - } } diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java index fb9e3f8beec50bc7d043ed56d2243809cb1747c2..4aa5d68961fa7b4d6501189e06a8f3461c301925 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java @@ -43,6 +43,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.RemotingHelper; @@ -51,6 +52,10 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.ClientConfig; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; +import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder; import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder; import org.apache.rocketmq.remoting.RequestProcessor; @@ -67,7 +72,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo private ExecutorService callbackExecutor; private ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup; - private RPCHook rpcHook; + private InterceptorGroup interceptorGroup; public Http2ClientImpl(final ClientConfig clientConfig, final ChannelEventListener channelEventListener) { @@ -75,7 +80,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo init(clientConfig, channelEventListener); } - public Http2ClientImpl(){ + public Http2ClientImpl() { super(); } @@ -139,12 +144,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo public void shutdown() { super.shutdown(); try { + clearChannels(); - for (ChannelWrapper cw : this.channelTables.values()) { - this.closeChannel(null, cw.getChannel()); - } - - this.channelTables.clear(); if (this.ioGroup != null) { this.ioGroup.shutdownGracefully(); } @@ -166,8 +167,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo } @Override - public void registerRPCHook(RPCHook rpcHook) { - this.rpcHook = rpcHook; + public void registerInterceptorGroup(InterceptorGroup interceptorGroup) { + this.interceptorGroup = interceptorGroup; } @Override @@ -178,31 +179,25 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo @Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { - final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); - if (channel != null && channel.isActive()) { + final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis); + if (remotingChannel != null && remotingChannel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } - RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis); - if (this.rpcHook != null) { - this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); - } + RemotingCommand response = this.invokeSyncWithInterceptor(remotingChannel, request, timeoutMillis); return response; } catch (RemotingSendRequestException e) { - log.warn("invokeSync: send request exception, so close the channel[{}]", addr); - this.closeChannel(addr, channel); + log.warn("InvokeSync: send request exception, so close the channel[{}]", addr); + this.closeRemotingChannel(addr, remotingChannel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { - this.closeChannel(addr, channel); - log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); + this.closeRemotingChannel(addr, remotingChannel); + log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } - log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); + log.warn("InvokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { - this.closeChannel(addr, channel); + this.closeRemotingChannel(addr, remotingChannel); throw new RemotingConnectException(addr); } } @@ -211,20 +206,19 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { - final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); - if (channel != null && channel.isActive()) { + final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis); + + if (remotingChannel != null && remotingChannel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } - this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback); + + this.invokeAsyncWithInterceptor(remotingChannel, request, timeoutMillis, invokeCallback); } catch (RemotingSendRequestException e) { log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); - this.closeChannel(addr, channel); + this.closeRemotingChannel(addr, remotingChannel); throw e; } } else { - this.closeChannel(addr, channel); + this.closeRemotingChannel(addr, remotingChannel); throw new RemotingConnectException(addr); } } @@ -232,20 +226,17 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo @Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { - final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); - if (channel != null && channel.isActive()) { + final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis); + if (remotingChannel != null && remotingChannel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } - this.invokeOnewayImpl(channel, request, timeoutMillis); + this.invokeOnewayWithInterceptor(remotingChannel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); - this.closeChannel(addr, channel); + this.closeRemotingChannel(addr, remotingChannel); throw e; } } else { - this.closeChannel(addr, channel); + this.closeRemotingChannel(addr, remotingChannel); throw new RemotingConnectException(addr); } } @@ -257,13 +248,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo @Override public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { - ExecutorService executorThis = executor; - if (null == executor) { - executorThis = this.publicExecutor; - } - - Pair pair = new Pair(processor, executorThis); - this.processorTable.put(requestCode, pair); + executor = (executor == null ? this.publicExecutor : executor); + registerNettyProcessor(requestCode, processor, executor); } @Override @@ -277,8 +263,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo } @Override - public RPCHook getRPCHook() { - return this.rpcHook; + public InterceptorGroup getInterceptorGroup() { + return this.interceptorGroup; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java index 3352127d1ce17f7ea6a17ca5a8eb3b56901b38a4..507065d265a243975887e208d28f5d927e70acdc 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java @@ -56,6 +56,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder; @@ -79,7 +81,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo private ChannelEventListener channelEventListener; private ExecutorService publicExecutor; private int port; - private RPCHook rpcHook; + private InterceptorGroup interceptorGroup; public Http2ServerImpl(ServerConfig nettyServerConfig, ChannelEventListener channelEventListener) { super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); @@ -137,12 +139,8 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo @Override public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { - ExecutorService executorThis = executor; - if (null == executor) { - executorThis = this.publicExecutor; - } - Pair pair = new Pair(processor, executorThis); - this.processorTable.put(requestCode, pair); + executor = (executor == null ? this.publicExecutor : executor); + registerNettyProcessor(requestCode, processor, executor); } @Override @@ -242,13 +240,18 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo } @Override - public void registerRPCHook(RPCHook rpcHook) { - this.rpcHook = rpcHook; + public void registerInterceptorGroup(InterceptorGroup interceptorGroup) { + this.interceptorGroup = interceptorGroup; + } + + @Override + public InterceptorGroup getInterceptorGroup() { + return this.interceptorGroup; } @Override - public RPCHook getRPCHook() { - return this.rpcHook; + protected RemotingChannel getAndCreateChannel(String addr, long timeout) throws InterruptedException { + return null; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java index 5e91b7c72f23c3811f31141af07eb9bf1334067c..465814743b2c7a6d409bb4b82e560a4bff2e3a71 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingClient.java @@ -38,15 +38,24 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.RPCHook; +import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.RequestProcessor; +import org.apache.rocketmq.remoting.interceptor.ExceptionContext; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; +import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker; +import org.apache.rocketmq.remoting.interceptor.RequestContext; +import org.apache.rocketmq.remoting.interceptor.ResponseContext; +import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.TlsHelper; import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract; @@ -67,7 +76,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements private ExecutorService callbackExecutor; private ChannelEventListener channelEventListener; private DefaultEventExecutorGroup defaultEventExecutorGroup; - private RPCHook rpcHook; + private InterceptorGroup interceptorGroup; public NettyRemotingClient() { super(); @@ -123,7 +132,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .handler(new ChannelInitializer() { @Override - public void initChannel(SocketChannel ch) throws Exception { + public void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); if (nettyClientConfig.isUseTLS()) { if (null != sslContext) { @@ -165,8 +174,8 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements } @Override - public void registerRPCHook(RPCHook rpcHook) { - this.rpcHook = rpcHook; + public void registerInterceptorGroup(InterceptorGroup interceptorGroup) { + this.interceptorGroup = interceptorGroup; } @Override @@ -178,38 +187,35 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); - final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); - - if (channel != null && channel.isActive()) { + final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis); + if (remotingChannel != null && remotingChannel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } - RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); - if (this.rpcHook != null) { - this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); - } + RemotingCommand response = this.invokeSyncWithInterceptor(remotingChannel, request, timeoutMillis - costTime); return response; - } catch (RemotingSendRequestException e) { - log.warn("invokeSync: send request exception, so close the channel[{}]", addr); - this.closeChannel(addr, channel); - throw e; - } catch (RemotingTimeoutException e) { - if (nettyClientConfig.isClientCloseSocketIfTimeout()) { - this.closeChannel(addr, channel); - log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); + } catch (RemotingException remotingException) { + if (remotingException instanceof RemotingSendRequestException) { + log.warn("invokeSync: send request exception, so close the channel[{}]", addr); + this.closeRemotingChannel(addr, remotingChannel); + throw (RemotingSendRequestException) remotingException; + } + if (remotingException instanceof RemotingTimeoutException) { + if (nettyClientConfig.isClientCloseSocketIfTimeout()) { + this.closeRemotingChannel(addr, remotingChannel); + log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); + } + log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); + throw (RemotingTimeoutException) remotingException; } - log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); - throw e; } } else { - this.closeChannel(addr, channel); + this.closeRemotingChannel(addr, remotingChannel); throw new RemotingConnectException(addr); } + return null; } @Override @@ -217,12 +223,14 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); - final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); + final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis); + Channel channel = null; + if (remotingChannel instanceof NettyChannelImpl) { + channel = ((NettyChannelImpl) remotingChannel).getChannel(); + } + if (channel != null && channel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTooMuchRequestException("invokeAsync call timeout"); @@ -242,20 +250,17 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements @Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { - final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); - if (channel != null && channel.isActive()) { + final RemotingChannel remotingChannel = getAndCreateChannel(addr, timeoutMillis); + if (remotingChannel != null && remotingChannel.isActive()) { try { - if (this.rpcHook != null) { - this.rpcHook.doBeforeRequest(addr, request); - } - this.invokeOnewayImpl(channel, request, timeoutMillis); + this.invokeOnewayWithInterceptor(remotingChannel, request, timeoutMillis); } catch (RemotingSendRequestException e) { - log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); - this.closeChannel(addr, channel); + log.warn("InvokeOneway: send request exception, so close the channel[{}]", addr); + this.closeRemotingChannel(addr, remotingChannel); throw e; } } else { - this.closeChannel(addr, channel); + this.closeRemotingChannel(addr, remotingChannel); throw new RemotingConnectException(addr); } } @@ -267,7 +272,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements executorThis = this.publicExecutor; } - Pair pair = new Pair(processor, executorThis); + Pair pair = new Pair<>(processor, executorThis); this.processorTable.put(requestCode, pair); } @@ -282,8 +287,8 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements } @Override - public RPCHook getRPCHook() { - return this.rpcHook; + public InterceptorGroup getInterceptorGroup() { + return this.interceptorGroup; } @Override diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java index f7ff84233ee21dd8f9bf721eeee4ceb475e961e6..ec02a28d3556f7ef45f2ce601e28ee6b24ddbef2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/transport/rocketmq/NettyRemotingServer.java @@ -42,7 +42,6 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.InvokeCallback; -import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RequestProcessor; @@ -53,6 +52,8 @@ import org.apache.rocketmq.remoting.common.TlsMode; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.remoting.netty.FileRegionEncoder; import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.TlsHelper; @@ -74,9 +75,8 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements private DefaultEventExecutorGroup defaultEventExecutorGroup; private Class socketChannelClass; - private RPCHook rpcHook; - private int port = 0; + private InterceptorGroup interceptorGroup; private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; private static final String TLS_HANDLER_NAME = "sslHandler"; @@ -216,24 +216,19 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements } @Override - public void registerRPCHook(RPCHook rpcHook) { - this.rpcHook = rpcHook; + public void registerInterceptorGroup(InterceptorGroup interceptorGroup) { + this.interceptorGroup = interceptorGroup; } @Override public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { - ExecutorService executorThis = executor; - if (null == executor) { - executorThis = this.publicExecutor; - } - - Pair pair = new Pair(processor, executorThis); - this.processorTable.put(requestCode, pair); + executor = (executor == null ? this.publicExecutor : executor); + registerNettyProcessor(requestCode, processor, executor); } @Override public void registerDefaultProcessor(RequestProcessor processor, ExecutorService executor) { - this.defaultRequestProcessor = new Pair(processor, executor); + this.defaultRequestProcessor = new Pair<>(processor, executor); } @Override @@ -273,8 +268,13 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements } @Override - public RPCHook getRPCHook() { - return this.rpcHook; + public InterceptorGroup getInterceptorGroup() { + return this.interceptorGroup; + } + + @Override + protected RemotingChannel getAndCreateChannel(String addr, long timeout) throws InterruptedException { + return null; } @Override diff --git a/snode/pom.xml b/snode/pom.xml index 994ba40e1834395c59f6d0deeaada78eba3564be..2a1712206127fe8d99bf56aff2439cb0044f04a4 100644 --- a/snode/pom.xml +++ b/snode/pom.xml @@ -72,6 +72,10 @@ org.slf4j slf4j-api + + org.yaml + snakeyaml + diff --git a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java index 615be52ef0682ab33b9cd0694c73fedefcb8eeb7..cac2898c1f7a5e0cbeadc0029b1e73696ac72b5d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java @@ -34,6 +34,9 @@ import org.apache.rocketmq.remoting.RemotingClientFactory; import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RemotingServerFactory; import org.apache.rocketmq.remoting.ServerConfig; +import org.apache.rocketmq.remoting.interceptor.Interceptor; +import org.apache.rocketmq.remoting.interceptor.InterceptorFactory; +import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.snode.client.ClientHousekeepingService; import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener; import org.apache.rocketmq.snode.client.ConsumerManager; @@ -41,9 +44,6 @@ import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener; import org.apache.rocketmq.snode.client.ProducerManager; import org.apache.rocketmq.snode.client.SubscriptionGroupManager; import org.apache.rocketmq.snode.config.SnodeConfig; -import org.apache.rocketmq.remoting.interceptor.Interceptor; -import org.apache.rocketmq.remoting.interceptor.InterceptorFactory; -import org.apache.rocketmq.remoting.interceptor.InterceptorGroup; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; import org.apache.rocketmq.snode.processor.HeartbeatProcessor; @@ -82,7 +82,8 @@ public class SnodeController { private ConsumerManageProcessor consumerManageProcessor; private SendMessageProcessor sendMessageProcessor; private PullMessageProcessor pullMessageProcessor; - private HeartbeatProcessor hearbeatProcessor; + private HeartbeatProcessor heartbeatProcessor; + private InterceptorGroup remotingServerInterceptorGroup; private InterceptorGroup consumeMessageInterceptorGroup; private InterceptorGroup sendMessageInterceptorGroup; private PushService pushService; @@ -160,7 +161,7 @@ public class SnodeController { this.consumerOffsetManager = new ConsumerOffsetManager(this); this.consumerManageProcessor = new ConsumerManageProcessor(this); this.sendMessageProcessor = new SendMessageProcessor(this); - this.hearbeatProcessor = new HeartbeatProcessor(this); + this.heartbeatProcessor = new HeartbeatProcessor(this); this.pullMessageProcessor = new PullMessageProcessor(this); this.pushService = new PushServiceImpl(this); @@ -170,19 +171,41 @@ public class SnodeController { return snodeConfig; } +// private void initFlowControlIntercepterGruop() { +// this.remotingServerInterceptorGroup = new InterceptorGroup(); +// List remotingServerInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); +// this.remotingServerInterceptorGroup.registerInterceptor(flowControlService); +// } + + private void initRemotingServerInterceptorGroup() { + List remotingServerInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath()); + if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) { + if (this.remotingServerInterceptorGroup == null) { + this.remotingServerInterceptorGroup = new InterceptorGroup(); + } + for (Interceptor interceptor : remotingServerInterceptors) { + this.remotingServerInterceptorGroup.registerInterceptor(interceptor); + log.warn("Remoting server interceptor: {} registered!", interceptor.interceptorName()); + } + } + } + public boolean initialize() { this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService); this.registerProcessor(); - initInterceptorGroup(); + initSnodeInterceptorGroup(); + initRemotingServerInterceptorGroup(); + this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup); return true; } - private void initInterceptorGroup() { + private void initSnodeInterceptorGroup() { List consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) { this.consumeMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : consumeMessageInterceptors) { this.consumeMessageInterceptorGroup.registerInterceptor(interceptor); + log.warn("Consume message interceptor: {} registered!", interceptor.interceptorName()); } } List sendMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); @@ -190,15 +213,17 @@ public class SnodeController { this.sendMessageInterceptorGroup = new InterceptorGroup(); for (Interceptor interceptor : sendMessageInterceptors) { this.sendMessageInterceptorGroup.registerInterceptor(interceptor); + log.warn("Send message interceptor: {} registered!", interceptor.interceptorName()); } } + } public void registerProcessor() { this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); - this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, hearbeatProcessor, this.heartbeatExecutor); - this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, hearbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor); + this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); @@ -287,4 +312,13 @@ public class SnodeController { public void setEnodeService(EnodeService enodeService) { this.enodeService = enodeService; } + + public InterceptorGroup getRemotingServerInterceptorGroup() { + return remotingServerInterceptorGroup; + } + + public void setRemotingServerInterceptorGroup( + InterceptorGroup remotingServerInterceptorGroup) { + this.remotingServerInterceptorGroup = remotingServerInterceptorGroup; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java index cd5beab204437218731bb870ec43eaabfe4a2506..6ec34a82694469895d1e4426fbcdb8c9ecb2251c 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/config/SnodeConfig.java @@ -71,9 +71,13 @@ public class SnodeConfig { private final String consumeMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor"; + private final String remotingServerInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor"; private int listenPort = 11911; + private double snodeQPSLimit = 10000; + + private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) { @@ -276,4 +280,16 @@ public class SnodeConfig { public String getConsumeMessageInterceptorPath() { return consumeMessageInterceptorPath; } + + public String getRemotingServerInterceptorPath() { + return remotingServerInterceptorPath; + } + + public double getSnodeQPSLimit() { + return snodeQPSLimit; + } + + public void setSnodeQPSLimit(double snodeQPSLimit) { + this.snodeQPSLimit = snodeQPSLimit; + } } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..96a1e962b2b6c58d1ba3a28b24d900995546d67a --- /dev/null +++ b/snode/src/main/java/org/apache/rocketmq/snode/flowcontrol/QPSFlowControlServiceImpl.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.snode.flowcontrol; + +import java.util.concurrent.atomic.AtomicLong; +import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService; +import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.logging.InternalLogger; +import org.apache.rocketmq.logging.InternalLoggerFactory; +import org.apache.rocketmq.remoting.common.RemotingHelper; +import org.apache.rocketmq.remoting.exception.RemotingRuntimeException; +import org.apache.rocketmq.remoting.interceptor.RequestContext; +import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; + +public class QPSFlowControlServiceImpl extends AbstractFlowControlService { + private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); + + private final AtomicLong logCount = new AtomicLong(0); + + private final String flowControlType = "countLimit"; + + public QPSFlowControlServiceImpl() { + super(); + } + + @Override + public String getResourceKey(RequestContext requestContext) { + if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) { + return null; + } + return requestContext.getRequest().getCode() + ""; + } + + @Override + public String getFlowControlType() { + return this.flowControlType; + } + + @Override + public void rejectRequest(RequestContext requestContext) { + if (logCount.getAndIncrement() % 100 == 0) { + log.warn("[REJECT]exceed system flow control config QPS, start flow control for a while: requestContext: {} ", requestContext); + } + throw new RemotingRuntimeException(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECT]exceed system flow control config QPS, start flow control for a while"); + } + + @Override + public String interceptorName() { + return "snodeQPSFlowControlInterceptor"; + } + + @Override + public int getResourceCount(RequestContext requestContext) { + return 1; + } +} diff --git a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java index 065c01798cf2a2c2de286751ee9d85cdb3965cf7..e295a15879ec9d9b86454e416044d7f942ce650f 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java @@ -232,7 +232,7 @@ public class ConsumerOffsetManager { for (Entry queueEntry : map.entrySet()) { Integer queueId = queueEntry.getKey(); Long offset = queueEntry.getValue(); - this.snodeController.getEnodeService().persistOffsetToEnode(enodeName, consumerGroup, topic, queueId, offset); + this.snodeController.getEnodeService().persistOffset(enodeName, consumerGroup, topic, queueId, offset); } } else { log.error("Persist offset split keys error:{}", key); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java index 32b2cd3522dc9b02f15c2019cc2fc2960e985914..09cd90bceca4128e14b988a671b42651b791ae02 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.snode.processor; import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.protocol.RequestCode; +import org.apache.rocketmq.common.protocol.ResponseCode; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.logging.InternalLogger; @@ -74,7 +75,7 @@ public class SendMessageProcessor implements RequestProcessor { this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext); } remotingChannel.reply(data); - if (isNeedPush) { + if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) { this.snodeController.getPushService().pushMessage(topic, queueId, message, data); } } else { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/FlowControlService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java similarity index 76% rename from snode/src/main/java/org/apache/rocketmq/snode/service/FlowControlService.java rename to snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java index f6186358ae8ac586ba24c7891b06ed917f8183b8..b19a474ff645f3434be8bf7d951c70cf99c59342 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/FlowControlService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java @@ -1,4 +1,4 @@ -/* +package org.apache.rocketmq.snode.service;/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,11 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.rocketmq.snode.service; -import com.google.common.util.concurrent.RateLimiter; -import com.google.common.util.concurrent.SmoothRateLimiter; - -public class FlowControlService { - RateLimiter rateLimiter = SmoothRateLimiter.create() +public interface AdminService { } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java index d6e307f5d1cebf0223ecded22710350a4ec239aa..12e287ea7b5fd668b2a48088f2928784a8bb845b 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java @@ -44,7 +44,7 @@ public interface EnodeService { boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig); - void persistOffsetToEnode(String enodeName, String groupName, String topic, int queueId, long offset); + void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset); RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic, int queueId) throws InterruptedException, RemotingTimeoutException, diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/MetricsService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/MetricsService.java index 1b0a406beafdac2eda4632b3946d908685cb697c..22f5fe2c563a69f56efe708665594ca17517130e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/MetricsService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/MetricsService.java @@ -1,4 +1,4 @@ -package org.apache.rocketmq.snode.service;/* +/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -14,6 +14,7 @@ package org.apache.rocketmq.snode.service;/* * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.rocketmq.snode.service; public interface MetricsService { diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java index b4da23840ad46263c706a6af9e1caac7c67961ff..1c358b78573a77f8dd5a62009dc18d721e273584 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/NnodeService.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.rocketmq.snode.service; + import java.util.Set; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.protocol.body.ClusterInfo; diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java b/snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java index f332f8114ea41b1a86c488beb1ede7d3096b9015..905e860fc6a71fb5cbd672cab725dc4e34d69ad9 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/ScheduledService.java @@ -15,7 +15,9 @@ * limitations under the License. */ package org.apache.rocketmq.snode.service; + public interface ScheduledService { void startScheduleTask(); + void shutdown(); } diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java index 55866a2b708ed65b9dc861dd0c86bd7441f53961..abbddd71fadac3f1d68040169b854674a4f1bb2e 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java @@ -205,7 +205,7 @@ public class EnodeServiceImpl implements EnodeService { } @Override - public void persistOffsetToEnode(String enodeName, String groupName, String topic, int queueId, long offset) { + public void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset) { try { String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); diff --git a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java index 286abc88ab6fc7452b12b6bb04ed9314b1af9c8f..e1c2017021ca14376256182260085a32fa4d528d 100644 --- a/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java +++ b/snode/src/main/java/org/apache/rocketmq/snode/service/impl/NnodeServiceImpl.java @@ -15,6 +15,7 @@ * limitations under the License. */ package org.apache.rocketmq.snode.service.impl; + import java.util.ArrayList; import java.util.List; import java.util.Set; diff --git a/snode/src/main/resources/META-INF/service/flowControl.yml b/snode/src/main/resources/META-INF/service/flowControl.yml new file mode 100644 index 0000000000000000000000000000000000000000..f76a906ebd313a92072171f6261f14cffa88a29e --- /dev/null +++ b/snode/src/main/resources/META-INF/service/flowControl.yml @@ -0,0 +1,19 @@ +snode: + countLimit: # flow control type, only requestCount & requestSize support + - flowControlResourceName: 310 + flowControlGrade: directDeny + flowControlBehavior: flowControlBehavior + flowControlResourceCount: 100000.00 #QPS + + - flowControlResourceName: overall + flowControlGrade: directDeny + flowControlBehavior: flowControlBehavior + flowControlResourceCount: 100000.00 #QPS + + sizeLimit: + - flowControlResourceName: 310 + flowControlGrade: directDeny + flowControlBehavior: flowControlBehavior + flowControlResourceCount: 10.00 #MB/S + + topicLimit: \ No newline at end of file diff --git a/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor new file mode 100644 index 0000000000000000000000000000000000000000..5b1dd52534c583677a888e951c803762f6ae2bb2 --- /dev/null +++ b/snode/src/main/resources/META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor @@ -0,0 +1 @@ +org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl \ No newline at end of file