提交 6cec6672 编写于 作者: C chao.liuc

DUBBO-200 Callback服务获取url方式及path设置问题修复

git-svn-id: http://code.alibabatech.com/svn/dubbo/trunk@891 1a56cb94-b969-4eaa-88fa-be21384802f2
上级 9768d9a9
......@@ -15,33 +15,34 @@
*/
package com.alibaba.dubbo.rpc.protocol.dubbo;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.ExtensionLoader;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcConstants;
import com.alibaba.dubbo.rpc.RpcInvocation;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.ExtensionLoader;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.common.utils.ConcurrentHashSet;
import com.alibaba.dubbo.common.utils.StringUtils;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.ProxyFactory;
import com.alibaba.dubbo.rpc.RpcConstants;
import com.alibaba.dubbo.rpc.RpcInvocation;
/**
* callback 服务帮助类.
* @author chao.liuc
*
*/
public class CallbackServiceCodec {
class CallbackServiceCodec {
private static final Logger logger = LoggerFactory.getLogger(CallbackServiceCodec.class);
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
......@@ -49,7 +50,7 @@ public class CallbackServiceCodec {
private static final byte CALLBACK_NONE = 0x0;
private static final byte CALLBACK_CREATE = 0x1;
private static final byte CALLBACK_DESTROY = 0x2;
private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-";
private static final String INV_ATT_CALLBACK_KEY = "sys_callback_arg-";
private static byte isCallBack(URL url, String methodName ,int argIndex){
//参数callback的规则是 方法名称.参数index(0开始).callback
......@@ -68,17 +69,16 @@ public class CallbackServiceCodec {
}
/**
* client 端export callback service
* @param inst
* @param clazz
* @param channel
* client 端export callback service
* @param channel
* @param clazz
* @param inst
* @param export
* @param out
* @param export
* @throws IOException
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
private static String exportOrunexportCallbackService(Channel channel, Class clazz, Object inst, Boolean export) throws IOException{
URL url = channel.getUrl();
private static String exportOrunexportCallbackService(Channel channel, URL url, Class clazz, Object inst, Boolean export) throws IOException{
int instid = System.identityHashCode(inst);
Map<String,String> params = new HashMap<String,String>(3);
......@@ -104,7 +104,7 @@ public class CallbackServiceCodec {
if(export){
//同一个channel 可以有多个callback instance. 不同的instance不重新export
if( ! channel.hasAttribute(cacheKey)){
if (!isInstancesOverLimit(channel, clazz.getName(), instid, false)) {
if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, false)) {
Invoker<?> invoker = proxyFactory.getInvoker(inst, clazz, exporturl);
//资源销毁?
Exporter<?> exporter = protocol.export(invoker);
......@@ -126,10 +126,11 @@ public class CallbackServiceCodec {
}
/**
* server端 应用一个callbackservice
* server端 应用一个callbackservice
* @param url
*/
@SuppressWarnings("unchecked")
private static Object referOrdestroyCallbackService(Channel channel, Class<?> clazz, Invocation inv ,int instid ,boolean isRefer){
private static Object referOrdestroyCallbackService(Channel channel, URL url, Class<?> clazz ,Invocation inv ,int instid, boolean isRefer){
Object proxy = null;
String invokerCacheKey = getServerSideCallbackInvokerCacheKey(channel, clazz.getName(), instid);
String proxyCacheKey = getServerSideCallbackServiceCacheKey(channel, clazz.getName(), instid);
......@@ -137,9 +138,10 @@ public class CallbackServiceCodec {
String countkey = getServerSideCountKey(channel, clazz.getName());
if (isRefer){
if( proxy == null ){
if (!isInstancesOverLimit(channel, clazz.getName(), instid, true)){
@SuppressWarnings("rawtypes")
Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, String.valueOf(instid));
if (!isInstancesOverLimit(channel, url, clazz.getName(), instid, true)){
url = url.setPath(clazz.getName());
@SuppressWarnings("rawtypes")
Invoker<?> invoker = new ChannelWrappedInvoker(clazz, channel, url, String.valueOf(instid));
proxy = proxyFactory.getProxy(invoker);
channel.setAttribute(proxyCacheKey, proxy);
channel.setAttribute(invokerCacheKey, invoker);
......@@ -193,9 +195,9 @@ public class CallbackServiceCodec {
private static String getServerSideCountKey(Channel channel, String interfaceClass){
return RpcConstants.CALLBACK_SERVICE_PROXY_KEY+"."+System.identityHashCode(channel)+"."+interfaceClass+".COUNT";
}
private static boolean isInstancesOverLimit(Channel channel, String interfaceClass ,int instid, boolean isServer){
private static boolean isInstancesOverLimit(Channel channel, URL url ,String interfaceClass, int instid, boolean isServer){
Integer count = (Integer)channel.getAttribute(isServer ? getServerSideCountKey(channel,interfaceClass) : getClientSideCountKey(interfaceClass));
int limit = channel.getUrl().getParameter(RpcConstants.CALLBACK_INSTANCES_LIMIT_KEY, RpcConstants.DEFAULT_CALLBACK_INSTANCES);
int limit = url.getParameter(RpcConstants.CALLBACK_INSTANCES_LIMIT_KEY, RpcConstants.DEFAULT_CALLBACK_INSTANCES);
if (count != null && count >= limit){
//client side error
throw new IllegalStateException("interface " + interfaceClass +" `s callback instances num exceed providers limit :"+ limit
......@@ -232,44 +234,53 @@ public class CallbackServiceCodec {
}
}
public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException{
byte callbackstatus = isCallBack(channel == null ? null : channel.getUrl(), inv.getMethodName(), paraIndex);
public static Object encodeInvocationArgument(Channel channel, RpcInvocation inv, int paraIndex) throws IOException{
//encode时可直接获取url
URL url = inv.getUrl();
byte callbackstatus = isCallBack(channel == null ? null : url, inv.getMethodName(), paraIndex);
Object[] args = inv.getArguments();
Class<?>[] pts = inv.getParameterTypes();
switch (callbackstatus) {
case CallbackServiceCodec.CALLBACK_NONE:
return args[paraIndex];
case CallbackServiceCodec.CALLBACK_CREATE:
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex , exportOrunexportCallbackService(channel, pts[paraIndex], args[paraIndex], true));
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex , exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], true));
return null;
case CallbackServiceCodec.CALLBACK_DESTROY:
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, pts[paraIndex], args[paraIndex], false));
inv.setAttachment(INV_ATT_CALLBACK_KEY + paraIndex, exportOrunexportCallbackService(channel, url, pts[paraIndex], args[paraIndex], false));
return null;
default:
return args[paraIndex];
}
}
public static Object decodeInvocationArgument(Channel channel, RpcInvocation inv, Class<?>[] pts, int paraIndex, Object inObject) throws IOException{
//如果是callback,则创建proxy到客户端,方法的执行可通过channel调用到client端的callback接口
byte callbackstatus = isCallBack(channel.getUrl(), inv.getMethodName(), paraIndex);
//如果是callback,则创建proxy到客户端,方法的执行可通过channel调用到client端的callback接口
//decode时需要根据channel及env获取url
URL url = null ;
try {
url = DubboProtocol.getDubboProtocol().getInvoker(channel, inv).getUrl();
} catch (RemotingException e) {
throw new IOException(StringUtils.toString("get invoker error", e));
}
byte callbackstatus = isCallBack(url, inv.getMethodName(), paraIndex);
switch (callbackstatus) {
case CallbackServiceCodec.CALLBACK_NONE:
return inObject;
case CallbackServiceCodec.CALLBACK_CREATE:
try{
return referOrdestroyCallbackService(channel, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true);
return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), true);
}catch (Exception e) {
logger.error(e);
throw new IOException(StringUtils.toString(e));
}
case CallbackServiceCodec.CALLBACK_DESTROY:
try{
return referOrdestroyCallbackService(channel, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false);
return referOrdestroyCallbackService(channel, url, pts[paraIndex], inv, Integer.parseInt(inv.getAttachment(INV_ATT_CALLBACK_KEY + paraIndex)), false);
}catch (Exception e) {
throw new IOException(StringUtils.toString(e));
}
default:
return inObject ;
}
}
}
}
\ No newline at end of file
......@@ -39,14 +39,14 @@ import com.alibaba.dubbo.rpc.protocol.AbstractInvoker;
*
* @author chao.liuc
*/
public class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
class ChannelWrappedInvoker<T> extends AbstractInvoker<T> {
private final Channel channel;
private final String serviceKey ;
public ChannelWrappedInvoker(Class<T> serviceType, Channel channel, String serviceKey) {
public ChannelWrappedInvoker(Class<T> serviceType, Channel channel, URL url, String serviceKey) {
super(serviceType, channel.getUrl(), new String[] { Constants.GROUP_KEY,
super(serviceType, url, new String[] { Constants.GROUP_KEY,
Constants.TOKEN_KEY, Constants.TIMEOUT_KEY });
this.channel = channel;
this.serviceKey = serviceKey;
......
......@@ -77,43 +77,19 @@ public class DubboProtocol extends AbstractProtocol {
//consumer side export a stub service for dispatching event
//servicekey-stubmethods
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
private final ConcurrentMap<String, String> stubServiceMethodsMap = new ConcurrentHashMap<String, String>();
private static final String IS_CALLBACK_SERVICE_INVOKE = "_isCallBackServiceInvoke";
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
private boolean isClientSide(Channel channel) {
InetSocketAddress address = channel.getRemoteAddress();
URL url = channel.getUrl();
return url.getPort() == address.getPort() &&
NetUtils.filterLocalHost(channel.getUrl().getIp())
.equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
}
public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
if (message instanceof Invocation) {
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
Invocation inv = (Invocation) message;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
//如果是客户端的回调服务.
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(RpcConstants.STUB_EVENT_KEY));
if (isStubServiceInvoke){
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if(isCallBackServiceInvoke){
path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(RpcConstants.CALLBACK_SERVICE_KEY);
}
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:"+message);
if (isCallBackServiceInvoke){
String methodsStr = exporter.getInvoker().getUrl().getParameters().get("methods");
Invoker<?> invoker = getInvoker(channel, inv);
//如果是callback 需要处理高版本调用低版本的问题
if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))){
String methodsStr = invoker.getUrl().getParameters().get("methods");
boolean hasMethod = false;
if (methodsStr == null || methodsStr.indexOf(",") == -1){
hasMethod = inv.getMethodName().equals(methodsStr);
......@@ -127,12 +103,12 @@ public class DubboProtocol extends AbstractProtocol {
}
}
if (!hasMethod){
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + exporter.getInvoker().getUrl()) +" ,invocation is :"+inv );
logger.warn(new IllegalStateException("The methodName "+inv.getMethodName()+" not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) +" ,invocation is :"+inv );
return null;
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
return exporter.getInvoker().invoke(inv);
return invoker.invoke(inv);
}
throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
}
......@@ -206,8 +182,46 @@ public class DubboProtocol extends AbstractProtocol {
public Collection<Exporter<?>> getExporters() {
return Collections.unmodifiableCollection(exporterMap.values());
}
}
Map<String, Exporter<?>> getExporterMap(){
return exporterMap;
}
private boolean isClientSide(Channel channel) {
InetSocketAddress address = channel.getRemoteAddress();
URL url = channel.getUrl();
return url.getPort() == address.getPort() &&
NetUtils.filterLocalHost(channel.getUrl().getIp())
.equals(NetUtils.filterLocalHost(address.getAddress().getHostAddress()));
}
Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException{
boolean isCallBackServiceInvoke = false;
boolean isStubServiceInvoke = false;
int port = channel.getLocalAddress().getPort();
String path = inv.getAttachments().get(Constants.PATH_KEY);
//如果是客户端的回调服务.
isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getAttachments().get(RpcConstants.STUB_EVENT_KEY));
if (isStubServiceInvoke){
port = channel.getRemoteAddress().getPort();
}
//callback
isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
if(isCallBackServiceInvoke){
path = inv.getAttachments().get(Constants.PATH_KEY)+"."+inv.getAttachments().get(RpcConstants.CALLBACK_SERVICE_KEY);
inv.getAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
}
String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
if (exporter == null)
throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);
return exporter.getInvoker();
}
public Collection<Invoker<?>> getInvokers() {
return Collections.unmodifiableCollection(invokers);
}
......@@ -222,7 +236,7 @@ public class DubboProtocol extends AbstractProtocol {
String key = url.getAddress();
//client 也可以暴露一个只有server可以调用的服务。
boolean isServer = url.getParameter(RpcConstants.IS_SERVER_KEY,true);
if (isServer && ! serverMap.containsKey(key)) {
if (isServer && ! serverMap.containsKey(key)) {
serverMap.put(key, getServer(url));
}
// export service.
......
......@@ -38,7 +38,8 @@ import com.alibaba.dubbo.rpc.protocol.dubbo.support.ProtocolUtils;
public class ExplicitCallbackTest {
protected Exporter<IDemoService> exporter = null;
protected Exporter<IDemoService> exporter = null;
protected Exporter<IHelloService> hello_exporter = null;
protected Invoker<IDemoService> reference = null;
@After
......@@ -46,9 +47,12 @@ public class ExplicitCallbackTest {
destroyService();
}
public void exportService(){
exporter = ProtocolUtils.export(new DemoServiceImpl(), IDemoService.class, serviceURL);
}
public void exportService(){
//先export一个service,测试共享连接的问题
serviceURL=serviceURL.addParameter("connections", 1);
// hello_exporter = ProtocolUtils.export(new HelloServiceImpl(), IHelloService.class, "dubbo://127.0.0.1:"+serviceURL.getPort()+"/"+IHelloService.class);
exporter = ProtocolUtils.export(new DemoServiceImpl(), IDemoService.class, serviceURL);
}
void referService() {
demoProxy = (IDemoService)ProtocolUtils.refer(IDemoService.class, consumerUrl);
}
......@@ -65,7 +69,8 @@ public class ExplicitCallbackTest {
+"&xxx.0.callback=true"
+"&xxx2.0.callback=true"
+"&unxxx2.0.callback=false"
+"&timeout="+timeout
+"&timeout="+timeout
+"&retries=0"
+"&"+RpcConstants.CALLBACK_INSTANCES_LIMIT_KEY+"="+callbacks
);
// uncomment is unblock invoking
......@@ -86,7 +91,8 @@ public class ExplicitCallbackTest {
public void destroyService(){
demoProxy = null ;
try {
if (exporter!=null) exporter.unexport();
if (exporter!=null) exporter.unexport();
if (hello_exporter!=null) hello_exporter.unexport();
if (reference!=null) reference.destroy();
}catch (Exception e) {
}
......@@ -94,15 +100,25 @@ public class ExplicitCallbackTest {
// ============================华丽的分割线================================================
interface IDemoCallback{
String yyy(String msg);
}
}
interface IHelloService{
public String sayHello();
}
interface IDemoService{
public String get();
public int getCallbackCount();
public void xxx(IDemoCallback callback,String arg1,int runs,int sleep);
public void xxx2(IDemoCallback callback);
public void unxxx2(IDemoCallback callback);
}
class HelloServiceImpl implements IHelloService{
public String sayHello() {
return "hello";
}
}
class DemoServiceImpl implements IDemoService {
public String get(){
return "ok" ;
......@@ -183,8 +199,9 @@ public class ExplicitCallbackTest {
// ============================华丽的分割线================================================
IDemoService demoProxy = null;
@Test
public void TestCallbackNormal() throws Exception {
initOrResetUrl(1, 1000); initOrResetService() ;
public void TestCallbackNormal() throws Exception {
initOrResetUrl(1, 10000000); initOrResetService() ;
final AtomicInteger count = new AtomicInteger(0);
demoProxy.xxx(new IDemoCallback() {
......@@ -194,9 +211,12 @@ public class ExplicitCallbackTest {
return "ok";
}
},"other custom args" , 10 , 100);
System.out.println("Async...");
System.out.println("Async...");
// Thread.sleep(10000000);
assertCallbackCount(10,100,count);
destroyService();
destroyService();
}
@Test
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册