提交 26d2f181 编写于 作者: Y YunaiV

TpsLimitFilter

上级 2d3885d3
......@@ -94,7 +94,7 @@ public class MockClusterInvoker<T> implements Invoker<T> {
@SuppressWarnings({"unchecked", "rawtypes"})
private Result doMockInvoke(Invocation invocation, RpcException e) {
Result result = null;
Result result;
Invoker<T> minvoker;
List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
......
......@@ -608,10 +608,19 @@ public class Constants {
public static final String RELIABLE_PROTOCOL = "napoli";
/**
* TPS 限制 - 大小配置项
*
* TpsLimitFilter
*/
public static final String TPS_LIMIT_RATE_KEY = "tps";
/**
* TPS 限制 - 周期配置项
*/
public static final String TPS_LIMIT_INTERVAL_KEY = "tps.interval";
/**
* TPS 限制 - 周期默认值
*/
public static final long DEFAULT_TPS_LIMIT_INTERVAL = 60 * 1000;
public static final String DECODE_IN_IO_THREAD_KEY = "decode.in.io";
......
......@@ -209,17 +209,21 @@ public final class ReflectUtils {
}
public static String getCodeBase(Class<?> cls) {
if (cls == null)
if (cls == null) {
return null;
}
ProtectionDomain domain = cls.getProtectionDomain();
if (domain == null)
if (domain == null) {
return null;
}
CodeSource source = domain.getCodeSource();
if (source == null)
if (source == null) {
return null;
}
URL location = source.getLocation();
if (location == null)
if (location == null) {
return null;
}
return location.getFile();
}
......
......@@ -354,6 +354,7 @@ public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {
* @param interfaceClass 接口类
*/
protected void checkStubAndMock(Class<?> interfaceClass) {
// `local` 配置项的校验,和 `stub` 一样。
if (ConfigUtils.isNotEmpty(local)) {
Class<?> localClass = ConfigUtils.isDefault(local) ? ReflectUtils.forName(interfaceClass.getName() + "Local") : ReflectUtils.forName(local);
if (!interfaceClass.isAssignableFrom(localClass)) {
......@@ -365,11 +366,15 @@ public abstract class AbstractInterfaceConfig extends AbstractMethodConfig {
throw new IllegalStateException("No such constructor \"public " + localClass.getSimpleName() + "(" + interfaceClass.getName() + ")\" in local implementation class " + localClass.getName());
}
}
// `stub` 配置项的校验
if (ConfigUtils.isNotEmpty(stub)) {
// `stub = true` 的情况,使用接口 + `Stub` 字符串。
Class<?> localClass = ConfigUtils.isDefault(stub) ? ReflectUtils.forName(interfaceClass.getName() + "Stub") : ReflectUtils.forName(stub);
// Stub 类,必须实现服务接口
if (!interfaceClass.isAssignableFrom(localClass)) {
throw new IllegalStateException("The local implementation class " + localClass.getName() + " not implement interface " + interfaceClass.getName());
}
// Stub 类,必须带有服务接口的构造方法
try {
ReflectUtils.findConstructor(localClass, interfaceClass);
} catch (NoSuchMethodException e) {
......
......@@ -362,7 +362,7 @@ public class ReferenceConfig<T> extends AbstractReferenceConfig {
map.put("revision", revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 【TODO 8003】Wrapper
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 获得方法数组
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
......
......@@ -411,7 +411,7 @@ public class ServiceConfig<T> extends AbstractServiceConfig {
if (path == null || path.length() == 0) {
path = interfaceName;
}
// 暴露
// 暴露服务
doExportUrls();
// TODO 芋艿,等待 qos
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
......@@ -568,7 +568,7 @@ public class ServiceConfig<T> extends AbstractServiceConfig {
map.put("revision", revision); // 修订本
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 【TODO 8003】Wrapper
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); // 获得方法数组
if (methods.length == 0) {
logger.warn("NO method found in service interface " + interfaceClass.getName());
map.put("methods", Constants.ANY_VALUE);
......@@ -578,7 +578,7 @@ public class ServiceConfig<T> extends AbstractServiceConfig {
}
// token ,参见《令牌校验》https://dubbo.gitbooks.io/dubbo-user-book/demos/token-authorization.html
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
if (ConfigUtils.isDefault(token)) { // true || default 时,UUID 随机生成
map.put("token", UUID.randomUUID().toString());
} else {
map.put("token", token);
......
package com.alibaba.dubbo.demo;
public class TestException extends RuntimeException {
}
......@@ -41,11 +41,11 @@ public class Consumer {
// System.out.println(hello); // get result
demoService.say01(null);
demoService.say01("hahha");
demoService.say02();
demoService.say03();
demoService.say04();
// demoService.say01(null);
demoService.say01("TestException");
// demoService.say02();
// demoService.say03();
// demoService.say04();
// 参数回调
// https://dubbo.gitbooks.io/dubbo-user-book/demos/callback-parameter.html
......
package com.alibaba.dubbo.demo.consumer;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.demo.ParamCallback;
public class DemoServiceStub implements DemoService {
private DemoService demoService;
public DemoServiceStub(DemoService demoService) {
this.demoService = demoService;
}
@Override
public String sayHello(String name) {
return null;
}
@Override
public void bye(Object o) {
System.out.println("o");
}
@Override
public void callbackParam(String msg, ParamCallback callback) {
System.out.println("o");
}
@Override
public String say01(String msg) {
return null;
}
@Override
public String[] say02() {
return new String[0];
}
@Override
public void say03() {
System.out.println("o");
}
@Override
public Void say04() {
return null;
}
}
......@@ -29,12 +29,12 @@ limitations under the License.
<!-- use multicast registry center to discover service -->
<!--<dubbo:registry address="multicast://224.5.6.7:1234"/>-->
<!--<dubbo:registry address="zookeeper://127.0.0.1:2181"/>-->
<dubbo:registry address="redis://127.0.0.1:6379"/>
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!--<dubbo:registry address="redis://127.0.0.1:6379"/>-->
<!-- generate proxy for the remote service, then demoService can be used in the same way as the
local regular interface -->
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" client="netty4" timeout="100000" callbacks="1000">
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" client="netty4" timeout="100000" callbacks="1000" >
<!--<dubbo:method name="sayHello" />-->
<!--url="registry://127.0.0.1:2181?registry=zookeeper;127.0.0.1:2181;127.0.0.1:27018"-->
......@@ -45,5 +45,6 @@ limitations under the License.
</dubbo:reference>
<!--stub="com.alibaba.dubbo.demo.consumer.DemoServiceStub"-->
</beans>
\ No newline at end of file
......@@ -17,6 +17,8 @@
package com.alibaba.dubbo.demo.consumer;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.demo.ParamCallback;
import com.alibaba.dubbo.demo.TestException;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.fastjson.JSON;
......@@ -36,6 +38,37 @@ public class DemoServiceImpl implements DemoService {
System.out.println(o.getClass());
}
@Override
public void callbackParam(String msg, ParamCallback callback) {
}
@Override
public String say01(String msg) {
if ("RuntimeException".equalsIgnoreCase(msg)) {
throw new RuntimeException("123");
}
if ("TestException".equalsIgnoreCase(msg)) {
throw new TestException();
}
return null;
}
@Override
public String[] say02() {
return new String[0];
}
@Override
public void say03() {
}
@Override
public Void say04() {
return null;
}
// public String getTest01() {
// return test01;
// }
......
......@@ -32,7 +32,7 @@ public class InjvmConsumer {
while (true) {
try {
Thread.sleep(1000);
String hello = demoService.sayHello("world"); // call remote method
String hello = demoService.say01("RuntimeException"); // call remote method
System.out.println(hello); // get result
// demoService.bye(new Cat().setName("小猫"));
......
tps=com.alibaba.dubbo.rpc.filter.TpsLimitFilter
\ No newline at end of file
......@@ -30,6 +30,13 @@ limitations under the License.
</dubbo:reference>
<bean id="demoServiceImpl" class="com.alibaba.dubbo.demo.consumer.DemoServiceImpl"/>
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" />
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoServiceImpl" protocol="injvm" >
<dubbo:parameter key="tps" value="100" />
</dubbo:service>
<dubbo:provider>
</dubbo:provider>
</beans>
\ No newline at end of file
package com.alibaba.dubbo.demo.provider;
import com.alibaba.dubbo.demo.DemoService;
import com.alibaba.dubbo.demo.ParamCallback;
public class DemoServiceStub implements DemoService {
private DemoService demoService;
public DemoServiceStub(DemoService demoService) {
this.demoService = demoService;
}
@Override
public String sayHello(String name) {
return null;
}
@Override
public void bye(Object o) {
System.out.println("o");
}
@Override
public void callbackParam(String msg, ParamCallback callback) {
System.out.println("o");
}
@Override
public String say01(String msg) {
return null;
}
@Override
public String[] say02() {
return new String[0];
}
@Override
public void say03() {
System.out.println("o");
}
@Override
public Void say04() {
return null;
}
}
......@@ -32,8 +32,8 @@ limitations under the License.
<!--<dubbo:registry address="multicast://224.5.6.7:1234"/>-->
<!--<dubbo:registry address="zookeeper://127.0.0.1:2181"/>-->
<!--<dubbo:registry address="zookeeper://127.0.0.1:2181||10.20.153.11:2181,10.20.153.12:2181"/>-->
<!--<dubbo:registry address="zookeeper://127.0.0.1:2181" />-->
<dubbo:registry address="redis://127.0.0.1:6379" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<!--<dubbo:registry address="redis://127.0.0.1:6379" />-->
<bean id="demoDAO" class="com.alibaba.dubbo.demo.provider.DemoDAO" />
......@@ -41,10 +41,10 @@ limitations under the License.
<!-- service implementation, as same as regular local bean -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<bean id="genericService" class="com.alibaba.dubbo.demo.provider.MyGenericService" />
<!--<bean id="genericService" class="com.alibaba.dubbo.demo.provider.MyGenericService" />-->
<!-- declare the service interface to be exported -->
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" filter="demo" deprecated="true" callbacks="1000" timeout="200000">
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" filter="demo" deprecated="false" callbacks="1000" timeout="200000" accesslog="true">
<!--<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" protocol="dubbo" ref="demoService"/>-->
<!--<dubbo:service id="sa" interface="com.alibaba.dubbo.demo.DemoService" protocol="dubbo" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl">-->
<!--&lt;!&ndash;<dubbo:method name="sayHello" retries="100">&ndash;&gt;-->
......@@ -60,20 +60,22 @@ limitations under the License.
<!--&lt;!&ndash;<dubbo:argument type="com.demo.CallbackListener" callback="true" />&ndash;&gt;-->
<!--&lt;!&ndash;</dubbo:method>&ndash;&gt;-->
<dubbo:parameter key="threadname" value="shuaiqi" />
<dubbo:parameter key="threads" value="123" />
<dubbo:parameter key="queues" value="10" />
<!--<dubbo:parameter key="threadname" value="shuaiqi" />-->
<!--<dubbo:parameter key="threads" value="123" />-->
<!--<dubbo:parameter key="queues" value="10" />-->
<!--<dubbo:method name="callbackParam">-->
<!--<dubbo:argument callback="true" type="com.alibaba.dubbo.demo.ParamCallback" />-->
<!--</dubbo:method>-->
<dubbo:method name="say01" deprecated="true" />
<dubbo:method name="callbackParam">
<dubbo:argument callback="true" type="com.alibaba.dubbo.demo.ParamCallback" />
</dubbo:method>
</dubbo:service>
<dubbo:service interface="com.alibaba.dubbo.demo.BarService" ref="genericService" timeout="200000" />
<!--<dubbo:service interface="com.alibaba.dubbo.demo.BarService" ref="genericService" timeout="200000" />-->
<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol name="dubbo" port="20880" server="netty4" />
<dubbo:protocol accesslog="true" name="dubbo" port="20880" server="netty4" />
<!--<dubbo:protocol id="pb" name="dubbo" port="20881"/>-->
</beans>
\ No newline at end of file
......@@ -33,7 +33,7 @@ public class RegistryStatusChecker implements StatusChecker {
public Status check() {
Collection<Registry> regsitries = AbstractRegistryFactory.getRegistries();
if (regsitries == null || regsitries.isEmpty()) {
if (regsitries.isEmpty()) {
return new Status(Status.Level.UNKNOWN);
}
Status.Level level = Status.Level.OK;
......
......@@ -26,13 +26,14 @@ package com.alibaba.dubbo.rpc;
*/
public final class RpcException extends RuntimeException {
private static final long serialVersionUID = 7815426752583648734L;
public static final int UNKNOWN_EXCEPTION = 0;
public static final int NETWORK_EXCEPTION = 1;
public static final int TIMEOUT_EXCEPTION = 2;
public static final int BIZ_EXCEPTION = 3;
public static final int FORBIDDEN_EXCEPTION = 4;
public static final int SERIALIZATION_EXCEPTION = 5;
private static final long serialVersionUID = 7815426752583648734L;
private int code; // RpcException cannot be extended, use error code for exception type to keep compatibility
public RpcException() {
......
......@@ -28,41 +28,91 @@ import java.util.concurrent.atomic.AtomicLong;
/**
* URL statistics. (API, Cached, ThreadSafe)
*
* RPC 状态。可以计入如下维度统计:
*
* 1. 基于服务 URL ,{@link #SERVICE_STATISTICS}
* 2. 基于服务 URL + 方法,{@link #METHOD_STATISTICS}
*
* @see com.alibaba.dubbo.rpc.filter.ActiveLimitFilter
* @see com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter
* @see com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
* @see .com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
*/
public class RpcStatus {
/**
* 基于服务 URL 为维度的 RpcStatus 集合
*
* key:URL
*/
private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();
/**
* 基于服务 URL + 方法维度的 RpcStatus 集合
*
* key1:URL
* key2:方法名
*/
private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
// 目前没有用到
private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
/**
* 调用中的次数
*/
private final AtomicInteger active = new AtomicInteger();
/**
* 总调用次数
*/
private final AtomicLong total = new AtomicLong();
/**
* 总调用失败次数
*/
private final AtomicInteger failed = new AtomicInteger();
/**
* 总调用时长,单位:毫秒
*/
private final AtomicLong totalElapsed = new AtomicLong();
/**
* 总调用失败时长,单位:毫秒
*/
private final AtomicLong failedElapsed = new AtomicLong();
/**
* 最大调用时长,单位:毫秒
*/
private final AtomicLong maxElapsed = new AtomicLong();
/**
* 最大调用失败时长,单位:毫秒
*/
private final AtomicLong failedMaxElapsed = new AtomicLong();
/**
* 最大调用成功时长,单位:毫秒
*/
private final AtomicLong succeededMaxElapsed = new AtomicLong();
/**
* Semaphore used to control concurrency limit set by `executes`
*
* 服务执行信号量,在 {@link com.alibaba.dubbo.rpc.filter.ExecuteLimitFilter} 中使用
*/
private volatile Semaphore executesLimit;
/**
* 服务执行信号量大小
*/
private volatile int executesPermits;
private RpcStatus() {
}
/**
* @param url
* 获得 RpcStatus 对象
*
* @param url URL
* @return status
*/
public static RpcStatus getStatus(URL url) {
String uri = url.toIdentityString();
// 获得
RpcStatus status = SERVICE_STATISTICS.get(uri);
// 不存在,则进行创建
if (status == null) {
SERVICE_STATISTICS.putIfAbsent(uri, new RpcStatus());
status = SERVICE_STATISTICS.get(uri);
......@@ -79,18 +129,25 @@ public class RpcStatus {
}
/**
* @param url
* @param methodName
* 获得 RpcStatus 对象
*
* @param url URL
* @param methodName 方法
* @return status
*/
public static RpcStatus getStatus(URL url, String methodName) {
String uri = url.toIdentityString();
// 获得方法集合
ConcurrentMap<String, RpcStatus> map = METHOD_STATISTICS.get(uri);
// 不存在,创建方法集合
if (map == null) {
METHOD_STATISTICS.putIfAbsent(uri, new ConcurrentHashMap<String, RpcStatus>());
map = METHOD_STATISTICS.get(uri);
}
// 获得 RpcStatus 对象
RpcStatus status = map.get(methodName);
// 不存在,创建 RpcStatus 对象
if (status == null) {
map.putIfAbsent(methodName, new RpcStatus());
status = map.get(methodName);
......@@ -110,31 +167,43 @@ public class RpcStatus {
}
/**
* @param url
* 服务调用开始的计数
*
* @param url URL 对象
* @param methodName 方法名
*/
public static void beginCount(URL url, String methodName) {
// `SERVICE_STATISTICS` 的计数
beginCount(getStatus(url));
// `METHOD_STATISTICS` 的计数
beginCount(getStatus(url, methodName));
}
private static void beginCount(RpcStatus status) {
// 调用中的次数
status.active.incrementAndGet();
}
/**
* @param url
* @param elapsed
* @param succeeded
* 服务调用结束的计数
*
* @param url URL 对象
* @param elapsed 时长,毫秒
* @param succeeded 是否成功
*/
public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
// `SERVICE_STATISTICS` 的计数
endCount(getStatus(url), elapsed, succeeded);
// `METHOD_STATISTICS` 的计数
endCount(getStatus(url, methodName), elapsed, succeeded);
}
private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
// 次数计数
status.active.decrementAndGet();
status.total.incrementAndGet();
status.totalElapsed.addAndGet(elapsed);
// 时长计数
if (status.maxElapsed.get() < elapsed) {
status.maxElapsed.set(elapsed);
}
......@@ -143,7 +212,7 @@ public class RpcStatus {
status.succeededMaxElapsed.set(elapsed);
}
} else {
status.failed.incrementAndGet();
status.failed.incrementAndGet(); // 失败次数
status.failedElapsed.addAndGet(elapsed);
if (status.failedMaxElapsed.get() < elapsed) {
status.failedMaxElapsed.set(elapsed);
......@@ -322,7 +391,7 @@ public class RpcStatus {
if(maxThreadNum <= 0) {
return null;
}
// 若信号量不存在,或者信号量大小改变,创建新的信号量
if (executesLimit == null || executesPermits != maxThreadNum) {
synchronized (this) {
if (executesLimit == null || executesPermits != maxThreadNum) {
......@@ -331,7 +400,8 @@ public class RpcStatus {
}
}
}
// 返回信号量
return executesLimit;
}
}
\ No newline at end of file
......@@ -31,21 +31,26 @@ import java.util.Map;
/**
* TokenInvokerFilter
*
* 令牌验证 Filter
*/
@Activate(group = Constants.PROVIDER, value = Constants.TOKEN_KEY)
public class TokenFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation inv)
throws RpcException {
@Override
public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {
// 获得服务提供者配置的 Token 值
String token = invoker.getUrl().getParameter(Constants.TOKEN_KEY);
if (ConfigUtils.isNotEmpty(token)) {
// 从隐式参数中,获得 Token 值。
Class<?> serviceType = invoker.getInterface();
Map<String, String> attachments = inv.getAttachments();
String remoteToken = attachments == null ? null : attachments.get(Constants.TOKEN_KEY);
// 对比,若不一致,抛出 RpcException 异常
if (!token.equals(remoteToken)) {
throw new RpcException("Invalid token! Forbid invoke remote service " + serviceType + " method " + inv.getMethodName() + "() from consumer " + RpcContext.getContext().getRemoteHost() + " to provider " + RpcContext.getContext().getLocalHost());
}
}
// 服务调用
return invoker.invoke(inv);
}
......
......@@ -30,14 +30,15 @@ import com.alibaba.dubbo.rpc.filter.tps.TPSLimiter;
/**
* Limit TPS for either service or service's particular method
*
* TPS 限流过滤器实现类
*/
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
new StringBuilder(64)
......@@ -48,8 +49,8 @@ public class TpsLimitFilter implements Filter {
.append(" because exceed max service tps.")
.toString());
}
// 服务调用
return invoker.invoke(invocation);
}
}
}
\ No newline at end of file
......@@ -23,32 +23,46 @@ import com.alibaba.dubbo.rpc.Invocation;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 默认 TPS 限制器实现类
*/
public class DefaultTPSLimiter implements TPSLimiter {
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
/**
* StatItem 集合
*
* key:服务名
*/
private final ConcurrentMap<String, StatItem> stats = new ConcurrentHashMap<String, StatItem>();
@Override
public boolean isAllowable(URL url, Invocation invocation) {
// 获得 TPS 大小配置项
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
// 获得 TPS 周期配置项,默认 60 秒
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY, Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
// 要限流
if (rate > 0) {
// 获得 StatItem 对象
StatItem statItem = stats.get(serviceKey);
// 不存在,则进行创建
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
// 根据 TPS 限流规则判断是否限制此次调用.
return statItem.isAllowable(url, invocation);
// 不限流
} else {
// 移除 StatItem
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
// 返回通过
return true;
}
return true;
}
}
......@@ -21,17 +21,31 @@ import com.alibaba.dubbo.rpc.Invocation;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 统计项
*/
class StatItem {
/**
* 统计名,目前使用服务名
*/
private String name;
private long lastResetTime;
/**
* 周期
*/
private long interval;
private AtomicInteger token;
/**
* 限制大小
*/
private int rate;
/**
* 最后重置时间
*/
private long lastResetTime;
/**
* 当前周期,剩余种子数
*/
private AtomicInteger token;
StatItem(String name, int rate, long interval) {
this.name = name;
......@@ -42,12 +56,14 @@ class StatItem {
}
public boolean isAllowable(URL url, Invocation invocation) {
// 若到达下一个周期,恢复可用种子数,设置最后重置时间。
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
lastResetTime = now;
token.set(rate); // 回复可用种子数
lastResetTime = now; // 最后重置时间
}
// CAS ,直到或得到一个种子,或者没有足够种子
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
......@@ -55,6 +71,7 @@ class StatItem {
value = token.get();
}
// 是否成功
return flag;
}
......
......@@ -19,11 +19,16 @@ package com.alibaba.dubbo.rpc.filter.tps;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
/**
* TPS 限制器接口
*/
public interface TPSLimiter {
/**
* judge if the current invocation is allowed by TPS rule
*
* 根据 tps 限流规则判断是否限制此次调用.
*
* @param url url
* @param invocation invocation
* @return true allow the current invocation, otherwise, return false
......
......@@ -137,6 +137,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
return getInterface() + " -> " + (getUrl() == null ? "" : getUrl().toString());
}
@Override
public Result invoke(Invocation inv) throws RpcException {
if (destroyed.get()) {
throw new RpcException("Rpc invoker for service " + this + " on consumer " + NetUtils.getLocalHost()
......@@ -150,7 +151,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
// 添加自定义的隐士传参
// 添加自定义的隐式参数
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
......
......@@ -65,22 +65,27 @@ public class ProtocolFilterWrapper implements Protocol {
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
......
......@@ -37,13 +37,20 @@ import java.lang.reflect.Constructor;
/**
* StubProxyFactoryWrapper
*
* 存根代理工厂包装器实现类
*/
public class StubProxyFactoryWrapper implements ProxyFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(StubProxyFactoryWrapper.class);
/**
* ProxyFactory$Adaptive 对象
*/
private final ProxyFactory proxyFactory;
/**
* Protocol$Adaptive 对象
*/
private Protocol protocol;
public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
......@@ -54,13 +61,17 @@ public class StubProxyFactoryWrapper implements ProxyFactory {
this.protocol = protocol;
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
// 获得 Service Proxy 对象
T proxy = proxyFactory.getProxy(invoker);
if (GenericService.class != invoker.getInterface()) {
if (GenericService.class != invoker.getInterface()) { // 非泛化引用
// 获得 `stub` 配置项
String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
if (ConfigUtils.isNotEmpty(stub)) {
Class<?> serviceType = invoker.getInterface();
// `stub = true` 的情况,使用接口 + `Stub` 字符串。
if (ConfigUtils.isDefault(stub)) {
if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
stub = serviceType.getName() + "Stub";
......@@ -69,13 +80,17 @@ public class StubProxyFactoryWrapper implements ProxyFactory {
}
}
try {
// 加载 Stub 类
Class<?> stubClass = ReflectUtils.forName(stub);
if (!serviceType.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
}
try {
// 创建 Stub 对象,使用带 Service Proxy 对象的构造方法
Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
proxy = (T) constructor.newInstance(new Object[]{proxy});
// 【TODO 8033】参数回调
//export stub service
URL url = invoker.getUrl();
if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
......@@ -99,6 +114,7 @@ public class StubProxyFactoryWrapper implements ProxyFactory {
return proxy;
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
return proxyFactory.getInvoker(proxy, type, url);
}
......
......@@ -22,6 +22,7 @@ import com.alibaba.dubbo.rpc.Invoker;
/**
* DelegateExporter
*/
@Deprecated // add by 芋艿,并未使用
public class DelegateExporter<T> implements Exporter<T> {
private final Exporter<T> exporter;
......
......@@ -25,6 +25,7 @@ import com.alibaba.dubbo.rpc.RpcException;
/**
* DelegateInvoker
*/
@Deprecated // add by 芋艿,并未使用
public abstract class DelegateInvoker<T> implements Invoker<T> {
protected final Invoker<T> invoker;
......
......@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
final public class MockInvoker<T> implements Invoker<T> {
private final static ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private final static Map<String, Invoker<?>> mocks = new ConcurrentHashMap<String, Invoker<?>>();
private final static Map<String, Throwable> throwables = new ConcurrentHashMap<String, Throwable>();
......@@ -83,6 +84,7 @@ final public class MockInvoker<T> implements Invoker<T> {
return value;
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY);
if (invocation instanceof RpcInvocation) {
......@@ -217,4 +219,5 @@ final public class MockInvoker<T> implements Invoker<T> {
//FIXME
return null;
}
}
\ No newline at end of file
......@@ -31,11 +31,14 @@ final public class MockProtocol extends AbstractProtocol {
return 0;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
throw new UnsupportedOperationException();
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new MockInvoker<T>(url);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册