diff --git a/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java b/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java index 4ad194c5e1ffc5883b978f405cde4613c22e07c7..38fce36bbe44af69d3618d4bb4afcf08ca2fa5be 100644 --- a/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java +++ b/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/CallbackServiceCodec.java @@ -15,33 +15,34 @@ */ package com.alibaba.dubbo.rpc.protocol.dubbo; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import com.alibaba.dubbo.common.Constants; -import com.alibaba.dubbo.common.ExtensionLoader; -import com.alibaba.dubbo.common.URL; -import com.alibaba.dubbo.common.bytecode.Wrapper; -import com.alibaba.dubbo.common.logger.Logger; -import com.alibaba.dubbo.common.logger.LoggerFactory; -import com.alibaba.dubbo.common.utils.ConcurrentHashSet; -import com.alibaba.dubbo.common.utils.StringUtils; -import com.alibaba.dubbo.remoting.Channel; -import com.alibaba.dubbo.rpc.Exporter; -import com.alibaba.dubbo.rpc.Invocation; -import com.alibaba.dubbo.rpc.Invoker; -import com.alibaba.dubbo.rpc.ProxyFactory; -import com.alibaba.dubbo.rpc.RpcConstants; -import com.alibaba.dubbo.rpc.RpcInvocation; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.alibaba.dubbo.common.Constants; +import com.alibaba.dubbo.common.ExtensionLoader; +import com.alibaba.dubbo.common.URL; +import com.alibaba.dubbo.common.bytecode.Wrapper; +import com.alibaba.dubbo.common.logger.Logger; +import com.alibaba.dubbo.common.logger.LoggerFactory; +import com.alibaba.dubbo.common.utils.ConcurrentHashSet; +import com.alibaba.dubbo.common.utils.StringUtils; +import com.alibaba.dubbo.remoting.Channel; +import com.alibaba.dubbo.remoting.RemotingException; +import com.alibaba.dubbo.rpc.Exporter; +import com.alibaba.dubbo.rpc.Invocation; +import com.alibaba.dubbo.rpc.Invoker; +import com.alibaba.dubbo.rpc.ProxyFactory; +import com.alibaba.dubbo.rpc.RpcConstants; +import com.alibaba.dubbo.rpc.RpcInvocation; /** * callback 服务帮助类. * @author chao.liuc * */ -public class CallbackServiceCodec { +class CallbackServiceCodec { private static final Logger logger = LoggerFactory.getLogger(CallbackServiceCodec.class); private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); @@ -49,7 +50,7 @@ public class CallbackServiceCodec { private static final byte CALLBACK_NONE = 0x0; private static final byte CALLBACK_CREATE = 0x1; private static final byte CALLBACK_DESTROY = 0x2; - private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-"; + private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-"; private static byte isCallBack(URL url, String methodName ,int argIndex){ //参数callback的规则是 方法名称.参数index(0开始).callback @@ -68,17 +69,16 @@ public class CallbackServiceCodec { } /** - * client 端export callback service - * @param inst - * @param clazz - * @param channel + * client 端export callback service + * @param channel + * @param clazz + * @param inst + * @param export * @param out - * @param export * @throws IOException */ @SuppressWarnings({ "unchecked", "rawtypes" }) - private static String exportOrunexportCallbackService(Channel channel, Class clazz, Object inst, Boolean export) throws IOException{ - URL url = channel.getUrl(); + private static String exportOrunexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException{ int instid = System.identityHashCode(inst); Map params = new HashMap(3); @@ -104,7 +104,7 @@ public class CallbackServiceCodec { if(export){ //同一个channel 可以有多个callback instance. 不同的instance不重新export if( ! channel.hasAttribute(cacheKey)){ - if (!isInstancesOverLimit(channel, clazz.getName(), instid, false)) { + if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) { Invoker invoker = proxyFactory.getInvoker(inst, clazz, exporturl); //资源销毁? Exporter exporter = protocol.export(invoker); @@ -126,10 +126,11 @@ public class CallbackServiceCodec { } /** - * server端 应用一个callbackservice + * server端 应用一个callbackservice + * @param url */ @SuppressWarnings("unchecked") - private static Object referOrdestroyCallbackService(Channel channel, Class clazz, Invocation inv ,int instid ,boolean isRefer){ + private static Object referOrdestroyCallbackService(Channel channel, URL url, Class clazz ,Invocation inv ,int instid, boolean isRefer){ Object proxy = null; String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid); String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid); @@ -137,9 +138,10 @@ public class CallbackServiceCodec { String countkey = getServerSideCountKey(channel, clazz.getName()); if (isRefer){ if( proxy == null ){ - if (!isInstancesOverLimit(channel, clazz.getName(), instid, true)){ - @SuppressWarnings("rawtypes") - Invoker invoker = new ChannelWrappedInvoker(clazz, channel, String.valueOf(instid)); + if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, true)){ + url = url.setPath(clazz.getName()); + @SuppressWarnings("rawtypes") + Invoker invoker = new ChannelWrappedInvoker(clazz, channel, url, String.valueOf(instid)); proxy = proxyFactory.getProxy(invoker); channel.setAttribute(proxyCacheKey, proxy); channel.setAttribute(invokerCacheKey, invoker); @@ -193,9 +195,9 @@ public class CallbackServiceCodec { private static String getServerSideCountKey(Channel channel, String interfaceClass){ return RpcConstants.CALLBACK_SERVICE_PROXY_KEY+"."+System.identityHashCode(channel)+"."+interfaceClass+".COUNT"; } - private static boolean isInstancesOverLimit(Channel channel, String interfaceClass ,int instid, boolean isServer){ + private static boolean isInstancesOverLimit(Channel channel, URL url ,String interfaceClass, int instid, boolean isServer){ Integer count = (Integer)channel.getAttribute(isServer ? getServerSideCountKey(channel,interfaceClass) : getClientSideCountKey(interfaceClass)); - int limit = channel.getUrl().getParameter(RpcConstants.CALLBACK_INSTANCES_LIMIT_KEY, RpcConstants.DEFAULT_CALLBACK_INSTANCES); + int limit = url.getParameter(RpcConstants.CALLBACK_INSTANCES_LIMIT_KEY, RpcConstants.DEFAULT_CALLBACK_INSTANCES); if (count != null && count >= limit){ //client side error throw new IllegalStateException("interface " + interfaceClass +" `s callback instances num exceed providers limit :"+ limit @@ -232,44 +234,53 @@ public class CallbackServiceCodec { } } - public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException{ - byte callbackstatus = isCallBack(channel == null ? null : channel.getUrl(), inv.getMethodName(), paraIndex); + public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException{ + //encode时可直接获取url + URL url = inv.getUrl(); + byte callbackstatus = isCallBack(channel == null ? null : url, inv.getMethodName(), paraIndex); Object[] args = inv.getArguments(); Class[] pts = inv.getParameterTypes(); switch (callbackstatus) { case CallbackServiceCodec.CALLBACK_NONE: return args[paraIndex]; case CallbackServiceCodec.CALLBACK_CREATE: - inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex , exportOrunexportCallbackService(channel, pts[paraIndex], args[paraIndex], true)); + inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex , exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true)); return null; case CallbackServiceCodec.CALLBACK_DESTROY: - inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, pts[paraIndex], args[paraIndex], false)); + inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false)); return null; default: return args[paraIndex]; } } public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv, Class[] pts, int paraIndex, Object inObject) throws IOException{ - //如果是callback,则创建proxy到客户端,方法的执行可通过channel调用到client端的callback接口 - byte callbackstatus = isCallBack(channel.getUrl(), inv.getMethodName(), paraIndex); + //如果是callback,则创建proxy到客户端,方法的执行可通过channel调用到client端的callback接口 + //decode时需要根据channel及env获取url + URL url = null ; + try { + url = DubboProtocol.getDubboProtocol().getInvoker(channel, inv).getUrl(); + } catch (RemotingException e) { + throw new IOException(StringUtils.toString("get invoker error", e)); + } + byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex); switch (callbackstatus) { case CallbackServiceCodec.CALLBACK_NONE: return inObject; case CallbackServiceCodec.CALLBACK_CREATE: try{ - return referOrdestroyCallbackService(channel, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true); + return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true); }catch (Exception e) { logger.error(e); throw new IOException(StringUtils.toString(e)); } case CallbackServiceCodec.CALLBACK_DESTROY: try{ - return referOrdestroyCallbackService(channel, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false); + return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false); }catch (Exception e) { throw new IOException(StringUtils.toString(e)); } default: return inObject ; } - } + } } \ No newline at end of file diff --git a/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java b/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java index ee69b16a96c061a312b8ff0c7624d53b6a165e63..b2ec531602e98bdf6cc7e1980edd0e4e81a4125a 100644 --- a/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java +++ b/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java @@ -39,14 +39,14 @@ import com.alibaba.dubbo.rpc.protocol.AbstractInvoker; * * @author chao.liuc */ -public class ChannelWrappedInvoker extends AbstractInvoker { +class ChannelWrappedInvoker extends AbstractInvoker { private final Channel channel; private final String serviceKey ; - public ChannelWrappedInvoker(Class serviceType, Channel channel, String serviceKey) { + public ChannelWrappedInvoker(Class serviceType, Channel channel, URL url, String serviceKey) { - super(serviceType, channel.getUrl(), new String[] { Constants.GROUP_KEY, + super(serviceType, url, new String[] { Constants.GROUP_KEY, Constants.TOKEN_KEY, Constants.TIMEOUT_KEY }); this.channel = channel; this.serviceKey = serviceKey; diff --git a/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 027f2c8b7ac6db11608c62b86932d2ed16186655..b85f65b8fcdae6de9dc416f8f6b8fbe2dc8035b7 100644 --- a/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc-default/src/main/java/com/alibaba/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -77,43 +77,19 @@ public class DubboProtocol extends AbstractProtocol { //consumer side export a stub service for dispatching event //servicekey-stubmethods - private final ConcurrentMap stubServiceMethodsMap = new ConcurrentHashMap(); + private final ConcurrentMap stubServiceMethodsMap = new ConcurrentHashMap(); + + private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke"; private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { - private boolean isClientSide(Channel channel) { - InetSocketAddress address = channel.getRemoteAddress(); - URL url = channel.getUrl(); - return url.getPort() == address.getPort() && - NetUtils.filterLocalHost(channel.getUrl().getIp()) - .equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress())); - } public Object reply(ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { - boolean isCallBackServiceInvoke = false; - boolean isStubServiceInvoke = false; Invocation inv = (Invocation) message; - int port = channel.getLocalAddress().getPort(); - String path = inv.getAttachments().get(Constants.PATH_KEY); - //如果是客户端的回调服务. - isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(RpcConstants.STUB_EVENT_KEY)); - if (isStubServiceInvoke){ - port = channel.getRemoteAddress().getPort(); - } - //callback - isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; - if(isCallBackServiceInvoke){ - path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(RpcConstants.CALLBACK_SERVICE_KEY); - } - String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); - - DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); - - if (exporter == null) - throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:"+message); - - if (isCallBackServiceInvoke){ - String methodsStr = exporter.getInvoker().getUrl().getParameters().get("methods"); + Invoker invoker = getInvoker(channel, inv); + //如果是callback 需要处理高版本调用低版本的问题 + if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){ + String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || methodsStr.indexOf(",") == -1){ hasMethod = inv.getMethodName().equals(methodsStr); @@ -127,12 +103,12 @@ public class DubboProtocol extends AbstractProtocol { } } if (!hasMethod){ - logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + exporter.getInvoker().getUrl()) +" ,invocation is :"+inv ); + logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv ); return null; } } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); - return exporter.getInvoker().invoke(inv); + return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } @@ -206,8 +182,46 @@ public class DubboProtocol extends AbstractProtocol { public Collection> getExporters() { return Collections.unmodifiableCollection(exporterMap.values()); - } - + } + + Map> getExporterMap(){ + return exporterMap; + } + + private boolean isClientSide(Channel channel) { + InetSocketAddress address = channel.getRemoteAddress(); + URL url = channel.getUrl(); + return url.getPort() == address.getPort() && + NetUtils.filterLocalHost(channel.getUrl().getIp()) + .equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress())); + } + + Invoker getInvoker(Channel channel, Invocation inv) throws RemotingException{ + boolean isCallBackServiceInvoke = false; + boolean isStubServiceInvoke = false; + int port = channel.getLocalAddress().getPort(); + String path = inv.getAttachments().get(Constants.PATH_KEY); + //如果是客户端的回调服务. + isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(RpcConstants.STUB_EVENT_KEY)); + if (isStubServiceInvoke){ + port = channel.getRemoteAddress().getPort(); + } + //callback + isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; + if(isCallBackServiceInvoke){ + path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(RpcConstants.CALLBACK_SERVICE_KEY); + inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); + } + String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); + + DubboExporter exporter = (DubboExporter) exporterMap.get(serviceKey); + + if (exporter == null) + throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv); + + return exporter.getInvoker(); + } + public Collection> getInvokers() { return Collections.unmodifiableCollection(invokers); } @@ -222,7 +236,7 @@ public class DubboProtocol extends AbstractProtocol { String key = url.getAddress(); //client 也可以暴露一个只有server可以调用的服务。 boolean isServer = url.getParameter(RpcConstants.IS_SERVER_KEY,true); - if (isServer && ! serverMap.containsKey(key)) { + if (isServer && ! serverMap.containsKey(key)) { serverMap.put(key, getServer(url)); } // export service. diff --git a/dubbo-rpc-default/src/test/java/com/alibaba/dubbo/rpc/protocol/dubbo/ExplicitCallbackTest.java b/dubbo-rpc-default/src/test/java/com/alibaba/dubbo/rpc/protocol/dubbo/ExplicitCallbackTest.java index 7f19c26194809209dfe73f85bc846c80c313a7b0..990776e7f1a48160b358e73cba26e5102f7f3850 100644 --- a/dubbo-rpc-default/src/test/java/com/alibaba/dubbo/rpc/protocol/dubbo/ExplicitCallbackTest.java +++ b/dubbo-rpc-default/src/test/java/com/alibaba/dubbo/rpc/protocol/dubbo/ExplicitCallbackTest.java @@ -38,7 +38,8 @@ import com.alibaba.dubbo.rpc.protocol.dubbo.support.ProtocolUtils; public class ExplicitCallbackTest { - protected Exporter exporter = null; + protected Exporter exporter = null; + protected Exporter hello_exporter = null; protected Invoker reference = null; @After @@ -46,9 +47,12 @@ public class ExplicitCallbackTest { destroyService(); } - public void exportService(){ - exporter = ProtocolUtils.export(new DemoServiceImpl(), IDemoService.class, serviceURL); - } + public void exportService(){ + //先export一个service,测试共享连接的问题 + serviceURL=serviceURL.addParameter("connections", 1); +// hello_exporter = ProtocolUtils.export(new HelloServiceImpl(), IHelloService.class, "dubbo://127.0.0.1:"+serviceURL.getPort()+"/"+IHelloService.class); + exporter = ProtocolUtils.export(new DemoServiceImpl(), IDemoService.class, serviceURL); + } void referService() { demoProxy = (IDemoService)ProtocolUtils.refer(IDemoService.class, consumerUrl); } @@ -65,7 +69,8 @@ public class ExplicitCallbackTest { +"&xxx.0.callback=true" +"&xxx2.0.callback=true" +"&unxxx2.0.callback=false" - +"&timeout="+timeout + +"&timeout="+timeout + +"&retries=0" +"&"+RpcConstants.CALLBACK_INSTANCES_LIMIT_KEY+"="+callbacks ); // uncomment is unblock invoking @@ -86,7 +91,8 @@ public class ExplicitCallbackTest { public void destroyService(){ demoProxy = null ; try { - if (exporter!=null) exporter.unexport(); + if (exporter!=null) exporter.unexport(); + if (hello_exporter!=null) hello_exporter.unexport(); if (reference!=null) reference.destroy(); }catch (Exception e) { } @@ -94,15 +100,25 @@ public class ExplicitCallbackTest { // ============================华丽的分割线================================================ interface IDemoCallback{ String yyy(String msg); - } + } + interface IHelloService{ + public String sayHello(); + } + interface IDemoService{ public String get(); public int getCallbackCount(); public void xxx(IDemoCallback callback,String arg1,int runs,int sleep); public void xxx2(IDemoCallback callback); public void unxxx2(IDemoCallback callback); + } + + class HelloServiceImpl implements IHelloService{ + public String sayHello() { + return "hello"; + } + } - class DemoServiceImpl implements IDemoService { public String get(){ return "ok" ; @@ -183,8 +199,9 @@ public class ExplicitCallbackTest { // ============================华丽的分割线================================================ IDemoService demoProxy = null; @Test - public void TestCallbackNormal() throws Exception { - initOrResetUrl(1, 1000); initOrResetService() ; + public void TestCallbackNormal() throws Exception { + + initOrResetUrl(1, 10000000); initOrResetService() ; final AtomicInteger count = new AtomicInteger(0); demoProxy.xxx(new IDemoCallback() { @@ -194,9 +211,12 @@ public class ExplicitCallbackTest { return "ok"; } },"other custom args" , 10 , 100); - System.out.println("Async..."); + System.out.println("Async..."); +// Thread.sleep(10000000); assertCallbackCount(10,100,count); - destroyService(); + destroyService(); + + } @Test