提交 e6746a46 编写于 作者: S shuang.kou

[v3.0]refractor optimize some code

上级 e2e26dbe
......@@ -25,12 +25,9 @@ public class RpcRequestHandler {
* 处理 rpcRequest :调用对应的方法,然后返回方法执行结果
*/
public Object handle(RpcRequest rpcRequest) {
Object result;
//通过注册中心获取到目标类(客户端需要调用类)
Object service = serviceProvider.getServiceProvider(rpcRequest.getInterfaceName());
result = invokeTargetMethod(rpcRequest, service);
log.info("service:{} successful invoke method:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
return result;
return invokeTargetMethod(rpcRequest, service);
}
/**
......@@ -38,7 +35,7 @@ public class RpcRequestHandler {
*
* @param rpcRequest 客户端请求
* @param service 提供服务的对象
* @return
* @return 目标方法执行的结果
*/
private Object invokeTargetMethod(RpcRequest rpcRequest, Object service) {
Object result;
......@@ -48,6 +45,7 @@ public class RpcRequestHandler {
return RpcResponse.fail(RpcResponseCode.NOT_FOUND_METHOD);
}
result = method.invoke(service, rpcRequest.getParameters());
log.info("service:[{}] successful invoke method:[{}]", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
} catch (NoSuchMethodException | IllegalArgumentException | InvocationTargetException | IllegalAccessException e) {
throw new RpcException(e.getMessage(), e);
}
......
......@@ -9,6 +9,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* 实现了 ServiceProvider 接口,可以将其看做是一个保存和提供服务实例对象的示例
*
* @author shuang.kou
* @createTime 2020年05月13日 11:23:00
*/
......
......@@ -10,7 +10,8 @@ import java.lang.reflect.Proxy;
import java.util.UUID;
/**
* 动态代理类。当动态代理对象调用一个方法的时候,实际调用的是下面的 invoke 方法
* 动态代理类。当动态代理对象调用一个方法的时候,实际调用的是下面的 invoke 方法。
* 正是因为动态代理才让客户端调用的远程方法像是调用本地方法一样(屏蔽了中间过程)
*
* @author shuang.kou
* @createTime 2020年05月10日 19:01:00
......
......@@ -28,18 +28,13 @@ public class ChannelProvider {
*/
private static final int MAX_RETRY_COUNT = 5;
public static Channel get(InetSocketAddress inetSocketAddress) {
public static Channel get(InetSocketAddress inetSocketAddress) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
try {
connect(bootstrap, inetSocketAddress, countDownLatch);
countDownLatch.await();
} catch (InterruptedException e) {
log.error("occur exception when get channel:", e);
}
connect(bootstrap, inetSocketAddress, countDownLatch);
countDownLatch.await();
return channel;
}
private static void connect(Bootstrap bootstrap, InetSocketAddress inetSocketAddress, CountDownLatch countDownLatch) {
connect(bootstrap, inetSocketAddress, MAX_RETRY_COUNT, countDownLatch);
}
......
......@@ -3,8 +3,8 @@ package github.javaguide.transport.netty.client;
import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse;
import github.javaguide.serialize.kyro.KryoSerializer;
import github.javaguide.transport.netty.codec.NettyKryoDecoder;
import github.javaguide.transport.netty.codec.NettyKryoEncoder;
import github.javaguide.transport.netty.codec.kyro.NettyKryoDecoder;
import github.javaguide.transport.netty.codec.kyro.NettyKryoEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
......
......@@ -34,29 +34,28 @@ public class NettyClientClientTransport implements ClientTransport {
try {
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName());
Channel channel = ChannelProvider.get(inetSocketAddress);
if (channel.isActive()) {
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: {}", rpcRequest);
} else {
future.channel().close();
log.error("Send failed:", future.cause());
}
});
channel.closeFuture().sync();
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + rpcRequest.getRequestId());
RpcResponse rpcResponse = channel.attr(key).get();
log.info("client get rpcResponse from channel:{}", rpcResponse);
//校验 RpcResponse 和 RpcRequest
RpcMessageChecker.check(rpcResponse, rpcRequest);
result.set(rpcResponse.getData());
} else {
if (!channel.isActive()) {
NettyClient.close();
System.exit(0);
return null;
}
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: {}", rpcRequest);
} else {
future.channel().close();
log.error("Send failed:", future.cause());
}
});
channel.closeFuture().sync();
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse" + rpcRequest.getRequestId());
RpcResponse rpcResponse = channel.attr(key).get();
log.info("client get rpcResponse from channel:{}", rpcResponse);
//校验 RpcResponse 和 RpcRequest
RpcMessageChecker.check(rpcResponse, rpcRequest);
result.set(rpcResponse.getData());
} catch (InterruptedException e) {
log.error("occur exception when send rpc message from client:", e);
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
return result.get();
......
package github.javaguide.transport.netty.codec;
package github.javaguide.transport.netty.codec.kyro;
import github.javaguide.serialize.Serializer;
import io.netty.buffer.ByteBuf;
......
package github.javaguide.transport.netty.codec;
package github.javaguide.transport.netty.codec.kyro;
import github.javaguide.serialize.Serializer;
import io.netty.buffer.ByteBuf;
......
......@@ -7,8 +7,8 @@ import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.serialize.kyro.KryoSerializer;
import github.javaguide.transport.netty.codec.NettyKryoDecoder;
import github.javaguide.transport.netty.codec.NettyKryoEncoder;
import github.javaguide.transport.netty.codec.kyro.NettyKryoDecoder;
import github.javaguide.transport.netty.codec.kyro.NettyKryoEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
......
......@@ -4,13 +4,14 @@ import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse;
import github.javaguide.handler.RpcRequestHandler;
import github.javaguide.utils.concurrent.ThreadPoolFactory;
import github.javaguide.utils.factory.SingletonFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.concurrent.ExecutorService;
......@@ -31,7 +32,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private final ExecutorService threadPool;
public NettyServerHandler() {
this.rpcRequestHandler = new RpcRequestHandler();
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
this.threadPool = ThreadPoolFactory.createDefaultThreadPool(THREAD_NAME_PREFIX);
}
......
......@@ -46,7 +46,6 @@ public class SocketRpcClient implements ClientTransport {
RpcMessageChecker.check(rpcResponse, rpcRequest);
return rpcResponse.getData();
} catch (IOException | ClassNotFoundException e) {
log.error("occur exception when send sendRpcRequest");
throw new RpcException("调用服务失败:", e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册