提交 4be3052a 编写于 作者: K kimi

DUBBO-210 重构

git-svn-id: http://code.alibabatech.com/svn/dubbo/trunk@1867 1a56cb94-b969-4eaa-88fa-be21384802f2
上级 3d7e9c80
......@@ -23,33 +23,30 @@ import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;
import com.alibaba.dubbo.rpc.filter.tps.DefaultTPSLimiter;
import com.alibaba.dubbo.rpc.filter.tps.TPSLimiter;
/**
* 限制 service 或方法的 tps.
*
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
@Activate(group = Constants.PROVIDER, value = Constants.TPS_MAX_KEY)
@Activate(group = Constants.PROVIDER, value = Constants.TPS_LIMIT_RATE_KEY)
public class TpsLimitFilter implements Filter {
// TODO 现在依赖 ActiveLimitFilter 或 ExecuteLimitFilter 的计数,需要放到这两个 filter 后执行
private final TPSLimiter tpsLimiter = new DefaultTPSLimiter();
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
long max = invoker.getUrl().getMethodParameter(
invocation.getMethodName(), Constants.TPS_MAX_KEY, 0L);
// verify method tps
if (max > 0L
&& max <= RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getAverageTps()) {
if (!tpsLimiter.isAllowable(invoker.getUrl(), invocation)) {
throw new RpcException(
new StringBuilder(64)
.append("Failed to invoke service ")
.append(invoker.getInterface().getName())
.append(".")
.append(invocation.getMethodName())
.append(" because exceed max service tps ")
.append(max).toString());
.append(" because exceed max service tps.")
.toString());
}
return invoker.invoke(invocation);
......
/*
* Copyright 1999-2012 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.filter.tps;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class DefaultTPSLimiter implements TPSLimiter {
private final ConcurrentMap<String, StatItem> stats
= new ConcurrentHashMap<String, StatItem>();
public boolean isAllowable(URL url, Invocation invocation) {
int rate = url.getParameter(Constants.TPS_LIMIT_RATE_KEY, -1);
long interval = url.getParameter(Constants.TPS_LIMIT_INTERVAL_KEY,
Constants.DEFAULT_TPS_LIMIT_INTERVAL);
String serviceKey = url.getServiceKey();
if (rate > 0) {
StatItem statItem = stats.get(serviceKey);
if (statItem == null) {
stats.putIfAbsent(serviceKey,
new StatItem(serviceKey, rate, interval));
statItem = stats.get(serviceKey);
}
return statItem.isAllowable(url, invocation);
} else {
StatItem statItem = stats.get(serviceKey);
if (statItem != null) {
stats.remove(serviceKey);
}
}
return true;
}
}
/*
* Copyright 1999-2012 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.filter.tps;
import java.util.concurrent.atomic.AtomicInteger;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
class StatItem {
private String name;
private long lastResetTime;
private long interval;
private AtomicInteger token;
private int rate;
StatItem(String name, int rate, long interval) {
this.name = name;
this.rate = rate;
this.interval = interval;
this.lastResetTime = System.currentTimeMillis();
this.token = new AtomicInteger(rate);
}
public boolean isAllowable(URL url, Invocation invocation) {
long now = System.currentTimeMillis();
if (now > lastResetTime + interval) {
token.set(rate);
lastResetTime = now;
}
int value = token.get();
boolean flag = false;
while (value > 0 && !flag) {
flag = token.compareAndSet(value, value - 1);
value = token.get();
}
return flag;
}
long getLastResetTime() {
return lastResetTime;
}
int getToken() {
return token.get();
}
public String toString() {
return new StringBuilder(32).append("StatItem ")
.append("[name=").append(name).append(", ")
.append("rate = ").append(rate).append(", ")
.append("interval = ").append(interval).append("]")
.toString();
}
}
/*
* Copyright 1999-2012 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.filter.tps;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public interface TPSLimiter {
/**
* 根据 tps 限流规则判断是否限制此次调用.
*
* @param url url
* @param invocation invocation
* @return true 则允许调用,否则不允许
*/
boolean isAllowable(URL url, Invocation invocation);
}
......@@ -24,9 +24,11 @@ import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.RpcStatus;
import com.alibaba.dubbo.rpc.support.MockInvocation;
import com.alibaba.dubbo.rpc.support.MyInvoker;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertTrue;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
......@@ -37,7 +39,9 @@ public class TpsLimitFilterTest {
@Test
public void testWithoutCount() throws Exception {
URL url = URL.valueOf("test://test");
url = url.addParameter(Constants.TPS_MAX_KEY, 5);
url = url.addParameter(Constants.INTERFACE_KEY,
"com.alibaba.dubbo.rpc.file.TpsService");
url = url.addParameter(Constants.TPS_LIMIT_RATE_KEY, 5);
Invoker<TpsLimitFilterTest> invoker = new MyInvoker<TpsLimitFilterTest>(url);
Invocation invocation = new MockInvocation();
filter.invoke(invoker, invocation);
......@@ -46,15 +50,19 @@ public class TpsLimitFilterTest {
@Test(expected = RpcException.class)
public void testFail() throws Exception {
URL url = URL.valueOf("test://test");
url = url.addParameter(Constants.TPS_MAX_KEY, 5);
url = url.addParameter(Constants.INTERFACE_KEY,
"com.alibaba.dubbo.rpc.file.TpsService");
url = url.addParameter(Constants.TPS_LIMIT_RATE_KEY, 5);
Invoker<TpsLimitFilterTest> invoker = new MyInvoker<TpsLimitFilterTest>(url);
Invocation invocation = new MockInvocation();
for (int i = 0; i < 10; i++) {
RpcStatus.beginCount(url, invocation.getMethodName());
Thread.sleep(100);
RpcStatus.endCount(url, invocation.getMethodName(), 100, true);
try {
filter.invoke(invoker, invocation);
} catch (Exception e) {
assertTrue(i >= 5);
throw e;
}
}
filter.invoke(invoker, invocation);
}
}
/*
* Copyright 1999-2012 Alibaba Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.dubbo.rpc.filter.tps;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.RpcInvocation;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
/**
* @author <a href="mailto:gang.lvg@alibaba-inc.com">kimi</a>
*/
public class StatItemTest {
private StatItem statItem;
private URL url = URL.valueOf("test://localhost");
private Invocation invocation = new RpcInvocation();
@After
public void tearDown() throws Exception {
statItem = null;
}
@Test
public void testIsAllowable() throws Exception {
statItem = new StatItem("test", 5, 1000L);
long lastResetTime = statItem.getLastResetTime();
assertEquals(true, statItem.isAllowable(url, invocation));
Thread.sleep(1100L);
assertEquals(true, statItem.isAllowable(url, invocation));
assertTrue(lastResetTime != statItem.getLastResetTime());
assertEquals(4, statItem.getToken());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册