提交 2fae220b 编写于 作者: S shuang.kou

[v3.0]use zookeeper save service address and reorganize the package structure

上级 d9a24bf0
# guide-rpc-framework # guide-rpc-framework
## 项目模块介绍
![](./images/RPC框架各个模块介绍.png)
package github.javaguide; package github.javaguide;
import github.javaguide.transport.RpcClientProxy;
import github.javaguide.transport.ClientTransport; import github.javaguide.transport.ClientTransport;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.transport.netty.client.NettyClientClientTransport; import github.javaguide.transport.netty.client.NettyClientClientTransport;
import java.net.InetSocketAddress;
/** /**
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月10日 07:25:00 * @createTime 2020年05月10日 07:25:00
*/ */
public class NettyClientMain { public class NettyClientMain {
public static void main(String[] args) { public static void main(String[] args) {
ClientTransport rpcClient = new NettyClientClientTransport(new InetSocketAddress("127.0.0.1", 9999)); ClientTransport rpcClient = new NettyClientClientTransport();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient); RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class); HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222")); String hello = helloService.hello(new Hello("111", "222"));
System.out.println("上面的调用卡住之后,这里也不会调用了");
helloService.hello(new Hello("111", "222"));
//如需使用 assert 断言,需要在 VM options 添加参数:-ea //如需使用 assert 断言,需要在 VM options 添加参数:-ea
assert "Hello description is 222".equals(hello); assert "Hello description is 222".equals(hello);
String hello2 = helloService.hello(new Hello("111", "222"));
} }
} }
package github.javaguide; package github.javaguide;
import github.javaguide.transport.ClientTransport; import github.javaguide.transport.ClientTransport;
import github.javaguide.transport.RpcClientProxy; import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.transport.socket.SocketRpcClient; import github.javaguide.transport.socket.SocketRpcClient;
/** /**
...@@ -10,7 +10,7 @@ import github.javaguide.transport.socket.SocketRpcClient; ...@@ -10,7 +10,7 @@ import github.javaguide.transport.socket.SocketRpcClient;
*/ */
public class RpcFrameworkSimpleClientMain { public class RpcFrameworkSimpleClientMain {
public static void main(String[] args) { public static void main(String[] args) {
ClientTransport clientTransport = new SocketRpcClient("127.0.0.1", 9999); ClientTransport clientTransport = new SocketRpcClient();
RpcClientProxy rpcClientProxy = new RpcClientProxy(clientTransport); RpcClientProxy rpcClientProxy = new RpcClientProxy(clientTransport);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class); HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222")); String hello = helloService.hello(new Hello("111", "222"));
......
package github.javaguide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author shuang.kou
* @createTime 2020年05月12日 17:36:00
*/
public class HelloServiceImpl2 {
private static final Logger logger = LoggerFactory.getLogger(HelloServiceImpl2.class);
public String hello(Hello hello) {
logger.info("HelloServiceImpl2收到: {}.", hello.getMessage());
String result = "Hello description is " + hello.getDescription();
logger.info("HelloServiceImpl2返回: {}.", result);
return result;
}
}
package github.javaguide; package github.javaguide;
import github.javaguide.registry.DefaultServiceRegistry;
import github.javaguide.transport.netty.server.NettyServer; import github.javaguide.transport.netty.server.NettyServer;
/** /**
...@@ -9,11 +8,8 @@ import github.javaguide.transport.netty.server.NettyServer; ...@@ -9,11 +8,8 @@ import github.javaguide.transport.netty.server.NettyServer;
*/ */
public class NettyServerMain { public class NettyServerMain {
public static void main(String[] args) { public static void main(String[] args) {
HelloServiceImpl helloService = new HelloServiceImpl(); HelloService helloService = new HelloServiceImpl();
DefaultServiceRegistry defaultServiceRegistry = new DefaultServiceRegistry(); NettyServer nettyServer = new NettyServer("127.0.0.1", 9999);
// 手动注册 nettyServer.publishService(helloService, HelloService.class);
defaultServiceRegistry.register(helloService);
NettyServer socketRpcServer = new NettyServer(9999);
socketRpcServer.run();
} }
} }
package github.javaguide; package github.javaguide;
import github.javaguide.registry.DefaultServiceRegistry;
import github.javaguide.transport.socket.SocketRpcServer; import github.javaguide.transport.socket.SocketRpcServer;
/** /**
...@@ -9,11 +8,8 @@ import github.javaguide.transport.socket.SocketRpcServer; ...@@ -9,11 +8,8 @@ import github.javaguide.transport.socket.SocketRpcServer;
*/ */
public class RpcFrameworkSimpleServerMain { public class RpcFrameworkSimpleServerMain {
public static void main(String[] args) { public static void main(String[] args) {
HelloServiceImpl helloService = new HelloServiceImpl(); HelloService helloService = new HelloServiceImpl();
DefaultServiceRegistry defaultServiceRegistry = new DefaultServiceRegistry(); SocketRpcServer socketRpcServer = new SocketRpcServer("127.0.0.1", 8080);
// 手动注册 socketRpcServer.publishService(helloService,HelloService.class);
defaultServiceRegistry.register(helloService);
SocketRpcServer socketRpcServer = new SocketRpcServer();
socketRpcServer.start(9999);
} }
} }
...@@ -20,6 +20,9 @@ ...@@ -20,6 +20,9 @@
<kryo.version>4.0.2</kryo.version> <kryo.version>4.0.2</kryo.version>
<!--tools--> <!--tools-->
<guava.version>29.0-jre</guava.version> <guava.version>29.0-jre</guava.version>
<!-- test -->
<junit.jupiter.version>5.5.2</junit.jupiter.version>
<junit.platform.version>1.5.2</junit.platform.version>
</properties> </properties>
<modules> <modules>
<module>rpc-framework-simple</module> <module>rpc-framework-simple</module>
...@@ -30,12 +33,20 @@ ...@@ -30,12 +33,20 @@
<module>rpc-framework-common</module> <module>rpc-framework-common</module>
</modules> </modules>
<dependencies> <dependencies>
<!-- lombok -->
<dependency> <dependency>
<groupId>org.projectlombok</groupId> <groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId> <artifactId>lombok</artifactId>
<version>1.18.8</version> <version>1.18.8</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<!-- guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- log -->
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId> <artifactId>slf4j-api</artifactId>
...@@ -46,6 +57,69 @@ ...@@ -46,6 +57,69 @@
<artifactId>slf4j-simple</artifactId> <artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version> <version>${slf4j.version}</version>
</dependency> </dependency>
<!-- test -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit.jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.platform</groupId>
<artifactId>junit-platform-runner</artifactId>
<version>${junit.platform.version}</version>
<scope>test</scope>
</dependency>
<!--zookeeper-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<exclusions>
<exclusion>
<artifactId>netty</artifactId>
<groupId>io.netty</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
......
...@@ -10,13 +10,4 @@ ...@@ -10,13 +10,4 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>rpc-framework-common</artifactId> <artifactId>rpc-framework-common</artifactId>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
</project> </project>
package github.javaguide.utils.zk;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author shuang.kou
* @createTime 2020年05月31日 11:38:00
*/
public class CuratorHelper {
private static final Logger logger = LoggerFactory.getLogger(CuratorHelper.class);
private static final int SLEEP_MS_BETWEEN_RETRIES = 100;
private static final int MAX_RETRIES = 3;
private static final String CONNECT_STRING = "127.0.0.1:2181";
private static final int CONNECTION_TIMEOUT_MS = 10 * 1000;
private static final int SESSION_TIMEOUT_MS = 60 * 1000;
public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc";
private static final Map<String, List<String>> serviceAddressMap = new ConcurrentHashMap<>();
public static CuratorFramework getZkClient() {
// 重试策略,重试3次,并在两次重试之间等待100毫秒,以防出现连接问题。
RetryPolicy retryPolicy = new RetryNTimes(
MAX_RETRIES, SLEEP_MS_BETWEEN_RETRIES);
return CuratorFrameworkFactory.builder()
//要连接的服务器(可以是服务器列表)
.connectString(CONNECT_STRING)
.retryPolicy(retryPolicy)
//连接超时时间,10秒
.connectionTimeoutMs(CONNECTION_TIMEOUT_MS)
//会话超时时间,60秒
.sessionTimeoutMs(SESSION_TIMEOUT_MS)
.build();
}
/**
* 创建临时节点
* 临时节点驻存在ZooKeeper中,当连接和session断掉时被删除。
*/
public static void createEphemeralNode(final CuratorFramework zkClient, final String path) {
try {
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path);
} catch (Exception e) {
logger.error("occur exception:", e);
}
}
/**
* 获取某个字节下的子节点
*/
public static List<String> getChildrenNodes(final CuratorFramework zkClient, final String serviceName) {
if (serviceAddressMap.containsKey(serviceName)) {
return serviceAddressMap.get(serviceName);
}
List<String> result = Collections.emptyList();
String servicePath = CuratorHelper.ZK_REGISTER_ROOT_PATH + "/" + serviceName;
try {
result = zkClient.getChildren().forPath(servicePath);
serviceAddressMap.put(serviceName, result);
registerWatcher(zkClient, serviceName);
} catch (Exception e) {
logger.error("occur exception:", e);
}
return result;
}
/**
* 注册监听
*
* @param serviceName 服务名称
*/
private static void registerWatcher(CuratorFramework zkClient, String serviceName) {
String servicePath = CuratorHelper.ZK_REGISTER_ROOT_PATH + "/" + serviceName;
PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true);
PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> {
List<String> serviceAddresses = curatorFramework.getChildren().forPath(servicePath);
serviceAddressMap.put(serviceName, serviceAddresses);
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
try {
pathChildrenCache.start();
} catch (Exception e) {
logger.error("occur exception:", e);
}
}
}
package github.javaguide.transport; package github.javaguide.handler;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.enumeration.RpcResponseCode; import github.javaguide.enumeration.RpcResponseCode;
import github.javaguide.registry.DefaultServiceRegistry; import github.javaguide.provider.ServiceProvider;
import github.javaguide.registry.ServiceRegistry; import github.javaguide.provider.ServiceProviderImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -17,10 +17,10 @@ import java.lang.reflect.Method; ...@@ -17,10 +17,10 @@ import java.lang.reflect.Method;
*/ */
public class RpcRequestHandler { public class RpcRequestHandler {
private static final Logger logger = LoggerFactory.getLogger(RpcRequestHandler.class); private static final Logger logger = LoggerFactory.getLogger(RpcRequestHandler.class);
private static final ServiceRegistry serviceRegistry; private static final ServiceProvider SERVICE_PROVIDER;
static { static {
serviceRegistry = new DefaultServiceRegistry(); SERVICE_PROVIDER = new ServiceProviderImpl();
} }
/** /**
...@@ -29,7 +29,7 @@ public class RpcRequestHandler { ...@@ -29,7 +29,7 @@ public class RpcRequestHandler {
public Object handle(RpcRequest rpcRequest) { public Object handle(RpcRequest rpcRequest) {
Object result = null; Object result = null;
//通过注册中心获取到目标类(客户端需要调用类) //通过注册中心获取到目标类(客户端需要调用类)
Object service = serviceRegistry.getService(rpcRequest.getInterfaceName()); Object service = SERVICE_PROVIDER.getServiceProvider(rpcRequest.getInterfaceName());
try { try {
result = invokeTargetMethod(rpcRequest, service); result = invokeTargetMethod(rpcRequest, service);
logger.info("service:{} successful invoke method:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName()); logger.info("service:{} successful invoke method:{}", rpcRequest.getInterfaceName(), rpcRequest.getMethodName());
......
package github.javaguide.provider;
/**
* 保存和提供服务实例对象。服务端使用。
*
* @author shuang.kou
* @createTime 2020年05月31日 16:52:00
*/
public interface ServiceProvider {
/**
* 保存服务提供者
*/
<T> void addServiceProvider(T service);
/**
* 获取服务提供者
*/
Object getServiceProvider(String serviceName);
}
package github.javaguide.registry; package github.javaguide.provider;
import github.javaguide.enumeration.RpcErrorMessageEnum; import github.javaguide.enumeration.RpcErrorMessageEnum;
import github.javaguide.exception.RpcException; import github.javaguide.exception.RpcException;
...@@ -10,16 +10,16 @@ import java.util.Set; ...@@ -10,16 +10,16 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
* 默认的服务注册中心实现,通过 Map 保存服务信息,可以通过 zookeeper 来改进
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 11:23:00 * @createTime 2020年05月13日 11:23:00
*/ */
public class DefaultServiceRegistry implements ServiceRegistry { public class ServiceProviderImpl implements ServiceProvider {
private static final Logger logger = LoggerFactory.getLogger(DefaultServiceRegistry.class); private static final Logger logger = LoggerFactory.getLogger(ServiceProviderImpl.class);
/** /**
* 接口名和服务的对应关系,TODO 处理一个接口被两个实现类实现的情况 * 接口名和服务的对应关系
* note:处理一个接口被两个实现类实现的情况如何处理?
* key:service/interface name * key:service/interface name
* value:service * value:service
*/ */
...@@ -27,11 +27,11 @@ public class DefaultServiceRegistry implements ServiceRegistry { ...@@ -27,11 +27,11 @@ public class DefaultServiceRegistry implements ServiceRegistry {
private static final Set<String> registeredService = ConcurrentHashMap.newKeySet(); private static final Set<String> registeredService = ConcurrentHashMap.newKeySet();
/** /**
* TODO 修改为扫描注解注册 * note:可以修改为扫描注解注册
* 将这个对象所有实现的接口都注册进去 * 将这个对象所有实现的接口都注册进去
*/ */
@Override @Override
public synchronized <T> void register(T service) { public <T> void addServiceProvider(T service) {
String serviceName = service.getClass().getCanonicalName(); String serviceName = service.getClass().getCanonicalName();
if (registeredService.contains(serviceName)) { if (registeredService.contains(serviceName)) {
return; return;
...@@ -48,7 +48,7 @@ public class DefaultServiceRegistry implements ServiceRegistry { ...@@ -48,7 +48,7 @@ public class DefaultServiceRegistry implements ServiceRegistry {
} }
@Override @Override
public synchronized Object getService(String serviceName) { public Object getServiceProvider(String serviceName) {
Object service = serviceMap.get(serviceName); Object service = serviceMap.get(serviceName);
if (null == service) { if (null == service) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND); throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);
......
package github.javaguide.transport; package github.javaguide.proxy;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.transport.ClientTransport;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
package github.javaguide.registry; package github.javaguide.registry;
import java.net.InetSocketAddress;
/** /**
* 服务注册中心接口 * 服务注册中心接口
* *
...@@ -7,7 +9,19 @@ package github.javaguide.registry; ...@@ -7,7 +9,19 @@ package github.javaguide.registry;
* @createTime 2020年05月13日 08:39:00 * @createTime 2020年05月13日 08:39:00
*/ */
public interface ServiceRegistry { public interface ServiceRegistry {
<T> void register(T service); /**
* 注册服务
*
* @param serviceName 服务名称
* @param inetSocketAddress 提供服务的地址
*/
void registerService(String serviceName, InetSocketAddress inetSocketAddress);
Object getService(String serviceName); /**
* 查找服务
*
* @param serviceName 服务名称
* @return 提供服务的地址
*/
InetSocketAddress lookupService(String serviceName);
} }
package github.javaguide.registry;
import github.javaguide.utils.zk.CuratorHelper;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/**
* 基于 zookeeper 实现服务注册中心
*
* @author shuang.kou
* @createTime 2020年05月31日 10:56:00
*/
public class ZkServiceRegistry implements ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(CuratorHelper.class);
private final CuratorFramework zkClient;
public ZkServiceRegistry() {
zkClient = CuratorHelper.getZkClient();
zkClient.start();
}
@Override
public void registerService(String serviceName, InetSocketAddress inetSocketAddress) {
//根节点下注册子节点:服务
StringBuilder servicePath = new StringBuilder(CuratorHelper.ZK_REGISTER_ROOT_PATH).append("/").append(serviceName);
//服务子节点下注册子节点:服务地址
servicePath.append(inetSocketAddress.toString());
CuratorHelper.createEphemeralNode(zkClient, servicePath.toString());
logger.info("节点创建成功,节点为:{}", servicePath);
}
@Override
public InetSocketAddress lookupService(String serviceName) {
String serviceAddress = CuratorHelper.getChildrenNodes(zkClient, serviceName).get(0);
logger.info("成功找到服务地址:{}", serviceAddress);
return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1]));
}
}
...@@ -3,6 +3,8 @@ package github.javaguide.transport; ...@@ -3,6 +3,8 @@ package github.javaguide.transport;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
/** /**
* 传输 RpcRequest。
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月29日 13:26:00 * @createTime 2020年05月29日 13:26:00
*/ */
......
...@@ -29,6 +29,9 @@ public class NettyClient { ...@@ -29,6 +29,9 @@ public class NettyClient {
private static final Bootstrap b; private static final Bootstrap b;
private static final EventLoopGroup eventLoopGroup; private static final EventLoopGroup eventLoopGroup;
private NettyClient() {
}
// 初始化相关资源比如 EventLoopGroup、Bootstrap // 初始化相关资源比如 EventLoopGroup、Bootstrap
static { static {
eventLoopGroup = new NioEventLoopGroup(); eventLoopGroup = new NioEventLoopGroup();
...@@ -55,9 +58,6 @@ public class NettyClient { ...@@ -55,9 +58,6 @@ public class NettyClient {
}); });
} }
private NettyClient() {
}
public static void close() { public static void close() {
logger.info("call close method"); logger.info("call close method");
eventLoopGroup.shutdownGracefully(); eventLoopGroup.shutdownGracefully();
......
...@@ -2,6 +2,8 @@ package github.javaguide.transport.netty.client; ...@@ -2,6 +2,8 @@ package github.javaguide.transport.netty.client;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.transport.ClientTransport; import github.javaguide.transport.ClientTransport;
import github.javaguide.utils.checker.RpcMessageChecker; import github.javaguide.utils.checker.RpcMessageChecker;
import io.netty.channel.Channel; import io.netty.channel.Channel;
...@@ -14,21 +16,24 @@ import java.net.InetSocketAddress; ...@@ -14,21 +16,24 @@ import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
/** /**
* 基于 Netty 传输 RpcRequest
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月29日 11:34:00 * @createTime 2020年05月29日 11:34:00
*/ */
public class NettyClientClientTransport implements ClientTransport { public class NettyClientClientTransport implements ClientTransport {
private static final Logger logger = LoggerFactory.getLogger(NettyClientClientTransport.class); private static final Logger logger = LoggerFactory.getLogger(NettyClientClientTransport.class);
private InetSocketAddress inetSocketAddress; private ServiceRegistry serviceRegistry;
public NettyClientClientTransport(InetSocketAddress inetSocketAddress) { public NettyClientClientTransport() {
this.inetSocketAddress = inetSocketAddress; this.serviceRegistry = new ZkServiceRegistry();
} }
@Override @Override
public Object sendRpcRequest(RpcRequest rpcRequest) { public Object sendRpcRequest(RpcRequest rpcRequest) {
AtomicReference<Object> result = new AtomicReference<>(null); AtomicReference<Object> result = new AtomicReference<>(null);
try { try {
InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName());
Channel channel = ChannelProvider.get(inetSocketAddress); Channel channel = ChannelProvider.get(inetSocketAddress);
if (channel.isActive()) { if (channel.isActive()) {
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> { channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> {
......
...@@ -2,6 +2,10 @@ package github.javaguide.transport.netty.server; ...@@ -2,6 +2,10 @@ package github.javaguide.transport.netty.server;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.serialize.kyro.KryoSerializer; import github.javaguide.serialize.kyro.KryoSerializer;
import github.javaguide.transport.netty.codec.NettyKryoDecoder; import github.javaguide.transport.netty.codec.NettyKryoDecoder;
import github.javaguide.transport.netty.codec.NettyKryoEncoder; import github.javaguide.transport.netty.codec.NettyKryoEncoder;
...@@ -18,6 +22,8 @@ import io.netty.handler.logging.LoggingHandler; ...@@ -18,6 +22,8 @@ import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/** /**
* 服务端。接收客户端消息,并且根据客户端的消息调用相应的方法,然后返回结果给客户端。 * 服务端。接收客户端消息,并且根据客户端的消息调用相应的方法,然后返回结果给客户端。
* *
...@@ -26,15 +32,27 @@ import org.slf4j.LoggerFactory; ...@@ -26,15 +32,27 @@ import org.slf4j.LoggerFactory;
*/ */
public class NettyServer { public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class); private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private final String host;
private final int port; private final int port;
private final KryoSerializer kryoSerializer; private final KryoSerializer kryoSerializer;
private final ServiceRegistry serviceRegistry;
private final ServiceProvider serviceProvider;
public NettyServer(int port) { public NettyServer(String host, int port) {
this.host = host;
this.port = port; this.port = port;
kryoSerializer = new KryoSerializer(); kryoSerializer = new KryoSerializer();
serviceRegistry = new ZkServiceRegistry();
serviceProvider = new ServiceProviderImpl();
}
public <T> void publishService(Object service, Class<T> serviceClass) {
serviceProvider.addServiceProvider(service);
serviceRegistry.registerService(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));
start();
} }
public void run() { private void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup();
try { try {
...@@ -58,7 +76,7 @@ public class NettyServer { ...@@ -58,7 +76,7 @@ public class NettyServer {
.option(ChannelOption.SO_BACKLOG, 128); .option(ChannelOption.SO_BACKLOG, 128);
// 绑定端口,同步等待绑定成功 // 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(port).sync(); ChannelFuture f = b.bind(host, port).sync();
// 等待服务端监听端口关闭 // 等待服务端监听端口关闭
f.channel().closeFuture().sync(); f.channel().closeFuture().sync();
} catch (InterruptedException e) { } catch (InterruptedException e) {
......
...@@ -2,7 +2,7 @@ package github.javaguide.transport.netty.server; ...@@ -2,7 +2,7 @@ package github.javaguide.transport.netty.server;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.transport.RpcRequestHandler; import github.javaguide.handler.RpcRequestHandler;
import github.javaguide.utils.concurrent.ThreadPoolFactory; import github.javaguide.utils.concurrent.ThreadPoolFactory;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
......
...@@ -3,6 +3,8 @@ package github.javaguide.transport.socket; ...@@ -3,6 +3,8 @@ package github.javaguide.transport.socket;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.exception.RpcException; import github.javaguide.exception.RpcException;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.transport.ClientTransport; import github.javaguide.transport.ClientTransport;
import github.javaguide.utils.checker.RpcMessageChecker; import github.javaguide.utils.checker.RpcMessageChecker;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
...@@ -12,21 +14,29 @@ import org.slf4j.LoggerFactory; ...@@ -12,21 +14,29 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.io.ObjectInputStream; import java.io.ObjectInputStream;
import java.io.ObjectOutputStream; import java.io.ObjectOutputStream;
import java.net.InetSocketAddress;
import java.net.Socket; import java.net.Socket;
/** /**
* 基于 Socket 传输 RpcRequest
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月10日 18:40:00 * @createTime 2020年05月10日 18:40:00
*/ */
@AllArgsConstructor @AllArgsConstructor
public class SocketRpcClient implements ClientTransport { public class SocketRpcClient implements ClientTransport {
private static final Logger logger = LoggerFactory.getLogger(SocketRpcClient.class); private static final Logger logger = LoggerFactory.getLogger(SocketRpcClient.class);
private String host; private final ServiceRegistry serviceRegistry;
private int port;
public SocketRpcClient() {
this.serviceRegistry = new ZkServiceRegistry();
}
@Override @Override
public Object sendRpcRequest(RpcRequest rpcRequest) { public Object sendRpcRequest(RpcRequest rpcRequest) {
try (Socket socket = new Socket(host, port)) { InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName());
try (Socket socket = new Socket()) {
socket.connect(inetSocketAddress);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
// 通过输出流发送数据到服务端 // 通过输出流发送数据到服务端
objectOutputStream.writeObject(rpcRequest); objectOutputStream.writeObject(rpcRequest);
......
...@@ -2,7 +2,7 @@ package github.javaguide.transport.socket; ...@@ -2,7 +2,7 @@ package github.javaguide.transport.socket;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.transport.RpcRequestHandler; import github.javaguide.handler.RpcRequestHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
......
package github.javaguide.transport.socket; package github.javaguide.transport.socket;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.utils.concurrent.ThreadPoolFactory; import github.javaguide.utils.concurrent.ThreadPoolFactory;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
...@@ -17,14 +22,29 @@ public class SocketRpcServer { ...@@ -17,14 +22,29 @@ public class SocketRpcServer {
private final ExecutorService threadPool; private final ExecutorService threadPool;
private static final Logger logger = LoggerFactory.getLogger(SocketRpcServer.class); private static final Logger logger = LoggerFactory.getLogger(SocketRpcServer.class);
private final String host;
private final int port;
private final ServiceRegistry serviceRegistry;
private final ServiceProvider serviceProvider;
public SocketRpcServer() {
public SocketRpcServer(String host, int port) {
this.host = host;
this.port = port;
threadPool = ThreadPoolFactory.createDefaultThreadPool("socket-server-rpc-pool"); threadPool = ThreadPoolFactory.createDefaultThreadPool("socket-server-rpc-pool");
serviceRegistry = new ZkServiceRegistry();
serviceProvider = new ServiceProviderImpl();
} }
public void start(int port) { public <T> void publishService(Object service, Class<T> serviceClass) {
serviceProvider.addServiceProvider(service);
serviceRegistry.registerService(serviceClass.getCanonicalName(), new InetSocketAddress(host, port));
start();
}
try (ServerSocket server = new ServerSocket(port)) { private void start() {
try (ServerSocket server = new ServerSocket()) {
server.bind(new InetSocketAddress(host, port));
logger.info("server starts..."); logger.info("server starts...");
Socket socket; Socket socket;
while ((socket = server.accept()) != null) { while ((socket = server.accept()) != null) {
......
package github.javaguide.registry;
import org.junit.jupiter.api.Test;
import java.net.InetSocketAddress;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* @author shuang.kou
* @createTime 2020年05月31日 16:25:00
*/
class ZkServiceRegistryTest {
@Test
void should_register_service_successful_and_lookup_service_by_service_name() {
ZkServiceRegistry zkServiceRegistry = new ZkServiceRegistry();
InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333);
zkServiceRegistry.registerService("github.javaguide.registry.ZkServiceRegistry", givenInetSocketAddress);
InetSocketAddress acquiredInetSocketAddress = zkServiceRegistry.lookupService("github.javaguide.registry.ZkServiceRegistry");
assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册