提交 7f754f10 编写于 作者: D duhenglucky

Add flow control logic

上级 149fc42b
...@@ -999,7 +999,7 @@ public class BrokerController { ...@@ -999,7 +999,7 @@ public class BrokerController {
} }
public void registerServerRPCHook(RPCHook rpcHook) { public void registerServerRPCHook(RPCHook rpcHook) {
getRemotingServer().registerRPCHook(rpcHook); // getRemotingServer().registerRPCHook(rpcHook);
} }
public RemotingServer getRemotingServer() { public RemotingServer getRemotingServer() {
......
...@@ -72,7 +72,7 @@ public class BrokerOuterAPI { ...@@ -72,7 +72,7 @@ public class BrokerOuterAPI {
public BrokerOuterAPI(final ClientConfig nettyClientConfig, RPCHook rpcHook) { public BrokerOuterAPI(final ClientConfig nettyClientConfig, RPCHook rpcHook) {
this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(nettyClientConfig, null); this.remotingClient = RemotingClientFactory.getInstance().createRemotingClient().init(nettyClientConfig, null);
this.remotingClient.registerRPCHook(rpcHook); // this.remotingClient.registerRPCHook(rpcHook);
} }
public void start() { public void start() {
...@@ -193,6 +193,9 @@ public class BrokerOuterAPI { ...@@ -193,6 +193,9 @@ public class BrokerOuterAPI {
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
assert response != null; assert response != null;
if (response == null){
System.out.println("ssssssssssssss");
}
switch (response.getCode()) { switch (response.getCode()) {
case ResponseCode.SUCCESS: { case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader = RegisterBrokerResponseHeader responseHeader =
...@@ -393,6 +396,6 @@ public class BrokerOuterAPI { ...@@ -393,6 +396,6 @@ public class BrokerOuterAPI {
} }
public void registerRPCHook(RPCHook rpcHook) { public void registerRPCHook(RPCHook rpcHook) {
remotingClient.registerRPCHook(rpcHook); // remotingClient.registerRPCHook(rpcHook);
} }
} }
...@@ -145,6 +145,8 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; ...@@ -145,6 +145,8 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.netty.ResponseFuture; import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.serialize.LanguageCode; import org.apache.rocketmq.remoting.serialize.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -169,13 +171,13 @@ public class MQClientAPIImpl { ...@@ -169,13 +171,13 @@ public class MQClientAPIImpl {
public MQClientAPIImpl(final ClientConfig nettyClientConfig, public MQClientAPIImpl(final ClientConfig nettyClientConfig,
final ClientRemotingProcessor clientRemotingProcessor, final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final org.apache.rocketmq.client.ClientConfig clientConfig) { InterceptorGroup interceptorGroup, final org.apache.rocketmq.client.ClientConfig clientConfig) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName()); topAddressing = new TopAddressing(MixAll.getWSAddr(), clientConfig.getUnitName());
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null); this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor; this.clientRemotingProcessor = clientRemotingProcessor;
this.remotingClient.registerRPCHook(rpcHook); this.remotingClient.registerInterceptorGroup(interceptorGroup);
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null); this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
...@@ -556,7 +558,7 @@ public class MQClientAPIImpl { ...@@ -556,7 +558,7 @@ public class MQClientAPIImpl {
} }
public PullResult pullMessage( public PullResult pullMessage(
String addr, String addr,
final PullMessageRequestHeader requestHeader, final PullMessageRequestHeader requestHeader,
final long timeoutMillis, final long timeoutMillis,
final CommunicationMode communicationMode, final CommunicationMode communicationMode,
......
...@@ -50,7 +50,7 @@ public class MQClientManager { ...@@ -50,7 +50,7 @@ public class MQClientManager {
if (null == instance) { if (null == instance) {
instance = instance =
new MQClientInstance(clientConfig.cloneClientConfig(), new MQClientInstance(clientConfig.cloneClientConfig(),
this.factoryIndexGenerator.getAndIncrement(), clientId, rpcHook); this.factoryIndexGenerator.getAndIncrement(), clientId, null);
MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance); MQClientInstance prev = this.factoryTable.putIfAbsent(clientId, instance);
if (prev != null) { if (prev != null) {
instance = prev; instance = prev;
......
...@@ -79,6 +79,8 @@ import org.apache.rocketmq.remoting.RPCHook; ...@@ -79,6 +79,8 @@ import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class MQClientInstance { public class MQClientInstance {
...@@ -121,14 +123,15 @@ public class MQClientInstance { ...@@ -121,14 +123,15 @@ public class MQClientInstance {
this(clientConfig, instanceIndex, clientId, null); this(clientConfig, instanceIndex, clientId, null);
} }
public MQClientInstance(org.apache.rocketmq.client.ClientConfig clientConfig, int instanceIndex, String clientId, RPCHook rpcHook) { public MQClientInstance(org.apache.rocketmq.client.ClientConfig clientConfig, int instanceIndex, String clientId,
InterceptorGroup interceptorGroup) {
this.clientConfig = clientConfig; this.clientConfig = clientConfig;
this.instanceIndex = instanceIndex; this.instanceIndex = instanceIndex;
this.nettyClientConfig = new ClientConfig(); this.nettyClientConfig = new ClientConfig();
this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads()); this.nettyClientConfig.setClientCallbackExecutorThreads(clientConfig.getClientCallbackExecutorThreads());
this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS()); this.nettyClientConfig.setUseTLS(clientConfig.isUseTLS());
this.clientRemotingProcessor = new ClientRemotingProcessor(this); this.clientRemotingProcessor = new ClientRemotingProcessor(this);
this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, rpcHook, clientConfig); this.mQClientAPIImpl = new MQClientAPIImpl(this.nettyClientConfig, this.clientRemotingProcessor, interceptorGroup, clientConfig);
if (this.clientConfig.getNamesrvAddr() != null) { if (this.clientConfig.getNamesrvAddr() != null) {
this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr()); this.mQClientAPIImpl.updateNameServerAddressList(this.clientConfig.getNamesrvAddr());
......
...@@ -37,5 +37,9 @@ ...@@ -37,5 +37,9 @@
<groupId>${project.groupId}</groupId> <groupId>${project.groupId}</groupId>
<artifactId>rocketmq-remoting</artifactId> <artifactId>rocketmq-remoting</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -16,9 +16,12 @@ ...@@ -16,9 +16,12 @@
*/ */
package org.apache.rocketmq.common; package org.apache.rocketmq.common;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.util.TypeUtils;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.File; import java.io.File;
import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean; import java.lang.management.RuntimeMXBean;
...@@ -28,19 +31,21 @@ import java.net.NetworkInterface; ...@@ -28,19 +31,21 @@ import java.net.NetworkInterface;
import java.text.NumberFormat; import java.text.NumberFormat;
import java.text.ParseException; import java.text.ParseException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar; import java.util.Calendar;
import java.util.Date; import java.util.Date;
import java.util.Enumeration; import java.util.Enumeration;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import java.util.zip.DeflaterOutputStream; import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream; import java.util.zip.InflaterInputStream;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.yaml.snakeyaml.Yaml;
public class UtilAll { public class UtilAll {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME); private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
...@@ -517,4 +522,23 @@ public class UtilAll { ...@@ -517,4 +522,23 @@ public class UtilAll {
file.delete(); file.delete();
} }
} }
public static <T> T getYamlDataObject(String path, Class<T> clazz) {
Yaml ymal = new Yaml();
FileInputStream fis = null;
try {
fis = new FileInputStream(new File(path));
return ymal.loadAs(fis, clazz);
} catch (Exception e) {
throw new RuntimeException(String.format("The file for Plain mode was not found , paths %s", path), e);
} finally {
if (fis != null) {
try {
fis.close();
} catch (IOException e) {
throw new RuntimeException("close transport fileInputStream Exception", e);
}
}
}
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.flowcontrol;
import com.alibaba.csp.sentinel.SphO;
import com.alibaba.csp.sentinel.slots.block.RuleConstant;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRule;
import com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
public abstract class AbstractFlowControlService implements Interceptor {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private final ThreadLocal<Boolean> acquiredThreadLocal = new ThreadLocal<Boolean>();
private final FlowControlConfig flowControlConfig;
public AbstractFlowControlService() {
this.flowControlConfig = new FlowControlConfig();
loadRules(this.flowControlConfig);
}
public abstract String getResourceKey(RequestContext requestContext);
public abstract int getResourceCount(RequestContext requestContext);
public abstract String getFlowControlType();
public abstract void rejectRequest(RequestContext requestContext);
@Override
public void beforeRequest(RequestContext requestContext) {
String resourceKey = getResourceKey(requestContext);
int resourceCount = getResourceCount(requestContext);
resourceCount = resourceCount == 0 ? 1 : resourceCount;
if (resourceKey != null) {
boolean acquired = SphO.entry(resourceKey, resourceCount);
if (acquired) {
this.acquiredThreadLocal.set(true);
} else {
rejectRequest(requestContext);
}
}
}
@Override
public void afterRequest(ResponseContext responseContext) {
Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired == true) {
SphO.exit();
}
}
@Override
public void onException(ExceptionContext exceptionContext) {
Boolean acquired = this.acquiredThreadLocal.get();
if (acquired != null && acquired == true) {
SphO.exit();
}
}
public List<FlowControlRule> getRules(String moduleName, String flowControlType) {
if (this.flowControlConfig != null) {
Map<String, Map<String, List<FlowControlRule>>> rules = this.flowControlConfig.getPlainFlowControlRules();
Map<String, List<FlowControlRule>> flowControlMap = rules.get(moduleName);
if (flowControlMap != null) {
if (flowControlMap.get(flowControlType) != null) {
return flowControlMap.get(flowControlType);
} else {
log.warn("Get flow control config null by flowControlType: {} ", flowControlType);
}
} else {
log.warn("Get flow control config null by moduleName: {} ", moduleName);
}
} else {
log.warn("flowControlConfig is null ");
}
return null;
}
private void loadRules(FlowControlConfig flowControlConfig) {
Map<String, Map<String, List<FlowControlRule>>> rules = flowControlConfig.getPlainFlowControlRules();
for (Map<String, List<FlowControlRule>> flowControlTypeMap : rules.values()) {
for (List<FlowControlRule> list : flowControlTypeMap.values()) {
for (FlowControlRule flowControlRule : list) {
List<FlowRule> sentinelRules = new ArrayList<FlowRule>();
FlowRule rule1 = new FlowRule();
rule1.setResource(flowControlRule.getFlowControlResourceName());
rule1.setCount(flowControlRule.getFlowControlResourceCount());
rule1.setGrade(flowControlRule.getFlowControlGrade());
rule1.setLimitApp("default");
sentinelRules.add(rule1);
FlowRuleManager.loadRules(sentinelRules);
}
}
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.flowcontrol;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class FlowControlConfig {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private String flowControlFileHome = System.getProperty(MixAll.ROCKETMQ_HOME_PROPERTY, System.getenv(MixAll.ROCKETMQ_HOME_ENV));
private static final String DEFAULT_FLOW_CONTROL_FILE = "conf/flowControl.yml";
private String flowControlFileName = System.getProperty("rocketmq.flow.control.file", DEFAULT_FLOW_CONTROL_FILE);
private List<FlowControlRule> rules;
public static final String defaultResourceName = "overallFlowControl";
private Map<String/*server name*/, Map<String/*flowControlType*/, List<FlowControlRule>>> plainFlowControlRules;
public FlowControlConfig() {
loadFlowControlConfig();
}
public void loadFlowControlConfig() {
JSONObject jsonObject = UtilAll.getYamlDataObject(flowControlFileHome + File.separator + flowControlFileName,
JSONObject.class);
if (jsonObject != null && jsonObject.size() > 0) {
plainFlowControlRules = new HashMap<String/*server name*/, Map<String /*flowControlType*/, List<FlowControlRule>>>();
Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
for (Map.Entry<String, Object> entry : entries) {
String serverName = entry.getKey();
Map<String /*flowControlType*/, List<FlowControlRule>> flowControlTypeMap = plainFlowControlRules.get(serverName);
if (flowControlTypeMap == null) {
flowControlTypeMap = new HashMap<String, List<FlowControlRule>>();
plainFlowControlRules.put(serverName, flowControlTypeMap);
}
LinkedHashMap flowControlMap = (LinkedHashMap) entry.getValue();
Iterator iter = flowControlMap.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry tmp = (Map.Entry) iter.next();
String flowControlType = (String) tmp.getKey();
List jsonList = (ArrayList) tmp.getValue();
if (jsonList != null) {
for (Object json : jsonList) {
Map map = (LinkedHashMap) json;
FlowControlRule flowControlRule = JSON.parseObject(JSON.toJSONString(map), FlowControlRule.class);
List<FlowControlRule> flowControlRules = flowControlTypeMap.get(flowControlType);
if (flowControlRules == null) {
flowControlRules = new ArrayList<FlowControlRule>();
flowControlTypeMap.put(flowControlType, flowControlRules);
}
flowControlRules.add(flowControlRule);
}
}
}
}
}
log.warn("Load topic config: {}", this.plainFlowControlRules);
}
public Map<String, Map<String, List<FlowControlRule>>> getPlainFlowControlRules() {
return plainFlowControlRules;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.common.flowcontrol;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
public class FlowControlRule {
private String flowControlResourceName;
private Integer flowControlGrade;
private Integer flowControlBehavior;
private double flowControlResourceCount;
public String getFlowControlResourceName() {
return flowControlResourceName;
}
public void setFlowControlResourceName(String flowControlResourceName) {
this.flowControlResourceName = flowControlResourceName;
}
public Integer getFlowControlGrade() {
return flowControlGrade;
}
public void setFlowControlGrade(Integer flowControlGrade) {
this.flowControlGrade = flowControlGrade;
}
public Integer getFlowControlBehavior() {
return flowControlBehavior;
}
public void setFlowControlBehavior(Integer flowControlBehavior) {
this.flowControlBehavior = flowControlBehavior;
}
public double getFlowControlResourceCount() {
return flowControlResourceCount;
}
public void setFlowControlResourceCount(double flowControlResourceCount) {
this.flowControlResourceCount = flowControlResourceCount;
}
@Override
public String toString() {
return "FlowControlRule{" +
"flowControlResourceName='" + flowControlResourceName + '\'' +
", flowControlGrade=" + flowControlGrade +
", flowControlBehavior=" + flowControlBehavior +
", flowControlResourceCount=" + flowControlResourceCount +
'}';
}
}
...@@ -75,7 +75,6 @@ public class Consumer { ...@@ -75,7 +75,6 @@ public class Consumer {
* Launch the consumer instance. * Launch the consumer instance.
*/ */
consumer.start(); consumer.start();
System.out.printf("Consumer Started.%n"); System.out.printf("Consumer Started.%n");
} }
} }
...@@ -598,6 +598,16 @@ ...@@ -598,6 +598,16 @@
<artifactId>log4j-slf4j-impl</artifactId> <artifactId>log4j-slf4j-impl</artifactId>
<version>2.7</version> <version>2.7</version>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.4.1</version>
</dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.19</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
</project> </project>
...@@ -28,8 +28,8 @@ ...@@ -28,8 +28,8 @@
<name>rocketmq-remoting ${project.version}</name> <name>rocketmq-remoting ${project.version}</name>
<properties> <properties>
<maven.compiler.source>1.6</maven.compiler.source> <maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.6</maven.compiler.target> <maven.compiler.target>1.8</maven.compiler.target>
</properties> </properties>
<dependencies> <dependencies>
...@@ -54,5 +54,13 @@ ...@@ -54,5 +54,13 @@
<artifactId>netty-tcnative-boringssl-static</artifactId> <artifactId>netty-tcnative-boringssl-static</artifactId>
<version>1.1.33.Fork26</version> <version>1.1.33.Fork26</version>
</dependency> </dependency>
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -21,6 +21,7 @@ import org.apache.rocketmq.remoting.common.Pair; ...@@ -21,6 +21,7 @@ import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public interface RemotingServer extends RemotingService { public interface RemotingServer extends RemotingService {
......
...@@ -17,10 +17,12 @@ ...@@ -17,10 +17,12 @@
package org.apache.rocketmq.remoting; package org.apache.rocketmq.remoting;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
public interface RemotingService { public interface RemotingService {
void start(); void start();
void shutdown(); void shutdown();
void registerRPCHook(RPCHook rpcHook); void registerInterceptorGroup(InterceptorGroup interceptorGroup);
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.exception;
public class RemotingRuntimeException extends RuntimeException {
private final int responseCode;
private final String responseMessage;
public RemotingRuntimeException(int responseCode, String errorMessage) {
this.responseCode = responseCode;
this.responseMessage = errorMessage;
}
public int getResponseCode() {
return responseCode;
}
public String getResponseMessage() {
return responseMessage;
}
}
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.remoting.interceptor; package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
......
...@@ -15,7 +15,11 @@ ...@@ -15,7 +15,11 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.remoting.interceptor; package org.apache.rocketmq.remoting.interceptor;
public interface Interceptor { public interface Interceptor {
String interceptorName();
void beforeRequest(RequestContext requestContext); void beforeRequest(RequestContext requestContext);
void afterRequest(ResponseContext responseContext); void afterRequest(ResponseContext responseContext);
......
...@@ -15,14 +15,15 @@ ...@@ -15,14 +15,15 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.remoting.interceptor; package org.apache.rocketmq.remoting.interceptor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
public class InterceptorGroup { public class InterceptorGroup {
private List<Interceptor> interceptors = new ArrayList<Interceptor>(); private List<Interceptor> interceptors = new ArrayList<>();
public void registerInterceptor(Interceptor interceptor) { public synchronized void registerInterceptor(Interceptor interceptor) {
if (interceptor != null) { if (interceptor != null && !interceptors.contains(interceptor)) {
interceptors.add(interceptor); interceptors.add(interceptor);
} }
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
public class InterceptorInvoker {
public static void invokeBeforeRequest(InterceptorGroup interceptorGroup, RemotingChannel remotingChannel,
RemotingCommand request) {
if (interceptorGroup != null) {
RequestContext requestContext = new RequestContext(request, remotingChannel);
interceptorGroup.beforeRequest(requestContext);
}
}
public static void invokeAfterRequest(InterceptorGroup interceptorGroup, RemotingChannel remotingChannel,
RemotingCommand request, RemotingCommand response) {
if (interceptorGroup != null) {
ResponseContext responseContext = new ResponseContext(request, remotingChannel, response);
interceptorGroup.afterRequest(responseContext);
}
}
public static void invokeOnException(InterceptorGroup interceptorGroup, RemotingChannel remotingChannel,
RemotingCommand request, Throwable throwable, String remark) {
if (interceptorGroup != null) {
ExceptionContext exceptionContext = new ExceptionContext(request, remotingChannel, throwable, remark);
interceptorGroup.onException(exceptionContext);
}
}
}
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.remoting.interceptor; package org.apache.rocketmq.remoting.interceptor;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
...@@ -23,8 +24,8 @@ public class RequestContext { ...@@ -23,8 +24,8 @@ public class RequestContext {
protected RemotingChannel remotingChannel; protected RemotingChannel remotingChannel;
public RequestContext(RemotingCommand request, RemotingChannel remotingChannel) { public RequestContext(RemotingCommand request, RemotingChannel remotingChannel) {
this.remotingChannel = remotingChannel;
this.request = request; this.request = request;
this.remotingChannel = remotingChannel;
} }
public RemotingCommand getRequest() { public RemotingCommand getRequest() {
......
...@@ -37,20 +37,22 @@ import java.util.concurrent.RejectedExecutionException; ...@@ -37,20 +37,22 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce; import org.apache.rocketmq.remoting.common.SemaphoreReleaseOnlyOnce;
import org.apache.rocketmq.remoting.common.ServiceThread; import org.apache.rocketmq.remoting.common.ServiceThread;
import org.apache.rocketmq.remoting.exception.RemotingRuntimeException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode; import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.apache.rocketmq.remoting.util.ThreadUtils; import org.apache.rocketmq.remoting.util.ThreadUtils;
...@@ -194,21 +196,15 @@ public abstract class NettyRemotingAbstract { ...@@ -194,21 +196,15 @@ public abstract class NettyRemotingAbstract {
final Pair<RequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode()); final Pair<RequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<RequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched; final Pair<RequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque(); final int opaque = cmd.getOpaque();
final InterceptorGroup interceptorGroup = NettyRemotingAbstract.this.getInterceptorGroup();
if (pair != null) { if (pair != null) {
Runnable run = new Runnable() { Runnable run = new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook(); InterceptorInvoker.invokeBeforeRequest(interceptorGroup, remotingChannel, cmd);
if (rpcHook != null) {
rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
}
final RemotingCommand response = pair.getObject1().processRequest(remotingChannel, cmd); final RemotingCommand response = pair.getObject1().processRequest(remotingChannel, cmd);
if (rpcHook != null) { InterceptorInvoker.invokeAfterRequest(interceptorGroup, remotingChannel, cmd, response);
rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
}
if (!cmd.isOnewayRPC()) { if (!cmd.isOnewayRPC()) {
if (response != null) { if (response != null) {
...@@ -221,17 +217,22 @@ public abstract class NettyRemotingAbstract { ...@@ -221,17 +217,22 @@ public abstract class NettyRemotingAbstract {
log.error(cmd.toString()); log.error(cmd.toString());
log.error(response.toString()); log.error(response.toString());
} }
} else {
} }
} }
} catch (Throwable e) { } catch (Throwable throwable) {
log.error("process request exception", e); log.error("Process request exception", throwable);
log.error(cmd.toString()); log.error(cmd.toString());
InterceptorInvoker.invokeOnException(interceptorGroup, remotingChannel, cmd, throwable, null);
int responseCode = RemotingSysResponseCode.SYSTEM_ERROR;
String responseMessage = RemotingHelper.exceptionSimpleDesc(throwable);
if (!cmd.isOnewayRPC()) { if (!cmd.isOnewayRPC()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR, if (throwable instanceof RemotingRuntimeException) {
RemotingHelper.exceptionSimpleDesc(e)); RemotingRuntimeException remotingRuntimeException = (RemotingRuntimeException) throwable;
responseCode = remotingRuntimeException.getResponseCode();
responseMessage = remotingRuntimeException.getResponseMessage();
}
final RemotingCommand response = RemotingCommand.createResponseCommand(responseCode,
responseMessage);
response.setOpaque(opaque); response.setOpaque(opaque);
ctx.writeAndFlush(response); ctx.writeAndFlush(response);
} }
...@@ -340,11 +341,11 @@ public abstract class NettyRemotingAbstract { ...@@ -340,11 +341,11 @@ public abstract class NettyRemotingAbstract {
} }
/** /**
* Custom RPC hook. * Custom interceptor hook.
* *
* @return RPC hook if specified; null otherwise. * @return RPC hook if specified; null otherwise.
*/ */
public abstract RPCHook getRPCHook(); public abstract InterceptorGroup getInterceptorGroup();
/** /**
* This method specifies thread pool to use while invoking callback methods. * This method specifies thread pool to use while invoking callback methods.
...@@ -407,6 +408,26 @@ public abstract class NettyRemotingAbstract { ...@@ -407,6 +408,26 @@ public abstract class NettyRemotingAbstract {
} }
} }
public RemotingCommand invokeSyncWithInterceptor(final RemotingChannel remotingChannel,
final RemotingCommand request,
final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
InterceptorGroup interceptorGroup = getInterceptorGroup();
InterceptorInvoker.invokeBeforeRequest(interceptorGroup, remotingChannel, request);
Channel channel = null;
if (remotingChannel instanceof NettyChannelImpl) {
channel = ((NettyChannelImpl) remotingChannel).getChannel();
}
try {
RemotingCommand response = invokeSyncImpl(channel, request, timeoutMillis);
InterceptorInvoker.invokeAfterRequest(interceptorGroup, remotingChannel, request, response);
return response;
} catch (InterruptedException | RemotingSendRequestException | RemotingTimeoutException ex) {
InterceptorInvoker.invokeOnException(interceptorGroup, remotingChannel, request, ex, null);
log.error("Sync invoke error ", ex);
throw ex;
}
}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis) final long timeoutMillis)
throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {
...@@ -429,11 +450,12 @@ public abstract class NettyRemotingAbstract { ...@@ -429,11 +450,12 @@ public abstract class NettyRemotingAbstract {
responseTable.remove(opaque); responseTable.remove(opaque);
responseFuture.setCause(f.cause()); responseFuture.setCause(f.cause());
responseFuture.putResponse(null); responseFuture.putResponse(null);
log.warn("send a request command to channel <" + addr + "> failed."); log.warn("Send a request command to channel <" + addr + "> failed.");
} }
}); });
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
if (null == responseCommand) { if (null == responseCommand) {
if (responseFuture.isSendRequestOK()) { if (responseFuture.isSendRequestOK()) {
throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,
...@@ -449,7 +471,26 @@ public abstract class NettyRemotingAbstract { ...@@ -449,7 +471,26 @@ public abstract class NettyRemotingAbstract {
} }
} }
abstract protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException; abstract protected RemotingChannel getAndCreateChannel(final String addr, long timeout) throws InterruptedException;
public void invokeAsyncWithInterceptor(
final RemotingChannel remotingChannel,
final RemotingCommand request,
final long timeoutMillis,
final InvokeCallback invokeCallback) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
InterceptorGroup interceptorGroup = this.getInterceptorGroup();
InterceptorInvoker.invokeBeforeRequest(interceptorGroup, remotingChannel, request);
Channel channel = null;
if (remotingChannel instanceof NettyChannelImpl) {
channel = ((NettyChannelImpl) remotingChannel).getChannel();
}
try {
invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
} catch (InterruptedException | RemotingTooMuchRequestException | RemotingTimeoutException | RemotingSendRequestException ex) {
InterceptorInvoker.invokeOnException(interceptorGroup, remotingChannel, request, ex, null);
throw ex;
}
}
public void invokeAsyncImpl(final Channel channel, final RemotingCommand request, public void invokeAsyncImpl(final Channel channel, final RemotingCommand request,
final long timeoutMillis, final long timeoutMillis,
...@@ -488,7 +529,10 @@ public abstract class NettyRemotingAbstract { ...@@ -488,7 +529,10 @@ public abstract class NettyRemotingAbstract {
final String remotingAddr = RemotingHelper.parseChannelRemoteAddr(channel); final String remotingAddr = RemotingHelper.parseChannelRemoteAddr(channel);
try { try {
if (channel == null) { if (channel == null) {
channel = getAndCreateChannel(addr, timeoutMillis); RemotingChannel remotingChannel = getAndCreateChannel(addr, timeoutMillis);
if (remotingChannel != null && remotingChannel instanceof NettyChannelImpl) {
channel = ((NettyChannelImpl) remotingChannel).getChannel();
}
responseFuture.setProcessChannel(channel); responseFuture.setProcessChannel(channel);
} }
channel.writeAndFlush(request).addListener(new ChannelFutureListener() { channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...@@ -558,6 +602,26 @@ public abstract class NettyRemotingAbstract { ...@@ -558,6 +602,26 @@ public abstract class NettyRemotingAbstract {
} }
} }
public void invokeOnewayWithInterceptor(final RemotingChannel remotingChannel, final RemotingCommand request,
final long timeoutMillis)
throws
InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
Channel channel = null;
InterceptorGroup interceptorGroup = this.getInterceptorGroup();
InterceptorInvoker.invokeBeforeRequest(interceptorGroup, remotingChannel, request);
if (remotingChannel instanceof NettyChannelImpl) {
channel = ((NettyChannelImpl) remotingChannel).getChannel();
}
try {
invokeOnewayImpl(channel, request, timeoutMillis);
} catch (InterruptedException | RemotingTooMuchRequestException | RemotingTimeoutException | RemotingSendRequestException ex) {
InterceptorInvoker.invokeOnException(interceptorGroup, remotingChannel, request, ex, null);
throw ex;
}
}
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis)
throws throws
InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
...@@ -650,6 +714,11 @@ public abstract class NettyRemotingAbstract { ...@@ -650,6 +714,11 @@ public abstract class NettyRemotingAbstract {
} }
} }
public void registerNettyProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
Pair<RequestProcessor, ExecutorService> pair = new Pair<>(processor, executor);
this.processorTable.put(requestCode, pair);
}
public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> { public class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override @Override
......
...@@ -38,8 +38,11 @@ import java.util.concurrent.locks.Lock; ...@@ -38,8 +38,11 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.common.RemotingUtil; import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.netty.NettyEvent; import org.apache.rocketmq.remoting.netty.NettyEvent;
import org.apache.rocketmq.remoting.netty.NettyEventType; import org.apache.rocketmq.remoting.netty.NettyEventType;
import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract; import org.apache.rocketmq.remoting.netty.NettyRemotingAbstract;
...@@ -72,11 +75,21 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract ...@@ -72,11 +75,21 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
return Math.abs(r.nextInt() % 999) % 999; return Math.abs(r.nextInt() % 999) % 999;
} }
public void closeRemotingChannel(final String addr, final RemotingChannel remotingChannel) {
Channel channel = null;
if (remotingChannel instanceof NettyChannelImpl) {
channel = ((NettyChannelImpl) remotingChannel).getChannel();
}
if (remotingChannel instanceof NettyChannelHandlerContextImpl) {
channel = ((NettyChannelHandlerContextImpl) remotingChannel).getChannelHandlerContext().channel();
}
closeChannel(addr, channel);
}
public void closeChannel(final String addr, final Channel channel) { public void closeChannel(final String addr, final Channel channel) {
if (null == channel) { if (null == channel) {
return; return;
} }
final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr; final String addrRemote = null == addr ? RemotingHelper.parseChannelRemoteAddr(channel) : addr;
try { try {
...@@ -122,7 +135,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract ...@@ -122,7 +135,7 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
this.channelTables.clear(); this.channelTables.clear();
} }
public void closeChannel(final Channel channel) { private void closeChannel(final Channel channel) {
if (null == channel) { if (null == channel) {
return; return;
} }
...@@ -169,17 +182,23 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract ...@@ -169,17 +182,23 @@ public abstract class NettyRemotingClientAbstract extends NettyRemotingAbstract
} }
@Override @Override
protected Channel getAndCreateChannel(final String addr, long timeout) throws InterruptedException { protected RemotingChannel getAndCreateChannel(final String addr, long timeout) throws InterruptedException {
Channel channel = null;
if (null == addr) { if (null == addr) {
return getAndCreateNameServerChannel(timeout); channel = getAndCreateNameServerChannel(timeout);
} else {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
channel = cw.getChannel();
} else {
channel = this.createChannel(addr, timeout);
}
} }
if (channel != null) {
ChannelWrapper cw = this.channelTables.get(addr); RemotingChannel remotingChannel = new NettyChannelImpl(channel);
if (cw != null && cw.isOK()) { return remotingChannel;
return cw.getChannel();
} }
return null;
return this.createChannel(addr, timeout);
} }
private Channel getAndCreateNameServerChannel(long timeout) throws InterruptedException { private Channel getAndCreateNameServerChannel(long timeout) throws InterruptedException {
......
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
*/ */
package org.apache.rocketmq.remoting.transport; package org.apache.rocketmq.remoting.transport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleState;
...@@ -108,7 +107,4 @@ public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract ...@@ -108,7 +107,4 @@ public abstract class NettyRemotingServerAbstract extends NettyRemotingAbstract
} }
} }
@Override protected Channel getAndCreateChannel(String addr, long timeout) throws InterruptedException {
return null;
}
} }
...@@ -43,6 +43,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -43,6 +43,7 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
...@@ -51,6 +52,10 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; ...@@ -51,6 +52,10 @@ import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder; import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder; import org.apache.rocketmq.remoting.transport.rocketmq.NettyEncoder;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
...@@ -67,7 +72,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -67,7 +72,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
private ExecutorService callbackExecutor; private ExecutorService callbackExecutor;
private ChannelEventListener channelEventListener; private ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup; private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook; private InterceptorGroup interceptorGroup;
public Http2ClientImpl(final ClientConfig clientConfig, public Http2ClientImpl(final ClientConfig clientConfig,
final ChannelEventListener channelEventListener) { final ChannelEventListener channelEventListener) {
...@@ -75,7 +80,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -75,7 +80,7 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
init(clientConfig, channelEventListener); init(clientConfig, channelEventListener);
} }
public Http2ClientImpl(){ public Http2ClientImpl() {
super(); super();
} }
...@@ -139,12 +144,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -139,12 +144,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
public void shutdown() { public void shutdown() {
super.shutdown(); super.shutdown();
try { try {
clearChannels();
for (ChannelWrapper cw : this.channelTables.values()) {
this.closeChannel(null, cw.getChannel());
}
this.channelTables.clear();
if (this.ioGroup != null) { if (this.ioGroup != null) {
this.ioGroup.shutdownGracefully(); this.ioGroup.shutdownGracefully();
} }
...@@ -166,8 +167,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -166,8 +167,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
} }
@Override @Override
public void registerRPCHook(RPCHook rpcHook) { public void registerInterceptorGroup(InterceptorGroup interceptorGroup) {
this.rpcHook = rpcHook; this.interceptorGroup = interceptorGroup;
} }
@Override @Override
...@@ -178,31 +179,25 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -178,31 +179,25 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
@Override @Override
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) { if (remotingChannel != null && remotingChannel.isActive()) {
try { try {
if (this.rpcHook != null) { RemotingCommand response = this.invokeSyncWithInterceptor(remotingChannel, request, timeoutMillis);
this.rpcHook.doBeforeRequest(addr, request);
}
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response; return response;
} catch (RemotingSendRequestException e) { } catch (RemotingSendRequestException e) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr); log.warn("InvokeSync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw e; throw e;
} catch (RemotingTimeoutException e) { } catch (RemotingTimeoutException e) {
if (nettyClientConfig.isClientCloseSocketIfTimeout()) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); log.warn("InvokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
} }
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); log.warn("InvokeSync: wait response timeout exception, the channel[{}]", addr);
throw e; throw e;
} }
} else { } else {
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw new RemotingConnectException(addr); throw new RemotingConnectException(addr);
} }
} }
...@@ -211,20 +206,19 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -211,20 +206,19 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback) public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException { RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) {
if (remotingChannel != null && remotingChannel.isActive()) {
try { try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request); this.invokeAsyncWithInterceptor(remotingChannel, request, timeoutMillis, invokeCallback);
}
this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);
} catch (RemotingSendRequestException e) { } catch (RemotingSendRequestException e) {
log.warn("invokeAsync: send request exception, so close the channel[{}]", addr); log.warn("invokeAsync: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw e; throw e;
} }
} else { } else {
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw new RemotingConnectException(addr); throw new RemotingConnectException(addr);
} }
} }
...@@ -232,20 +226,17 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -232,20 +226,17 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
@Override @Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) { if (remotingChannel != null && remotingChannel.isActive()) {
try { try {
if (this.rpcHook != null) { this.invokeOnewayWithInterceptor(remotingChannel, request, timeoutMillis);
this.rpcHook.doBeforeRequest(addr, request);
}
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) { } catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw e; throw e;
} }
} else { } else {
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw new RemotingConnectException(addr); throw new RemotingConnectException(addr);
} }
} }
...@@ -257,13 +248,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -257,13 +248,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
@Override @Override
public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor; executor = (executor == null ? this.publicExecutor : executor);
if (null == executor) { registerNettyProcessor(requestCode, processor, executor);
executorThis = this.publicExecutor;
}
Pair<RequestProcessor, ExecutorService> pair = new Pair<RequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
} }
@Override @Override
...@@ -277,8 +263,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo ...@@ -277,8 +263,8 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
} }
@Override @Override
public RPCHook getRPCHook() { public InterceptorGroup getInterceptorGroup() {
return this.rpcHook; return this.interceptorGroup;
} }
@Override @Override
......
...@@ -56,6 +56,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper; ...@@ -56,6 +56,8 @@ import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler; import org.apache.rocketmq.remoting.netty.ChannelStatisticsHandler;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder; import org.apache.rocketmq.remoting.transport.rocketmq.NettyDecoder;
...@@ -79,7 +81,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -79,7 +81,7 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
private ChannelEventListener channelEventListener; private ChannelEventListener channelEventListener;
private ExecutorService publicExecutor; private ExecutorService publicExecutor;
private int port; private int port;
private RPCHook rpcHook; private InterceptorGroup interceptorGroup;
public Http2ServerImpl(ServerConfig nettyServerConfig, ChannelEventListener channelEventListener) { public Http2ServerImpl(ServerConfig nettyServerConfig, ChannelEventListener channelEventListener) {
super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
...@@ -137,12 +139,8 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -137,12 +139,8 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
@Override @Override
public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor; executor = (executor == null ? this.publicExecutor : executor);
if (null == executor) { registerNettyProcessor(requestCode, processor, executor);
executorThis = this.publicExecutor;
}
Pair<RequestProcessor, ExecutorService> pair = new Pair<RequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
} }
@Override @Override
...@@ -242,13 +240,18 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo ...@@ -242,13 +240,18 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
} }
@Override @Override
public void registerRPCHook(RPCHook rpcHook) { public void registerInterceptorGroup(InterceptorGroup interceptorGroup) {
this.rpcHook = rpcHook; this.interceptorGroup = interceptorGroup;
}
@Override
public InterceptorGroup getInterceptorGroup() {
return this.interceptorGroup;
} }
@Override @Override
public RPCHook getRPCHook() { protected RemotingChannel getAndCreateChannel(String addr, long timeout) throws InterruptedException {
return this.rpcHook; return null;
} }
@Override @Override
......
...@@ -38,15 +38,24 @@ import org.apache.rocketmq.logging.InternalLoggerFactory; ...@@ -38,15 +38,24 @@ import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingClient; import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.common.Pair; import org.apache.rocketmq.remoting.common.Pair;
import org.apache.rocketmq.remoting.common.RemotingHelper; import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingConnectException; import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.ClientConfig; import org.apache.rocketmq.remoting.ClientConfig;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
import org.apache.rocketmq.remoting.interceptor.ExceptionContext;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.interceptor.InterceptorInvoker;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.interceptor.ResponseContext;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.netty.TlsHelper; import org.apache.rocketmq.remoting.netty.TlsHelper;
import org.apache.rocketmq.remoting.protocol.RemotingCommand; import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract; import org.apache.rocketmq.remoting.transport.NettyRemotingClientAbstract;
...@@ -67,7 +76,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -67,7 +76,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
private ExecutorService callbackExecutor; private ExecutorService callbackExecutor;
private ChannelEventListener channelEventListener; private ChannelEventListener channelEventListener;
private DefaultEventExecutorGroup defaultEventExecutorGroup; private DefaultEventExecutorGroup defaultEventExecutorGroup;
private RPCHook rpcHook; private InterceptorGroup interceptorGroup;
public NettyRemotingClient() { public NettyRemotingClient() {
super(); super();
...@@ -123,7 +132,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -123,7 +132,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize()) .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
.handler(new ChannelInitializer<SocketChannel>() { .handler(new ChannelInitializer<SocketChannel>() {
@Override @Override
public void initChannel(SocketChannel ch) throws Exception { public void initChannel(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) { if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) { if (null != sslContext) {
...@@ -165,8 +174,8 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -165,8 +174,8 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
} }
@Override @Override
public void registerRPCHook(RPCHook rpcHook) { public void registerInterceptorGroup(InterceptorGroup interceptorGroup) {
this.rpcHook = rpcHook; this.interceptorGroup = interceptorGroup;
} }
@Override @Override
...@@ -178,38 +187,35 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -178,38 +187,35 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
long beginStartTime = System.currentTimeMillis(); long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis);
if (remotingChannel != null && remotingChannel.isActive()) {
if (channel != null && channel.isActive()) {
try { try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) { if (timeoutMillis < costTime) {
throw new RemotingTimeoutException("invokeSync call timeout"); throw new RemotingTimeoutException("invokeSync call timeout");
} }
RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); RemotingCommand response = this.invokeSyncWithInterceptor(remotingChannel, request, timeoutMillis - costTime);
if (this.rpcHook != null) {
this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
}
return response; return response;
} catch (RemotingSendRequestException e) { } catch (RemotingException remotingException) {
log.warn("invokeSync: send request exception, so close the channel[{}]", addr); if (remotingException instanceof RemotingSendRequestException) {
this.closeChannel(addr, channel); log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
throw e; this.closeRemotingChannel(addr, remotingChannel);
} catch (RemotingTimeoutException e) { throw (RemotingSendRequestException) remotingException;
if (nettyClientConfig.isClientCloseSocketIfTimeout()) { }
this.closeChannel(addr, channel); if (remotingException instanceof RemotingTimeoutException) {
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); if (nettyClientConfig.isClientCloseSocketIfTimeout()) {
this.closeRemotingChannel(addr, remotingChannel);
log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);
}
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw (RemotingTimeoutException) remotingException;
} }
log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);
throw e;
} }
} else { } else {
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw new RemotingConnectException(addr); throw new RemotingConnectException(addr);
} }
return null;
} }
@Override @Override
...@@ -217,12 +223,14 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -217,12 +223,14 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException { RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis(); long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); final RemotingChannel remotingChannel = this.getAndCreateChannel(addr, timeoutMillis);
Channel channel = null;
if (remotingChannel instanceof NettyChannelImpl) {
channel = ((NettyChannelImpl) remotingChannel).getChannel();
}
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
try { try {
if (this.rpcHook != null) {
this.rpcHook.doBeforeRequest(addr, request);
}
long costTime = System.currentTimeMillis() - beginStartTime; long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) { if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call timeout"); throw new RemotingTooMuchRequestException("invokeAsync call timeout");
...@@ -242,20 +250,17 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -242,20 +250,17 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
@Override @Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr, timeoutMillis); final RemotingChannel remotingChannel = getAndCreateChannel(addr, timeoutMillis);
if (channel != null && channel.isActive()) { if (remotingChannel != null && remotingChannel.isActive()) {
try { try {
if (this.rpcHook != null) { this.invokeOnewayWithInterceptor(remotingChannel, request, timeoutMillis);
this.rpcHook.doBeforeRequest(addr, request);
}
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) { } catch (RemotingSendRequestException e) {
log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); log.warn("InvokeOneway: send request exception, so close the channel[{}]", addr);
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw e; throw e;
} }
} else { } else {
this.closeChannel(addr, channel); this.closeRemotingChannel(addr, remotingChannel);
throw new RemotingConnectException(addr); throw new RemotingConnectException(addr);
} }
} }
...@@ -267,7 +272,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -267,7 +272,7 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
executorThis = this.publicExecutor; executorThis = this.publicExecutor;
} }
Pair<RequestProcessor, ExecutorService> pair = new Pair<RequestProcessor, ExecutorService>(processor, executorThis); Pair<RequestProcessor, ExecutorService> pair = new Pair<>(processor, executorThis);
this.processorTable.put(requestCode, pair); this.processorTable.put(requestCode, pair);
} }
...@@ -282,8 +287,8 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements ...@@ -282,8 +287,8 @@ public class NettyRemotingClient extends NettyRemotingClientAbstract implements
} }
@Override @Override
public RPCHook getRPCHook() { public InterceptorGroup getInterceptorGroup() {
return this.rpcHook; return this.interceptorGroup;
} }
@Override @Override
......
...@@ -42,7 +42,6 @@ import org.apache.rocketmq.logging.InternalLogger; ...@@ -42,7 +42,6 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener; import org.apache.rocketmq.remoting.ChannelEventListener;
import org.apache.rocketmq.remoting.InvokeCallback; import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingChannel; import org.apache.rocketmq.remoting.RemotingChannel;
import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RequestProcessor; import org.apache.rocketmq.remoting.RequestProcessor;
...@@ -53,6 +52,8 @@ import org.apache.rocketmq.remoting.common.TlsMode; ...@@ -53,6 +52,8 @@ import org.apache.rocketmq.remoting.common.TlsMode;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException; import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.remoting.netty.FileRegionEncoder; import org.apache.rocketmq.remoting.netty.FileRegionEncoder;
import org.apache.rocketmq.remoting.netty.NettyChannelImpl; import org.apache.rocketmq.remoting.netty.NettyChannelImpl;
import org.apache.rocketmq.remoting.netty.TlsHelper; import org.apache.rocketmq.remoting.netty.TlsHelper;
...@@ -74,9 +75,8 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements ...@@ -74,9 +75,8 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
private DefaultEventExecutorGroup defaultEventExecutorGroup; private DefaultEventExecutorGroup defaultEventExecutorGroup;
private Class<? extends ServerSocketChannel> socketChannelClass; private Class<? extends ServerSocketChannel> socketChannelClass;
private RPCHook rpcHook;
private int port = 0; private int port = 0;
private InterceptorGroup interceptorGroup;
private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
private static final String TLS_HANDLER_NAME = "sslHandler"; private static final String TLS_HANDLER_NAME = "sslHandler";
...@@ -216,24 +216,19 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements ...@@ -216,24 +216,19 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
} }
@Override @Override
public void registerRPCHook(RPCHook rpcHook) { public void registerInterceptorGroup(InterceptorGroup interceptorGroup) {
this.rpcHook = rpcHook; this.interceptorGroup = interceptorGroup;
} }
@Override @Override
public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) { public void registerProcessor(int requestCode, RequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor; executor = (executor == null ? this.publicExecutor : executor);
if (null == executor) { registerNettyProcessor(requestCode, processor, executor);
executorThis = this.publicExecutor;
}
Pair<RequestProcessor, ExecutorService> pair = new Pair<RequestProcessor, ExecutorService>(processor, executorThis);
this.processorTable.put(requestCode, pair);
} }
@Override @Override
public void registerDefaultProcessor(RequestProcessor processor, ExecutorService executor) { public void registerDefaultProcessor(RequestProcessor processor, ExecutorService executor) {
this.defaultRequestProcessor = new Pair<RequestProcessor, ExecutorService>(processor, executor); this.defaultRequestProcessor = new Pair<>(processor, executor);
} }
@Override @Override
...@@ -273,8 +268,13 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements ...@@ -273,8 +268,13 @@ public class NettyRemotingServer extends NettyRemotingServerAbstract implements
} }
@Override @Override
public RPCHook getRPCHook() { public InterceptorGroup getInterceptorGroup() {
return this.rpcHook; return this.interceptorGroup;
}
@Override
protected RemotingChannel getAndCreateChannel(String addr, long timeout) throws InterruptedException {
return null;
} }
@Override @Override
......
...@@ -72,6 +72,10 @@ ...@@ -72,6 +72,10 @@
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
</dependency> </dependency>
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
</dependency>
</dependencies> </dependencies>
<build> <build>
......
...@@ -34,6 +34,9 @@ import org.apache.rocketmq.remoting.RemotingClientFactory; ...@@ -34,6 +34,9 @@ import org.apache.rocketmq.remoting.RemotingClientFactory;
import org.apache.rocketmq.remoting.RemotingServer; import org.apache.rocketmq.remoting.RemotingServer;
import org.apache.rocketmq.remoting.RemotingServerFactory; import org.apache.rocketmq.remoting.RemotingServerFactory;
import org.apache.rocketmq.remoting.ServerConfig; import org.apache.rocketmq.remoting.ServerConfig;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.snode.client.ClientHousekeepingService; import org.apache.rocketmq.snode.client.ClientHousekeepingService;
import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener; import org.apache.rocketmq.snode.client.ConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ConsumerManager; import org.apache.rocketmq.snode.client.ConsumerManager;
...@@ -41,9 +44,6 @@ import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener; ...@@ -41,9 +44,6 @@ import org.apache.rocketmq.snode.client.DefaultConsumerIdsChangeListener;
import org.apache.rocketmq.snode.client.ProducerManager; import org.apache.rocketmq.snode.client.ProducerManager;
import org.apache.rocketmq.snode.client.SubscriptionGroupManager; import org.apache.rocketmq.snode.client.SubscriptionGroupManager;
import org.apache.rocketmq.snode.config.SnodeConfig; import org.apache.rocketmq.snode.config.SnodeConfig;
import org.apache.rocketmq.remoting.interceptor.Interceptor;
import org.apache.rocketmq.remoting.interceptor.InterceptorFactory;
import org.apache.rocketmq.remoting.interceptor.InterceptorGroup;
import org.apache.rocketmq.snode.offset.ConsumerOffsetManager; import org.apache.rocketmq.snode.offset.ConsumerOffsetManager;
import org.apache.rocketmq.snode.processor.ConsumerManageProcessor; import org.apache.rocketmq.snode.processor.ConsumerManageProcessor;
import org.apache.rocketmq.snode.processor.HeartbeatProcessor; import org.apache.rocketmq.snode.processor.HeartbeatProcessor;
...@@ -82,7 +82,8 @@ public class SnodeController { ...@@ -82,7 +82,8 @@ public class SnodeController {
private ConsumerManageProcessor consumerManageProcessor; private ConsumerManageProcessor consumerManageProcessor;
private SendMessageProcessor sendMessageProcessor; private SendMessageProcessor sendMessageProcessor;
private PullMessageProcessor pullMessageProcessor; private PullMessageProcessor pullMessageProcessor;
private HeartbeatProcessor hearbeatProcessor; private HeartbeatProcessor heartbeatProcessor;
private InterceptorGroup remotingServerInterceptorGroup;
private InterceptorGroup consumeMessageInterceptorGroup; private InterceptorGroup consumeMessageInterceptorGroup;
private InterceptorGroup sendMessageInterceptorGroup; private InterceptorGroup sendMessageInterceptorGroup;
private PushService pushService; private PushService pushService;
...@@ -160,7 +161,7 @@ public class SnodeController { ...@@ -160,7 +161,7 @@ public class SnodeController {
this.consumerOffsetManager = new ConsumerOffsetManager(this); this.consumerOffsetManager = new ConsumerOffsetManager(this);
this.consumerManageProcessor = new ConsumerManageProcessor(this); this.consumerManageProcessor = new ConsumerManageProcessor(this);
this.sendMessageProcessor = new SendMessageProcessor(this); this.sendMessageProcessor = new SendMessageProcessor(this);
this.hearbeatProcessor = new HeartbeatProcessor(this); this.heartbeatProcessor = new HeartbeatProcessor(this);
this.pullMessageProcessor = new PullMessageProcessor(this); this.pullMessageProcessor = new PullMessageProcessor(this);
this.pushService = new PushServiceImpl(this); this.pushService = new PushServiceImpl(this);
...@@ -170,19 +171,41 @@ public class SnodeController { ...@@ -170,19 +171,41 @@ public class SnodeController {
return snodeConfig; return snodeConfig;
} }
// private void initFlowControlIntercepterGruop() {
// this.remotingServerInterceptorGroup = new InterceptorGroup();
// List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
// this.remotingServerInterceptorGroup.registerInterceptor(flowControlService);
// }
private void initRemotingServerInterceptorGroup() {
List<Interceptor> remotingServerInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getRemotingServerInterceptorPath());
if (remotingServerInterceptors != null && remotingServerInterceptors.size() > 0) {
if (this.remotingServerInterceptorGroup == null) {
this.remotingServerInterceptorGroup = new InterceptorGroup();
}
for (Interceptor interceptor : remotingServerInterceptors) {
this.remotingServerInterceptorGroup.registerInterceptor(interceptor);
log.warn("Remoting server interceptor: {} registered!", interceptor.interceptorName());
}
}
}
public boolean initialize() { public boolean initialize() {
this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService); this.snodeServer = RemotingServerFactory.getInstance().createRemotingServer().init(this.nettyServerConfig, this.clientHousekeepingService);
this.registerProcessor(); this.registerProcessor();
initInterceptorGroup(); initSnodeInterceptorGroup();
initRemotingServerInterceptorGroup();
this.snodeServer.registerInterceptorGroup(this.remotingServerInterceptorGroup);
return true; return true;
} }
private void initInterceptorGroup() { private void initSnodeInterceptorGroup() {
List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath()); List<Interceptor> consumeMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getConsumeMessageInterceptorPath());
if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) { if (consumeMessageInterceptors != null && consumeMessageInterceptors.size() > 0) {
this.consumeMessageInterceptorGroup = new InterceptorGroup(); this.consumeMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : consumeMessageInterceptors) { for (Interceptor interceptor : consumeMessageInterceptors) {
this.consumeMessageInterceptorGroup.registerInterceptor(interceptor); this.consumeMessageInterceptorGroup.registerInterceptor(interceptor);
log.warn("Consume message interceptor: {} registered!", interceptor.interceptorName());
} }
} }
List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath()); List<Interceptor> sendMessageInterceptors = InterceptorFactory.getInstance().loadInterceptors(this.snodeConfig.getSendMessageInterceptorPath());
...@@ -190,15 +213,17 @@ public class SnodeController { ...@@ -190,15 +213,17 @@ public class SnodeController {
this.sendMessageInterceptorGroup = new InterceptorGroup(); this.sendMessageInterceptorGroup = new InterceptorGroup();
for (Interceptor interceptor : sendMessageInterceptors) { for (Interceptor interceptor : sendMessageInterceptors) {
this.sendMessageInterceptorGroup.registerInterceptor(interceptor); this.sendMessageInterceptorGroup.registerInterceptor(interceptor);
log.warn("Send message interceptor: {} registered!", interceptor.interceptorName());
} }
} }
} }
public void registerProcessor() { public void registerProcessor() {
this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor); this.snodeServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendMessageProcessor, this.sendMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, hearbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.HEART_BEAT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, hearbeatProcessor, this.heartbeatExecutor); this.snodeServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, heartbeatProcessor, this.heartbeatExecutor);
this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor); this.snodeServer.registerProcessor(RequestCode.SNODE_PULL_MESSAGE, pullMessageProcessor, this.pullMessageExecutor);
this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor); this.snodeServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
...@@ -287,4 +312,13 @@ public class SnodeController { ...@@ -287,4 +312,13 @@ public class SnodeController {
public void setEnodeService(EnodeService enodeService) { public void setEnodeService(EnodeService enodeService) {
this.enodeService = enodeService; this.enodeService = enodeService;
} }
public InterceptorGroup getRemotingServerInterceptorGroup() {
return remotingServerInterceptorGroup;
}
public void setRemotingServerInterceptorGroup(
InterceptorGroup remotingServerInterceptorGroup) {
this.remotingServerInterceptorGroup = remotingServerInterceptorGroup;
}
} }
...@@ -71,9 +71,13 @@ public class SnodeConfig { ...@@ -71,9 +71,13 @@ public class SnodeConfig {
private final String consumeMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor"; private final String consumeMessageInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.ConsumeMessageInterceptor";
private final String remotingServerInterceptorPath = "META-INF/service/org.apache.rocketmq.snode.interceptor.RemotingServerInterceptor";
private int listenPort = 11911; private int listenPort = 11911;
private double snodeQPSLimit = 10000;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true")); private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "true"));
public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) { public void setSnodeHeartBeatInterval(long snodeHeartBeatInterval) {
...@@ -276,4 +280,16 @@ public class SnodeConfig { ...@@ -276,4 +280,16 @@ public class SnodeConfig {
public String getConsumeMessageInterceptorPath() { public String getConsumeMessageInterceptorPath() {
return consumeMessageInterceptorPath; return consumeMessageInterceptorPath;
} }
public String getRemotingServerInterceptorPath() {
return remotingServerInterceptorPath;
}
public double getSnodeQPSLimit() {
return snodeQPSLimit;
}
public void setSnodeQPSLimit(double snodeQPSLimit) {
this.snodeQPSLimit = snodeQPSLimit;
}
} }
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.snode.flowcontrol;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.common.flowcontrol.AbstractFlowControlService;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingRuntimeException;
import org.apache.rocketmq.remoting.interceptor.RequestContext;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
public class QPSFlowControlServiceImpl extends AbstractFlowControlService {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
private final AtomicLong logCount = new AtomicLong(0);
private final String flowControlType = "countLimit";
public QPSFlowControlServiceImpl() {
super();
}
@Override
public String getResourceKey(RequestContext requestContext) {
if (RequestCode.HEART_BEAT == requestContext.getRequest().getCode()) {
return null;
}
return requestContext.getRequest().getCode() + "";
}
@Override
public String getFlowControlType() {
return this.flowControlType;
}
@Override
public void rejectRequest(RequestContext requestContext) {
if (logCount.getAndIncrement() % 100 == 0) {
log.warn("[REJECT]exceed system flow control config QPS, start flow control for a while: requestContext: {} ", requestContext);
}
throw new RemotingRuntimeException(RemotingSysResponseCode.SYSTEM_BUSY, "[REJECT]exceed system flow control config QPS, start flow control for a while");
}
@Override
public String interceptorName() {
return "snodeQPSFlowControlInterceptor";
}
@Override
public int getResourceCount(RequestContext requestContext) {
return 1;
}
}
...@@ -232,7 +232,7 @@ public class ConsumerOffsetManager { ...@@ -232,7 +232,7 @@ public class ConsumerOffsetManager {
for (Entry<Integer, Long> queueEntry : map.entrySet()) { for (Entry<Integer, Long> queueEntry : map.entrySet()) {
Integer queueId = queueEntry.getKey(); Integer queueId = queueEntry.getKey();
Long offset = queueEntry.getValue(); Long offset = queueEntry.getValue();
this.snodeController.getEnodeService().persistOffsetToEnode(enodeName, consumerGroup, topic, queueId, offset); this.snodeController.getEnodeService().persistOffset(enodeName, consumerGroup, topic, queueId, offset);
} }
} else { } else {
log.error("Persist offset split keys error:{}", key); log.error("Persist offset split keys error:{}", key);
......
...@@ -19,6 +19,7 @@ package org.apache.rocketmq.snode.processor; ...@@ -19,6 +19,7 @@ package org.apache.rocketmq.snode.processor;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.rocketmq.common.constant.LoggerName; import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.protocol.RequestCode; import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader; import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2; import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLogger;
...@@ -74,7 +75,7 @@ public class SendMessageProcessor implements RequestProcessor { ...@@ -74,7 +75,7 @@ public class SendMessageProcessor implements RequestProcessor {
this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext); this.snodeController.getSendMessageInterceptorGroup().afterRequest(responseContext);
} }
remotingChannel.reply(data); remotingChannel.reply(data);
if (isNeedPush) { if (data.getCode() == ResponseCode.SUCCESS && isNeedPush) {
this.snodeController.getPushService().pushMessage(topic, queueId, message, data); this.snodeController.getPushService().pushMessage(topic, queueId, message, data);
} }
} else { } else {
......
/* package org.apache.rocketmq.snode.service;/*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
...@@ -14,11 +14,6 @@ ...@@ -14,11 +14,6 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service;
import com.google.common.util.concurrent.RateLimiter; public interface AdminService {
import com.google.common.util.concurrent.SmoothRateLimiter;
public class FlowControlService {
RateLimiter rateLimiter = SmoothRateLimiter.create()
} }
...@@ -44,7 +44,7 @@ public interface EnodeService { ...@@ -44,7 +44,7 @@ public interface EnodeService {
boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig); boolean persistSubscriptionGroupConfig(SubscriptionGroupConfig subscriptionGroupConfig);
void persistOffsetToEnode(String enodeName, String groupName, String topic, int queueId, long offset); void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset);
RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic, RemotingCommand loadOffset(String enodeName, String consumerGroup, String topic,
int queueId) throws InterruptedException, RemotingTimeoutException, int queueId) throws InterruptedException, RemotingTimeoutException,
......
package org.apache.rocketmq.snode.service;/* /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
...@@ -14,6 +14,7 @@ package org.apache.rocketmq.snode.service;/* ...@@ -14,6 +14,7 @@ package org.apache.rocketmq.snode.service;/*
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service;
public interface MetricsService { public interface MetricsService {
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service; package org.apache.rocketmq.snode.service;
import java.util.Set; import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.ClusterInfo; import org.apache.rocketmq.common.protocol.body.ClusterInfo;
......
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service; package org.apache.rocketmq.snode.service;
public interface ScheduledService { public interface ScheduledService {
void startScheduleTask(); void startScheduleTask();
void shutdown(); void shutdown();
} }
...@@ -205,7 +205,7 @@ public class EnodeServiceImpl implements EnodeService { ...@@ -205,7 +205,7 @@ public class EnodeServiceImpl implements EnodeService {
} }
@Override @Override
public void persistOffsetToEnode(String enodeName, String groupName, String topic, int queueId, long offset) { public void persistOffset(String enodeName, String groupName, String topic, int queueId, long offset) {
try { try {
String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false); String address = this.snodeController.getNnodeService().getAddressByEnodeName(enodeName, false);
UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader(); UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
* limitations under the License. * limitations under the License.
*/ */
package org.apache.rocketmq.snode.service.impl; package org.apache.rocketmq.snode.service.impl;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
......
snode:
countLimit: # flow control type, only requestCount & requestSize support
- flowControlResourceName: 310
flowControlGrade: directDeny
flowControlBehavior: flowControlBehavior
flowControlResourceCount: 100000.00 #QPS
- flowControlResourceName: overall
flowControlGrade: directDeny
flowControlBehavior: flowControlBehavior
flowControlResourceCount: 100000.00 #QPS
sizeLimit:
- flowControlResourceName: 310
flowControlGrade: directDeny
flowControlBehavior: flowControlBehavior
flowControlResourceCount: 10.00 #MB/S
topicLimit:
\ No newline at end of file
org.apache.rocketmq.snode.flowcontrol.QPSFlowControlServiceImpl
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册