提交 fe2a277e 编写于 作者: Y YunaiV

dubbo 协议,异步调用

上级 cac2e246
......@@ -29,6 +29,8 @@ import java.util.concurrent.TimeoutException;
/**
* FutureAdapter
*
* 适配 ResponseFuture 。通过这样的方式,对上层调用方,透明化 ResponseFuture 的存在。
*/
public class FutureAdapter<V> implements Future<V> {
......@@ -42,18 +44,22 @@ public class FutureAdapter<V> implements Future<V> {
return future;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return future.isDone();
}
@Override
@SuppressWarnings("unchecked")
public V get() throws InterruptedException, ExecutionException {
try {
......@@ -65,6 +71,7 @@ public class FutureAdapter<V> implements Future<V> {
}
}
@Override
@SuppressWarnings("unchecked")
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
int timeoutInMillis = (int) unit.convert(timeout, TimeUnit.MILLISECONDS);
......
......@@ -38,58 +38,92 @@ import java.util.concurrent.Future;
/**
* EventFilter
*
* 事件通知过滤器,可参见文档《Dubbo 用户指南 —— 事件通知》https://dubbo.gitbooks.io/dubbo-user-book/demos/events-notify.html
*/
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {
protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);
@Override
public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
// 获得是否异步调用
final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
// 触发前置方法
fireInvokeCallback(invoker, invocation);
// need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future.
// 调用方法
Result result = invoker.invoke(invocation);
if (isAsync) {
// 触发回调方法
if (isAsync) { // 异步回调
asyncCallback(invoker, invocation);
} else {
} else { // 同步回调
syncCallback(invoker, invocation, result);
}
return result;
}
/**
* 同步回调
*
* @param invoker Invoker 对象
* @param invocation Invocation 对象
* @param result RPC 结果
*/
private void syncCallback(final Invoker<?> invoker, final Invocation invocation, final Result result) {
if (result.hasException()) {
if (result.hasException()) { // 异常,触发异常回调
fireThrowCallback(invoker, invocation, result.getException());
} else {
} else { // 正常,触发正常回调
fireReturnCallback(invoker, invocation, result.getValue());
}
}
/**
* 异步回调
*
* @param invoker Invoker 对象
* @param invocation Invocation 对象
*/
private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
// 获得 Future 对象
Future<?> f = RpcContext.getContext().getFuture();
if (f instanceof FutureAdapter) {
ResponseFuture future = ((FutureAdapter<?>) f).getFuture();
// 触发回调
future.setCallback(new ResponseCallback() {
/**
* 触发正常回调方法
*
* @param rpcResult RPC 结果
*/
public void done(Object rpcResult) {
if (rpcResult == null) {
logger.error(new IllegalStateException("invalid result value : null, expected " + Result.class.getName()));
return;
}
///must be rpcResult
// must be rpcResult
if (!(rpcResult instanceof Result)) {
logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected " + Result.class.getName()));
return;
}
Result result = (Result) rpcResult;
if (result.hasException()) {
if (result.hasException()) { // 触发正常回调方法
fireThrowCallback(invoker, invocation, result.getException());
} else {
} else { // 触发异常回调方法
fireReturnCallback(invoker, invocation, result.getValue());
}
}
/**
* 触发异常回调方法
*
* @param exception 异常
*/
public void caught(Throwable exception) {
fireThrowCallback(invoker, invocation, exception);
}
......@@ -97,20 +131,27 @@ public class FutureFilter implements Filter {
}
}
/**
* 触发前置方法
*
* @param invoker Invoker 对象
* @param invocation Invocation 对象
*/
private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
// 获得前置方法和对象
final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY));
final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY));
if (onInvokeMethod == null && onInvokeInst == null) {
return;
}
if (onInvokeMethod == null || onInvokeInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (onInvokeMethod != null && !onInvokeMethod.isAccessible()) {
if (!onInvokeMethod.isAccessible()) {
onInvokeMethod.setAccessible(true);
}
// 调用前置方法
Object[] params = invocation.getArguments();
try {
onInvokeMethod.invoke(onInvokeInst, params);
......@@ -121,22 +162,30 @@ public class FutureFilter implements Filter {
}
}
/**
* 触发正常回调方法
*
* @param invoker Invoker 对象
* @param invocation Invocation 对象
* @param result RPC 结果
*/
@SuppressWarnings("Duplicates")
private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
// 获得 `onreturn` 方法和对象
final Method onReturnMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
//not set onreturn callback
if (onReturnMethod == null && onReturnInst == null) {
return;
}
if (onReturnMethod == null || onReturnInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onReturnMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (onReturnMethod != null && !onReturnMethod.isAccessible()) {
if (!onReturnMethod.isAccessible()) {
onReturnMethod.setAccessible(true);
}
// 参数数组
Object[] args = invocation.getArguments();
Object[] params;
Class<?>[] rParaTypes = onReturnMethod.getParameterTypes();
......@@ -153,6 +202,8 @@ public class FutureFilter implements Filter {
} else {
params = new Object[]{result};
}
// 调用方法
try {
onReturnMethod.invoke(onReturnInst, params);
} catch (InvocationTargetException e) {
......@@ -162,26 +213,35 @@ public class FutureFilter implements Filter {
}
}
/**
* 触发异常回调方法
*
* @param invoker Invoker 对象
* @param invocation Invocation 方法
* @param exception 异常
*/
@SuppressWarnings("Duplicates")
private void fireThrowCallback(final Invoker<?> invoker, final Invocation invocation, final Throwable exception) {
// 获得 `onthrow` 方法和对象
final Method onthrowMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_METHOD_KEY));
final Object onthrowInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_THROW_INSTANCE_KEY));
//onthrow callback not configured
// onthrow callback not configured
if (onthrowMethod == null && onthrowInst == null) {
return;
}
if (onthrowMethod == null || onthrowInst == null) {
throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onthrow callback config , but no such " + (onthrowMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
}
if (onthrowMethod != null && !onthrowMethod.isAccessible()) {
if (!onthrowMethod.isAccessible()) {
onthrowMethod.setAccessible(true);
}
Class<?>[] rParaTypes = onthrowMethod.getParameterTypes();
if (rParaTypes[0].isAssignableFrom(exception.getClass())) {
if (rParaTypes[0].isAssignableFrom(exception.getClass())) { // 符合异常
try {
// 参数数组
Object[] args = invocation.getArguments();
Object[] params;
if (rParaTypes.length > 1) {
if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)) {
params = new Object[2];
......@@ -195,11 +255,13 @@ public class FutureFilter implements Filter {
} else {
params = new Object[]{exception};
}
// 调用方法
onthrowMethod.invoke(onthrowInst, params);
} catch (Throwable e) {
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), e);
}
} else {
} else { // 不符合异常,打印错误日志
logger.error(invocation.getMethodName() + ".call back method invoke error . callback method :" + onthrowMethod + ", url:" + invoker.getUrl(), exception);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册