From 11f9bc14e679d9a1532a8721e10daac2d2c9aa1a Mon Sep 17 00:00:00 2001 From: "chao.liuc" Date: Tue, 8 May 2012 09:37:50 +0000 Subject: [PATCH] =?UTF-8?q?DUBBO-350=20=E6=B3=A8=E5=86=8C=E4=B8=AD?= =?UTF-8?q?=E5=BF=83=E5=AE=A2=E6=88=B7=E7=AB=AF=E6=9C=8D=E5=8A=A1=E5=88=97?= =?UTF-8?q?=E8=A1=A8=E9=80=9A=E7=9F=A5=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit git-svn-id: http://code.alibabatech.com/svn/dubbo/trunk@1690 1a56cb94-b969-4eaa-88fa-be21384802f2 --- .../support/AbstractClusterInvoker.java | 12 +- .../support/FailoverClusterInvoker.java | 84 ++--- .../support/FailoverClusterInvokerTest.java | 308 ++++++++++++------ 3 files changed, 256 insertions(+), 148 deletions(-) diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java index 0551a783b..bb895faf4 100644 --- a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/AbstractClusterInvoker.java @@ -216,7 +216,7 @@ public abstract class AbstractClusterInvoker implements Invoker { LoadBalance loadbalance; - List> invokers = directory.list(invocation); + List> invokers = list(invocation); if (invokers != null && invokers.size() > 0) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); @@ -230,9 +230,9 @@ public abstract class AbstractClusterInvoker implements Invoker { protected void checkWheatherDestoried() { if(destroyed){ - throw new RpcException("Rpc invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost() + throw new RpcException("Rpc cluster invoker for " + getInterface() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() - + " is not destroyed! Can not invoke any more."); + + " is now destroyed! Can not invoke any more."); } } @@ -255,5 +255,9 @@ public abstract class AbstractClusterInvoker implements Invoker { protected abstract Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException; - + + protected List> list(Invocation invocation) throws RpcException { + List> invokers = directory.list(invocation); + return invokers; + } } \ No newline at end of file diff --git a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvoker.java b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvoker.java index b9bb6e14b..71c7f2832 100644 --- a/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvoker.java +++ b/dubbo-cluster/src/main/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvoker.java @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.dubbo.rpc.cluster.support; - +package com.alibaba.dubbo.rpc.cluster.support; + import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -32,70 +32,80 @@ import com.alibaba.dubbo.rpc.RpcContext; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.cluster.Directory; import com.alibaba.dubbo.rpc.cluster.LoadBalance; - -/** - * 失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟。 - * + +/** + * 失败转移,当出现失败,重试其它服务器,通常用于读操作,但重试会带来更长延迟。 + * * Failover - * - * @author william.liangf - */ + * + * @author william.liangf + * @author chao.liuc + */ public class FailoverClusterInvoker extends AbstractClusterInvoker { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); - - public FailoverClusterInvoker(Directory directory) { - super(directory); - } - + + public FailoverClusterInvoker(Directory directory) { + super(directory); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) - public Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { - checkInvokers(invokers, invocation); - int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; + public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { + List> copyinvokers = invokers; + checkInvokers(copyinvokers, invocation); + int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; - } - // retry loop. - RpcException le = null; // last exception. - List> invoked = new ArrayList>(invokers.size()); // invoked invokers. - Set providers = new HashSet(len); - for (int i = 0; i < len; i++) { - Invoker invoker = select(loadbalance, invocation, invokers, invoked); - invoked.add(invoker); - RpcContext.getContext().setInvokers((List)invoked); - try { + } + // retry loop. + RpcException le = null; // last exception. + List> invoked = new ArrayList>(copyinvokers.size()); // invoked invokers. + Set providers = new HashSet(len); + for (int i = 0; i < len; i++) { + //重试时,进行重新选择,避免重试时invoker列表已发生变化. + //注意:如果列表发生了变化,那么invoked判断会失效,因为invoker示例已经改变 + if (i > 0) { + checkWheatherDestoried(); + copyinvokers = list(invocation); + //重新检查一下 + checkInvokers(copyinvokers, invocation); + } + Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked); + invoked.add(invoker); + RpcContext.getContext().setInvokers((List)invoked); + try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers - + " (" + providers.size() + "/" + invokers.size() + + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } - return result; + return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; - } + } le = e; - } catch (Throwable e) { - le = new RpcException(e.getMessage(), e); + } catch (Throwable e) { + le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); - } - } + } + } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers - + " (" + providers.size() + "/" + invokers.size() + + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " - + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); + + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } - + } \ No newline at end of file diff --git a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java index e96a36e1d..f65de858c 100644 --- a/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java +++ b/dubbo-cluster/src/test/java/com/alibaba/dubbo/rpc/cluster/support/FailoverClusterInvokerTest.java @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.alibaba.dubbo.rpc.cluster.support; - +package com.alibaba.dubbo.rpc.cluster.support; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; @@ -23,6 +23,7 @@ import static org.junit.Assert.fail; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; import org.easymock.EasyMock; import org.junit.Before; @@ -36,118 +37,120 @@ import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.RpcInvocation; import com.alibaba.dubbo.rpc.RpcResult; import com.alibaba.dubbo.rpc.cluster.Directory; - -/** - * FailoverClusterInvokerTest - * @author liuchao - * - */ -@SuppressWarnings("unchecked") -public class FailoverClusterInvokerTest { - List> invokers = new ArrayList>(); - int retries = 5; - URL url = URL.valueOf("test://test:11/test?retries="+retries); - Invoker invoker1 = EasyMock.createMock(Invoker.class); - Invoker invoker2 = EasyMock.createMock(Invoker.class); - RpcInvocation invocation = new RpcInvocation(); - Directory dic ; - Result result = new RpcResult(); - /** - * @throws java.lang.Exception - */ - - @Before - public void setUp() throws Exception { - - dic = EasyMock.createMock(Directory.class); - - EasyMock.expect(dic.getUrl()).andReturn(url).anyTimes(); - EasyMock.expect(dic.list(invocation)).andReturn(invokers).anyTimes(); - EasyMock.expect(dic.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); - invocation.setMethodName("method1"); - EasyMock.replay(dic); - - invokers.add(invoker1); - invokers.add(invoker2); - } - - - @Test - public void testInvokeWithRuntimeException() { - EasyMock.reset(invoker1); - EasyMock.expect(invoker1.invoke(invocation)).andThrow(new RuntimeException()).anyTimes(); - EasyMock.expect(invoker1.isAvailable()).andReturn(true).anyTimes(); - EasyMock.expect(invoker1.getUrl()).andReturn(url).anyTimes(); - EasyMock.expect(invoker1.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); - EasyMock.replay(invoker1); - - EasyMock.reset(invoker2); - EasyMock.expect(invoker2.invoke(invocation)).andThrow(new RuntimeException()).anyTimes(); - EasyMock.expect(invoker2.isAvailable()).andReturn(true).anyTimes(); - EasyMock.expect(invoker2.getUrl()).andReturn(url).anyTimes(); - EasyMock.expect(invoker2.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); - EasyMock.replay(invoker2); - - FailoverClusterInvoker invoker = new FailoverClusterInvoker(dic); +import com.alibaba.dubbo.rpc.cluster.directory.StaticDirectory; +import com.alibaba.dubbo.rpc.protocol.AbstractInvoker; + +/** + * FailoverClusterInvokerTest + * @author liuchao + * + */ +@SuppressWarnings("unchecked") +public class FailoverClusterInvokerTest { + List> invokers = new ArrayList>(); + int retries = 5; + URL url = URL.valueOf("test://test:11/test?retries="+retries); + Invoker invoker1 = EasyMock.createMock(Invoker.class); + Invoker invoker2 = EasyMock.createMock(Invoker.class); + RpcInvocation invocation = new RpcInvocation(); + Directory dic ; + Result result = new RpcResult(); + /** + * @throws java.lang.Exception + */ + + @Before + public void setUp() throws Exception { + + dic = EasyMock.createMock(Directory.class); + + EasyMock.expect(dic.getUrl()).andReturn(url).anyTimes(); + EasyMock.expect(dic.list(invocation)).andReturn(invokers).anyTimes(); + EasyMock.expect(dic.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); + invocation.setMethodName("method1"); + EasyMock.replay(dic); + + invokers.add(invoker1); + invokers.add(invoker2); + } + + + @Test + public void testInvokeWithRuntimeException() { + EasyMock.reset(invoker1); + EasyMock.expect(invoker1.invoke(invocation)).andThrow(new RuntimeException()).anyTimes(); + EasyMock.expect(invoker1.isAvailable()).andReturn(true).anyTimes(); + EasyMock.expect(invoker1.getUrl()).andReturn(url).anyTimes(); + EasyMock.expect(invoker1.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); + EasyMock.replay(invoker1); + + EasyMock.reset(invoker2); + EasyMock.expect(invoker2.invoke(invocation)).andThrow(new RuntimeException()).anyTimes(); + EasyMock.expect(invoker2.isAvailable()).andReturn(true).anyTimes(); + EasyMock.expect(invoker2.getUrl()).andReturn(url).anyTimes(); + EasyMock.expect(invoker2.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); + EasyMock.replay(invoker2); + + FailoverClusterInvoker invoker = new FailoverClusterInvoker(dic); try { invoker.invoke(invocation); fail(); } catch (RpcException expected) { assertEquals(0,expected.getCode()); assertFalse(expected.getCause() instanceof RpcException); - } - } - - @Test() - public void testInvokeWithRPCException() { - - EasyMock.reset(invoker1); - EasyMock.expect(invoker1.invoke(invocation)).andThrow(new RpcException()).anyTimes(); - EasyMock.expect(invoker1.isAvailable()).andReturn(true).anyTimes(); - EasyMock.expect(invoker1.getUrl()).andReturn(url).anyTimes(); - EasyMock.expect(invoker1.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); - EasyMock.replay(invoker1); - - EasyMock.reset(invoker2); - EasyMock.expect(invoker2.invoke(invocation)).andReturn(result).anyTimes(); - EasyMock.expect(invoker2.isAvailable()).andReturn(true).anyTimes(); - EasyMock.expect(invoker2.getUrl()).andReturn(url).anyTimes(); - EasyMock.expect(invoker2.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); - EasyMock.replay(invoker2); - - FailoverClusterInvoker invoker = new FailoverClusterInvoker(dic); - for(int i=0;i<100;i++){ - Result ret = invoker.invoke(invocation); - assertSame(result, ret); - } - } - - @Test() - public void testInvoke_retryTimes() { - - EasyMock.reset(invoker1); - EasyMock.expect(invoker1.invoke(invocation)).andThrow(new RpcException(RpcException.TIMEOUT_EXCEPTION)).anyTimes(); - EasyMock.expect(invoker1.isAvailable()).andReturn(false).anyTimes(); - EasyMock.expect(invoker1.getUrl()).andReturn(url).anyTimes(); - EasyMock.expect(invoker1.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); - EasyMock.replay(invoker1); - - EasyMock.reset(invoker2); - EasyMock.expect(invoker2.invoke(invocation)).andThrow(new RpcException()).anyTimes(); - EasyMock.expect(invoker2.isAvailable()).andReturn(false).anyTimes(); - EasyMock.expect(invoker2.getUrl()).andReturn(url).anyTimes(); - EasyMock.expect(invoker2.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); - EasyMock.replay(invoker2); - - FailoverClusterInvoker invoker = new FailoverClusterInvoker(dic); - try{ - Result ret = invoker.invoke(invocation); + } + } + + @Test() + public void testInvokeWithRPCException() { + + EasyMock.reset(invoker1); + EasyMock.expect(invoker1.invoke(invocation)).andThrow(new RpcException()).anyTimes(); + EasyMock.expect(invoker1.isAvailable()).andReturn(true).anyTimes(); + EasyMock.expect(invoker1.getUrl()).andReturn(url).anyTimes(); + EasyMock.expect(invoker1.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); + EasyMock.replay(invoker1); + + EasyMock.reset(invoker2); + EasyMock.expect(invoker2.invoke(invocation)).andReturn(result).anyTimes(); + EasyMock.expect(invoker2.isAvailable()).andReturn(true).anyTimes(); + EasyMock.expect(invoker2.getUrl()).andReturn(url).anyTimes(); + EasyMock.expect(invoker2.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); + EasyMock.replay(invoker2); + + FailoverClusterInvoker invoker = new FailoverClusterInvoker(dic); + for(int i=0;i<100;i++){ + Result ret = invoker.invoke(invocation); assertSame(result, ret); - fail(); + } + } + + @Test() + public void testInvoke_retryTimes() { + + EasyMock.reset(invoker1); + EasyMock.expect(invoker1.invoke(invocation)).andThrow(new RpcException(RpcException.TIMEOUT_EXCEPTION)).anyTimes(); + EasyMock.expect(invoker1.isAvailable()).andReturn(false).anyTimes(); + EasyMock.expect(invoker1.getUrl()).andReturn(url).anyTimes(); + EasyMock.expect(invoker1.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); + EasyMock.replay(invoker1); + + EasyMock.reset(invoker2); + EasyMock.expect(invoker2.invoke(invocation)).andThrow(new RpcException()).anyTimes(); + EasyMock.expect(invoker2.isAvailable()).andReturn(false).anyTimes(); + EasyMock.expect(invoker2.getUrl()).andReturn(url).anyTimes(); + EasyMock.expect(invoker2.getInterface()).andReturn(FailoverClusterInvokerTest.class).anyTimes(); + EasyMock.replay(invoker2); + + FailoverClusterInvoker invoker = new FailoverClusterInvoker(dic); + try{ + Result ret = invoker.invoke(invocation); + assertSame(result, ret); + fail(); }catch (RpcException expected) { - assertTrue(expected.isTimeout()); - assertTrue(expected.getMessage().indexOf((retries+1)+" times")>0); - } + assertTrue(expected.isTimeout()); + assertTrue(expected.getMessage().indexOf((retries+1)+" times")>0); + } } @Test() @@ -171,5 +174,96 @@ public class FailoverClusterInvokerTest { assertFalse(expected.getCause() instanceof RpcException); } } - + + /** + * 测试在调用重试过程中,directory列表变更,invoke重试时重新进行list选择 + */ + @Test + public void testInvokerDestoryAndReList(){ + final URL url = URL.valueOf("test://localhost/"+ Demo.class.getName() + "?loadbalance=roundrobin&retries="+retries); + RpcException exception = new RpcException(RpcException.TIMEOUT_EXCEPTION); + MockInvoker invoker1 = new MockInvoker(Demo.class, url); + invoker1.setException(exception); + + MockInvoker invoker2 = new MockInvoker(Demo.class, url); + invoker2.setException(exception); + + final List> invokers = new ArrayList>(); + invokers.add(invoker1); + invokers.add(invoker2); + + Callable callable = new Callable() { + public Object call() throws Exception { + //模拟invoker全部被destroy掉 + for (Invoker invoker:invokers){ + invoker.destroy(); + } + invokers.clear(); + MockInvoker invoker3 = new MockInvoker(Demo.class, url); + invokers.add(invoker3); + return null; + } + }; + invoker1.setCallable(callable); + invoker2.setCallable(callable); + + RpcInvocation inv = new RpcInvocation(); + inv.setMethodName("test"); + + Directory dic = new MockDirectory(url, invokers); + + FailoverClusterInvoker clusterinvoker = new FailoverClusterInvoker(dic); + clusterinvoker.invoke(inv); + } + + public static interface Demo{} + + public static class MockInvoker extends AbstractInvoker { + URL url; + boolean available = true; + boolean destoryed = false; + Result result ; + RpcException exception; + Callable callable; + + public MockInvoker(Class type, URL url) { + super(type, url); + } + + public void setResult(Result result) { + this.result = result; + } + public void setException(RpcException exception) { + this.exception = exception; + } + public void setCallable(Callable callable) { + this.callable = callable; + } + + @Override + protected Result doInvoke(Invocation invocation) throws Throwable { + if (callable != null) { + try { + callable.call(); + } catch (Exception e) { + throw new RpcException(e); + } + } + if (exception != null) { + throw exception; + } else { + return result; + } + } + } + + public class MockDirectory extends StaticDirectory { + public MockDirectory(URL url , List> invokers) { + super(url, invokers); + } + @Override + protected List> doList(Invocation invocation) throws RpcException { + return new ArrayList>(super.doList(invocation)); + } + } } \ No newline at end of file -- GitLab