提交 5959b8ee 编写于 作者: G guide

[refractor]主要重构了网络传输部分的代码

上级 801287f4
......@@ -181,10 +181,10 @@ public class HelloController {
```
```java
ClientTransport clientTransport = new SocketRpcClient();
ClientTransport rpcRequestTransport = new SocketRpcClient();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group("test2").version("version2").build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(clientTransport, rpcServiceProperties);
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceProperties);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello);
......
......@@ -226,10 +226,10 @@ public class HelloController {
```
```java
ClientTransport clientTransport = new SocketRpcClient();
ClientTransport rpcRequestTransport = new SocketRpcClient();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group("test2").version("version2").build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(clientTransport, rpcServiceProperties);
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceProperties);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello);
......
......@@ -33,7 +33,7 @@
现在,在你只需要通过下面的方式就能成功接收到客户端返回的结果:
```java
CompletableFuture<RpcResponse> completableFuture = (CompletableFuture<RpcResponse>) clientTransport.sendRpcRequest(rpcRequest);
CompletableFuture<RpcResponse> completableFuture = (CompletableFuture<RpcResponse>) rpcRequestTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();
```
......@@ -2,19 +2,19 @@ package github.javaguide;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.remoting.transport.ClientTransport;
import github.javaguide.remoting.transport.RpcRequestTransport;
import github.javaguide.remoting.transport.socket.SocketRpcClient;
/**
* @author shuang.kou
* @createTime 2020年05月10日 07:25:00
*/
public class RpcFrameworkSimpleClientMain {
public class SocketClientMain {
public static void main(String[] args) {
ClientTransport clientTransport = new SocketRpcClient();
RpcRequestTransport rpcRequestTransport = new SocketRpcClient();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group("test2").version("version2").build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(clientTransport, rpcServiceProperties);
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceProperties);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello);
......
import github.javaguide.HelloService;
import github.javaguide.annotation.RpcScan;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.remoting.transport.netty.server.NettyServer;
import github.javaguide.remoting.transport.netty.server.NettyRpcServer;
import github.javaguide.serviceimpl.HelloServiceImpl2;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
......@@ -16,12 +16,12 @@ public class NettyServerMain {
public static void main(String[] args) {
// Register service via annotation
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(NettyServerMain.class);
NettyServer nettyServer = (NettyServer) applicationContext.getBean("nettyServer");
NettyRpcServer nettyRpcServer = (NettyRpcServer) applicationContext.getBean("nettyRpcServer");
// Register service manually
HelloService helloService2 = new HelloServiceImpl2();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group("test2").version("version2").build();
nettyServer.registerService(helloService2, rpcServiceProperties);
nettyServer.start();
nettyRpcServer.registerService(helloService2, rpcServiceProperties);
nettyRpcServer.start();
}
}
......@@ -7,7 +7,7 @@ import github.javaguide.serviceimpl.HelloServiceImpl;
* @author shuang.kou
* @createTime 2020年05月10日 07:25:00
*/
public class RpcFrameworkSimpleServerMain {
public class SocketServerMain {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
SocketRpcServer socketRpcServer = new SocketRpcServer();
......
......@@ -5,7 +5,7 @@ import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.remoting.transport.netty.server.NettyServer;
import github.javaguide.remoting.transport.netty.server.NettyRpcServer;
import lombok.extern.slf4j.Slf4j;
import java.net.InetAddress;
......@@ -70,7 +70,7 @@ public class ServiceProviderImpl implements ServiceProvider {
String serviceName = serviceRelatedInterface.getCanonicalName();
rpcServiceProperties.setServiceName(serviceName);
this.addService(service, serviceRelatedInterface, rpcServiceProperties);
serviceRegistry.registerService(rpcServiceProperties.toRpcServiceName(), new InetSocketAddress(host, NettyServer.PORT));
serviceRegistry.registerService(rpcServiceProperties.toRpcServiceName(), new InetSocketAddress(host, NettyRpcServer.PORT));
} catch (UnknownHostException e) {
log.error("occur exception when getHostAddress", e);
}
......
......@@ -6,8 +6,8 @@ import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.ClientTransport;
import github.javaguide.remoting.transport.netty.client.NettyClientTransport;
import github.javaguide.remoting.transport.RpcRequestTransport;
import github.javaguide.remoting.transport.netty.client.NettyRpcClient;
import github.javaguide.remoting.transport.socket.SocketRpcClient;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
......@@ -34,11 +34,11 @@ public class RpcClientProxy implements InvocationHandler {
/**
* Used to send requests to the server.And there are two implementations: socket and netty
*/
private final ClientTransport clientTransport;
private final RpcRequestTransport rpcRequestTransport;
private final RpcServiceProperties rpcServiceProperties;
public RpcClientProxy(ClientTransport clientTransport, RpcServiceProperties rpcServiceProperties) {
this.clientTransport = clientTransport;
public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceProperties rpcServiceProperties) {
this.rpcRequestTransport = rpcRequestTransport;
if (rpcServiceProperties.getGroup() == null) {
rpcServiceProperties.setGroup("");
}
......@@ -49,8 +49,8 @@ public class RpcClientProxy implements InvocationHandler {
}
public RpcClientProxy(ClientTransport clientTransport) {
this.clientTransport = clientTransport;
public RpcClientProxy(RpcRequestTransport rpcRequestTransport) {
this.rpcRequestTransport = rpcRequestTransport;
this.rpcServiceProperties = RpcServiceProperties.builder().group("").version("").build();
}
......@@ -80,12 +80,12 @@ public class RpcClientProxy implements InvocationHandler {
.version(rpcServiceProperties.getVersion())
.build();
RpcResponse<Object> rpcResponse = null;
if (clientTransport instanceof NettyClientTransport) {
CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) clientTransport.sendRpcRequest(rpcRequest);
if (rpcRequestTransport instanceof NettyRpcClient) {
CompletableFuture<RpcResponse<Object>> completableFuture = (CompletableFuture<RpcResponse<Object>>) rpcRequestTransport.sendRpcRequest(rpcRequest);
rpcResponse = completableFuture.get();
}
if (clientTransport instanceof SocketRpcClient) {
rpcResponse = (RpcResponse<Object>) clientTransport.sendRpcRequest(rpcRequest);
if (rpcRequestTransport instanceof SocketRpcClient) {
rpcResponse = (RpcResponse<Object>) rpcRequestTransport.sendRpcRequest(rpcRequest);
}
this.check(rpcResponse, rpcRequest);
return rpcResponse.getData();
......
......@@ -33,7 +33,7 @@ public final class CuratorUtils {
private static final Map<String, List<String>> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>();
private static final Set<String> REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet();
private static CuratorFramework zkClient;
private static String defaultZookeeperAddress = "127.0.0.1:2181";
private static final String DEFAULT_ZOOKEEPER_ADDRESS = "127.0.0.1:2181";
private CuratorUtils() {
}
......@@ -97,9 +97,7 @@ public final class CuratorUtils {
public static CuratorFramework getZkClient() {
// check if user has set zk address
Properties properties = PropertiesFileUtil.readPropertiesFile(RpcConfigEnum.RPC_CONFIG_PATH.getPropertyValue());
if (properties != null) {
defaultZookeeperAddress = properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue());
}
String zookeeperAddress = properties != null && properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) != null ? properties.getProperty(RpcConfigEnum.ZK_ADDRESS.getPropertyValue()) : DEFAULT_ZOOKEEPER_ADDRESS;
// if zkClient has been started, return directly
if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) {
return zkClient;
......@@ -108,7 +106,7 @@ public final class CuratorUtils {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
zkClient = CuratorFrameworkFactory.builder()
// the server to connect to (can be a server list)
.connectString(defaultZookeeperAddress)
.connectString(zookeeperAddress)
.retryPolicy(retryPolicy)
.build();
zkClient.start();
......
......@@ -10,7 +10,7 @@ import github.javaguide.remoting.dto.RpcRequest;
* @createTime 2020年05月29日 13:26:00
*/
@SPI
public interface ClientTransport {
public interface RpcRequestTransport {
/**
* send rpc request to server and get result
*
......
package github.javaguide.remoting.transport.netty.client;
import github.javaguide.factory.SingletonFactory;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
......@@ -18,11 +17,9 @@ import java.util.concurrent.ConcurrentHashMap;
public class ChannelProvider {
private final Map<String, Channel> channelMap;
private final NettyClient nettyClient;
public ChannelProvider() {
channelMap = new ConcurrentHashMap<>();
nettyClient = SingletonFactory.getInstance(NettyClient.class);
}
public Channel get(InetSocketAddress inetSocketAddress) {
......@@ -37,10 +34,12 @@ public class ChannelProvider {
channelMap.remove(key);
}
}
// otherwise, reconnect to get the Channel
Channel channel = nettyClient.doConnect(inetSocketAddress);
return null;
}
public void set(InetSocketAddress inetSocketAddress, Channel channel) {
String key = inetSocketAddress.toString();
channelMap.put(key, channel);
return channel;
}
public void remove(InetSocketAddress inetSocketAddress) {
......
package github.javaguide.remoting.transport.netty.client;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.ClientTransport;
import github.javaguide.enums.SerializationTypeEnum;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
/**
* transport rpcRequest based on netty.
*
* @author shuang.kou
* @createTime 2020年05月29日 11:34:00
*/
@Slf4j
public class NettyClientTransport implements ClientTransport {
private final ServiceDiscovery serviceDiscovery;
private final UnprocessedRequests unprocessedRequests;
private final ChannelProvider channelProvider;
public NettyClientTransport() {
this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension("zk");
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
}
@Override
public CompletableFuture<RpcResponse<Object>> sendRpcRequest(RpcRequest rpcRequest) {
// build return value
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
// get server address
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
// get server address related channel
Channel channel = channelProvider.get(inetSocketAddress);
if (channel != null && channel.isActive()) {
// put unprocessed request
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setData(rpcRequest);
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.REQUEST_TYPE);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: [{}]", rpcMessage);
} else {
future.channel().close();
resultFuture.completeExceptionally(future.cause());
log.error("Send failed:", future.cause());
}
});
} else {
throw new IllegalStateException();
}
return resultFuture;
}
}
package github.javaguide.remoting.transport.netty.client;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.enums.SerializationTypeEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.RpcRequestTransport;
import github.javaguide.remoting.transport.netty.codec.RpcMessageDecoder;
import github.javaguide.remoting.transport.netty.codec.RpcMessageEncoder;
import io.netty.bootstrap.Bootstrap;
......@@ -30,12 +40,15 @@ import java.util.concurrent.TimeUnit;
* @createTime 2020年05月29日 17:51:00
*/
@Slf4j
public final class NettyClient {
public final class NettyRpcClient implements RpcRequestTransport {
private final ServiceDiscovery serviceDiscovery;
private final UnprocessedRequests unprocessedRequests;
private final ChannelProvider channelProvider;
private final Bootstrap bootstrap;
private final EventLoopGroup eventLoopGroup;
// initialize resources such as EventLoopGroup, Bootstrap
public NettyClient() {
public NettyRpcClient() {
// initialize resources such as EventLoopGroup, Bootstrap
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
......@@ -52,12 +65,14 @@ public final class NettyClient {
p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(new NettyClientHandler());
p.addLast(new NettyRpcClientHandler());
}
});
this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension("zk");
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
}
/**
* connect server and get the channel ,so that you can send rpc message to server
*
......@@ -78,8 +93,50 @@ public final class NettyClient {
return completableFuture.get();
}
@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
// build return value
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
// get server address
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
// get server address related channel
Channel channel = getChannel(inetSocketAddress);
if (channel.isActive()) {
// put unprocessed request
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setData(rpcRequest);
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.REQUEST_TYPE);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: [{}]", rpcMessage);
} else {
future.channel().close();
resultFuture.completeExceptionally(future.cause());
log.error("Send failed:", future.cause());
}
});
} else {
throw new IllegalStateException();
}
return resultFuture;
}
public Channel getChannel(InetSocketAddress inetSocketAddress) {
Channel channel = channelProvider.get(inetSocketAddress);
if (channel == null) {
channel = doConnect(inetSocketAddress);
channelProvider.set(inetSocketAddress, channel);
}
return channel;
}
public void close() {
eventLoopGroup.shutdownGracefully();
}
}
......@@ -30,13 +30,13 @@ import java.net.InetSocketAddress;
* @createTime 2020年05月25日 20:50:00
*/
@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public class NettyRpcClientHandler extends ChannelInboundHandlerAdapter {
private final UnprocessedRequests unprocessedRequests;
private final ChannelProvider channelProvider;
private final NettyRpcClient nettyRpcClient;
public NettyClientHandler() {
public NettyRpcClientHandler() {
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
this.nettyRpcClient = SingletonFactory.getInstance(NettyRpcClient.class);
}
/**
......@@ -67,7 +67,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
log.info("write idle happen [{}]", ctx.channel().remoteAddress());
Channel channel = channelProvider.get((InetSocketAddress) ctx.channel().remoteAddress());
Channel channel = nettyRpcClient.getChannel((InetSocketAddress) ctx.channel().remoteAddress());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
......
......@@ -38,7 +38,7 @@ import java.util.concurrent.TimeUnit;
*/
@Slf4j
@Component
public class NettyServer {
public class NettyRpcServer {
public static final int PORT = 9998;
......@@ -78,7 +78,7 @@ public class NettyServer {
p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(serviceHandlerGroup, new NettyServerHandler());
p.addLast(serviceHandlerGroup, new NettyRpcServerHandler());
}
});
......
......@@ -28,11 +28,11 @@ import lombok.extern.slf4j.Slf4j;
* @createTime 2020年05月25日 20:44:00
*/
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
public class NettyRpcServerHandler extends ChannelInboundHandlerAdapter {
private final RpcRequestHandler rpcRequestHandler;
public NettyServerHandler() {
public NettyRpcServerHandler() {
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
}
......
......@@ -5,7 +5,7 @@ import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.transport.ClientTransport;
import github.javaguide.remoting.transport.RpcRequestTransport;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
......@@ -23,7 +23,7 @@ import java.net.Socket;
*/
@AllArgsConstructor
@Slf4j
public class SocketRpcClient implements ClientTransport {
public class SocketRpcClient implements RpcRequestTransport {
private final ServiceDiscovery serviceDiscovery;
public SocketRpcClient() {
......
......@@ -15,7 +15,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import static github.javaguide.remoting.transport.netty.server.NettyServer.PORT;
import static github.javaguide.remoting.transport.netty.server.NettyRpcServer.PORT;
/**
* @author shuang.kou
......@@ -34,7 +34,6 @@ public class SocketRpcServer {
serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
}
public void registerService(Object service) {
serviceProvider.publishService(service);
}
......
......@@ -8,7 +8,7 @@ import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.remoting.transport.ClientTransport;
import github.javaguide.remoting.transport.RpcRequestTransport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
......@@ -28,11 +28,11 @@ import java.lang.reflect.Field;
public class SpringBeanPostProcessor implements BeanPostProcessor {
private final ServiceProvider serviceProvider;
private final ClientTransport rpcClient;
private final RpcRequestTransport rpcClient;
public SpringBeanPostProcessor() {
this.serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
this.rpcClient = ExtensionLoader.getExtensionLoader(ClientTransport.class).getExtension("nettyClientTransport");
this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension("netty");
}
@SneakyThrows
......
nettyClientTransport=github.javaguide.remoting.transport.netty.client.NettyClientTransport
socketRpcClient=github.javaguide.remoting.transport.socket.SocketRpcClient
\ No newline at end of file
netty=github.javaguide.remoting.transport.netty.client.NettyRpcClient
socket=github.javaguide.remoting.transport.socket.SocketRpcClient
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册