From 709f4c960ad00118d685c96c880fd05de50d66ed Mon Sep 17 00:00:00 2001 From: guide Date: Tue, 21 Jul 2020 16:58:48 +0800 Subject: [PATCH] [refractor]use BeanPostProcesser replace ApplicationContextAware and get zk address from properties file --- .../src/main/java/NettyServerMain2.java | 5 +- .../github/javaguide/HelloServiceImpl.java | 2 +- .../src/main/resources/rpc.properties | 3 +- ...rMessageEnum.java => RpcErrorMessage.java} | 2 +- ...ssageTypeEnum.java => RpcMessageType.java} | 2 +- .../javaguide/enumeration/RpcProperties.java | 19 +++++ .../javaguide/exception/RpcException.java | 10 +-- .../javaguide/factory/SingletonFactory.java | 6 +- .../threadpool/ThreadPoolFactoryUtils.java | 12 ++-- .../utils/file/PropertiesFileUtils.java | 30 ++++++++ .../javaguide/config/CustomShutdownHook.java | 4 +- .../javaguide/provider/ServiceProvider.java | 9 ++- .../provider/ServiceProviderImpl.java | 35 +++++++--- .../registry/{ => zk}/ZkServiceDiscovery.java | 9 ++- .../registry/{ => zk}/ZkServiceRegistry.java | 9 ++- .../registry/zk/util}/CuratorUtils.java | 70 +++++++++++-------- .../remoting/dto/RpcMessageChecker.java | 8 +-- .../javaguide/remoting/dto/RpcRequest.java | 4 +- .../remoting/handler/RpcRequestHandler.java | 7 +- .../netty/client/ChannelProvider.java | 27 +++---- .../transport/netty/client/NettyClient.java | 7 +- .../netty/client/NettyClientHandler.java | 21 +++--- .../netty/client/NettyClientTransport.java | 16 +++-- .../netty/client/UnprocessedRequests.java | 12 ++-- .../netty/codec/kyro/NettyKryoDecoder.java | 4 +- .../netty/codec/kyro/NettyKryoEncoder.java | 4 +- .../transport/netty/server/NettyServer.java | 39 ++--------- .../netty/server/NettyServerHandler.java | 4 +- .../netty/server/SpringBeanPostProcessor.java | 40 +++++++++++ .../transport/socket/SocketRpcClient.java | 2 +- .../SocketRpcRequestHandlerRunnable.java | 4 +- .../transport/socket/SocketRpcServer.java | 2 +- .../registry/ZkServiceRegistryTest.java | 6 +- 33 files changed, 276 insertions(+), 158 deletions(-) rename rpc-framework-common/src/main/java/github/javaguide/enumeration/{RpcErrorMessageEnum.java => RpcErrorMessage.java} (94%) rename rpc-framework-common/src/main/java/github/javaguide/enumeration/{RpcMessageTypeEnum.java => RpcMessageType.java} (79%) create mode 100644 rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcProperties.java create mode 100644 rpc-framework-common/src/main/java/github/javaguide/utils/file/PropertiesFileUtils.java rename rpc-framework-simple/src/main/java/github/javaguide/registry/{ => zk}/ZkServiceDiscovery.java (79%) rename rpc-framework-simple/src/main/java/github/javaguide/registry/{ => zk}/ZkServiceRegistry.java (61%) rename {rpc-framework-common/src/main/java/github/javaguide/utils/zk => rpc-framework-simple/src/main/java/github/javaguide/registry/zk/util}/CuratorUtils.java (60%) create mode 100644 rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/SpringBeanPostProcessor.java diff --git a/example-server/src/main/java/NettyServerMain2.java b/example-server/src/main/java/NettyServerMain2.java index 165ec6d..79ea707 100644 --- a/example-server/src/main/java/NettyServerMain2.java +++ b/example-server/src/main/java/NettyServerMain2.java @@ -1,5 +1,7 @@ import github.javaguide.HelloService; import github.javaguide.HelloServiceImpl; +import github.javaguide.provider.ServiceProvider; +import github.javaguide.provider.ServiceProviderImpl; import github.javaguide.remoting.transport.netty.server.NettyServer; import org.springframework.context.annotation.AnnotationConfigApplicationContext; @@ -13,6 +15,7 @@ public class NettyServerMain2 { AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(NettyServerMain.class); NettyServer nettyServer = applicationContext.getBean(NettyServer.class); nettyServer.start(); - nettyServer.publishService(helloService, HelloService.class); + ServiceProvider serviceProvider = new ServiceProviderImpl(); + serviceProvider.publishService(helloService); } } diff --git a/example-server/src/main/java/github/javaguide/HelloServiceImpl.java b/example-server/src/main/java/github/javaguide/HelloServiceImpl.java index 0d48a54..d31718c 100644 --- a/example-server/src/main/java/github/javaguide/HelloServiceImpl.java +++ b/example-server/src/main/java/github/javaguide/HelloServiceImpl.java @@ -12,7 +12,7 @@ import lombok.extern.slf4j.Slf4j; public class HelloServiceImpl implements HelloService { static { - System.out.println("sdasdasdasdasd"); + System.out.println("HelloServiceImpl被创建"); } @Override diff --git a/example-server/src/main/resources/rpc.properties b/example-server/src/main/resources/rpc.properties index 465deda..b88587f 100644 --- a/example-server/src/main/resources/rpc.properties +++ b/example-server/src/main/resources/rpc.properties @@ -1,2 +1 @@ -rpc.server.host=127.0.0.1 -rpc.server.port=9998 \ No newline at end of file +rpc.zookeeper.address=127.0.0.1:2181 diff --git a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessageEnum.java b/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessage.java similarity index 94% rename from rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessageEnum.java rename to rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessage.java index 75d8b38..31d118b 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessageEnum.java +++ b/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessage.java @@ -11,7 +11,7 @@ import lombok.ToString; @AllArgsConstructor @Getter @ToString -public enum RpcErrorMessageEnum { +public enum RpcErrorMessage { CLIENT_CONNECT_SERVER_FAILURE("客户端连接服务端失败"), SERVICE_INVOCATION_FAILURE("服务调用失败"), SERVICE_CAN_NOT_BE_FOUND("没有找到指定的服务"), diff --git a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageTypeEnum.java b/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageType.java similarity index 79% rename from rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageTypeEnum.java rename to rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageType.java index 143ff57..bea97b9 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageTypeEnum.java +++ b/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcMessageType.java @@ -4,6 +4,6 @@ package github.javaguide.enumeration; * @author shuang.kou * @createTime 2020年06月16日 20:34:00 */ -public enum RpcMessageTypeEnum { +public enum RpcMessageType { HEART_BEAT } diff --git a/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcProperties.java b/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcProperties.java new file mode 100644 index 0000000..d2cc817 --- /dev/null +++ b/rpc-framework-common/src/main/java/github/javaguide/enumeration/RpcProperties.java @@ -0,0 +1,19 @@ +package github.javaguide.enumeration; + +public enum RpcProperties { + + RPC_CONFIG_PATH("rpc.properties"), + ZK_ADDRESS("rpc.zookeeper.address"); + + private final String propertyValue; + + + RpcProperties(String propertyValue) { + this.propertyValue = propertyValue; + } + + public String getPropertyValue() { + return propertyValue; + } + +} diff --git a/rpc-framework-common/src/main/java/github/javaguide/exception/RpcException.java b/rpc-framework-common/src/main/java/github/javaguide/exception/RpcException.java index 65b4411..cd2bd67 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/exception/RpcException.java +++ b/rpc-framework-common/src/main/java/github/javaguide/exception/RpcException.java @@ -1,21 +1,21 @@ package github.javaguide.exception; -import github.javaguide.enumeration.RpcErrorMessageEnum; +import github.javaguide.enumeration.RpcErrorMessage; /** * @author shuang.kou * @createTime 2020年05月12日 16:48:00 */ public class RpcException extends RuntimeException { - public RpcException(RpcErrorMessageEnum rpcErrorMessageEnum, String detail) { - super(rpcErrorMessageEnum.getMessage() + ":" + detail); + public RpcException(RpcErrorMessage rpcErrorMessage, String detail) { + super(rpcErrorMessage.getMessage() + ":" + detail); } public RpcException(String message, Throwable cause) { super(message, cause); } - public RpcException(RpcErrorMessageEnum rpcErrorMessageEnum) { - super(rpcErrorMessageEnum.getMessage()); + public RpcException(RpcErrorMessage rpcErrorMessage) { + super(rpcErrorMessage.getMessage()); } } diff --git a/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java b/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java index 664be03..5038388 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java +++ b/rpc-framework-common/src/main/java/github/javaguide/factory/SingletonFactory.java @@ -10,19 +10,19 @@ import java.util.Map; * @createTime 2020年06月03日 15:04:00 */ public final class SingletonFactory { - private static Map objectMap = new HashMap<>(); + private static final Map OBJECT_MAP = new HashMap<>(); private SingletonFactory() { } public static T getInstance(Class c) { String key = c.toString(); - Object instance = objectMap.get(key); + Object instance = OBJECT_MAP.get(key); synchronized (c) { if (instance == null) { try { instance = c.newInstance(); - objectMap.put(key, instance); + OBJECT_MAP.put(key, instance); } catch (IllegalAccessException | InstantiationException e) { throw new RuntimeException(e.getMessage(), e); } diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java b/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java index 1786f2c..810a138 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java +++ b/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java @@ -9,7 +9,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -25,11 +24,10 @@ public final class ThreadPoolFactoryUtils { /** * 通过 threadNamePrefix 来区分不同线程池(我们可以把相同 threadNamePrefix 的线程池看作是为同一业务场景服务)。 - * TODO :通过信号量机制( {@link Semaphore} 满足条件)限制创建的线程池数量(线程池和线程不是越多越好) * key: threadNamePrefix * value: threadPool */ - private static Map threadPools = new ConcurrentHashMap<>(); + private static final Map THREAD_POOLS = new ConcurrentHashMap<>(); private ThreadPoolFactoryUtils() { @@ -45,12 +43,12 @@ public final class ThreadPoolFactoryUtils { } public static ExecutorService createCustomThreadPoolIfAbsent(CustomThreadPoolConfig customThreadPoolConfig, String threadNamePrefix, Boolean daemon) { - ExecutorService threadPool = threadPools.computeIfAbsent(threadNamePrefix, k -> createThreadPool(customThreadPoolConfig, threadNamePrefix, daemon)); + ExecutorService threadPool = THREAD_POOLS.computeIfAbsent(threadNamePrefix, k -> createThreadPool(customThreadPoolConfig, threadNamePrefix, daemon)); // 如果 threadPool 被 shutdown 的话就重新创建一个 if (threadPool.isShutdown() || threadPool.isTerminated()) { - threadPools.remove(threadNamePrefix); + THREAD_POOLS.remove(threadNamePrefix); threadPool = createThreadPool(customThreadPoolConfig, threadNamePrefix, daemon); - threadPools.put(threadNamePrefix, threadPool); + THREAD_POOLS.put(threadNamePrefix, threadPool); } return threadPool; } @@ -60,7 +58,7 @@ public final class ThreadPoolFactoryUtils { */ public static void shutDownAllThreadPool() { log.info("call shutDownAllThreadPool method"); - threadPools.entrySet().parallelStream().forEach(entry -> { + THREAD_POOLS.entrySet().parallelStream().forEach(entry -> { ExecutorService executorService = entry.getValue(); executorService.shutdown(); log.info("shut down thread pool [{}] [{}]", entry.getKey(), executorService.isTerminated()); diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/file/PropertiesFileUtils.java b/rpc-framework-common/src/main/java/github/javaguide/utils/file/PropertiesFileUtils.java new file mode 100644 index 0000000..a062bdf --- /dev/null +++ b/rpc-framework-common/src/main/java/github/javaguide/utils/file/PropertiesFileUtils.java @@ -0,0 +1,30 @@ +package github.javaguide.utils.file; + +import lombok.extern.slf4j.Slf4j; + +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Properties; + +/** + * @author shuang.kou + * @createTime 2020年07月21日 14:25:00 + **/ +@Slf4j +public final class PropertiesFileUtils { + private PropertiesFileUtils() { + } + + public static Properties readPropertiesFile(String fileName) { + String rootPath = Thread.currentThread().getContextClassLoader().getResource("").getPath(); + String rpcConfigPath = rootPath + fileName; + Properties properties = null; + try (FileInputStream fileInputStream = new FileInputStream(rpcConfigPath)) { + properties = new Properties(); + properties.load(fileInputStream); + } catch (IOException e) { + log.error("occur exception when read properties file [{}]", fileName); + } + return properties; + } +} diff --git a/rpc-framework-simple/src/main/java/github/javaguide/config/CustomShutdownHook.java b/rpc-framework-simple/src/main/java/github/javaguide/config/CustomShutdownHook.java index 9abe7a7..f943dc9 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/config/CustomShutdownHook.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/config/CustomShutdownHook.java @@ -1,7 +1,7 @@ package github.javaguide.config; +import github.javaguide.registry.zk.util.CuratorUtils; import github.javaguide.utils.concurrent.threadpool.ThreadPoolFactoryUtils; -import github.javaguide.utils.zk.CuratorUtils; import lombok.extern.slf4j.Slf4j; /** @@ -21,7 +21,7 @@ public class CustomShutdownHook { public void clearAll() { log.info("addShutdownHook for clearAll"); Runtime.getRuntime().addShutdownHook(new Thread(() -> { - CuratorUtils.clearRegistry(); + CuratorUtils.clearRegistry(CuratorUtils.getZkClient()); ThreadPoolFactoryUtils.shutDownAllThreadPool(); })); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProvider.java b/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProvider.java index 9325bed..783b260 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProvider.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProvider.java @@ -14,7 +14,7 @@ public interface ServiceProvider { * @param service 服务实例对象 * @param serviceClass 服务实例对象实现的接口类 */ - void addServiceProvider(Object service, Class serviceClass); + void addServiceProvider(Object service, Class serviceClass); /** * 获取服务实例对象 @@ -23,4 +23,11 @@ public interface ServiceProvider { * @return 服务实例对象 */ Object getServiceProvider(String serviceName); + + /** + * 发布服务 + * + * @param service 服务实例对象 + */ + void publishService(Object service); } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProviderImpl.java b/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProviderImpl.java index ed2d80a..f788e74 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProviderImpl.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/provider/ServiceProviderImpl.java @@ -1,9 +1,15 @@ package github.javaguide.provider; -import github.javaguide.enumeration.RpcErrorMessageEnum; +import github.javaguide.enumeration.RpcErrorMessage; import github.javaguide.exception.RpcException; +import github.javaguide.registry.ServiceRegistry; +import github.javaguide.registry.zk.ZkServiceRegistry; +import github.javaguide.remoting.transport.netty.server.NettyServer; import lombok.extern.slf4j.Slf4j; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -23,29 +29,40 @@ public class ServiceProviderImpl implements ServiceProvider { * key:service/interface name * value:service */ - private static final Map serviceMap = new ConcurrentHashMap<>(); - private static final Set registeredService = ConcurrentHashMap.newKeySet(); - + private static final Map SERVICE_MAP = new ConcurrentHashMap<>(); + private static final Set REGISTERED_SERVICE = ConcurrentHashMap.newKeySet(); + private final ServiceRegistry serviceRegistry = new ZkServiceRegistry(); /** * note:可以修改为扫描注解注册 */ @Override public void addServiceProvider(Object service, Class serviceClass) { String serviceName = serviceClass.getCanonicalName(); - if (registeredService.contains(serviceName)) { + if (REGISTERED_SERVICE.contains(serviceName)) { return; } - registeredService.add(serviceName); - serviceMap.put(serviceName, service); + REGISTERED_SERVICE.add(serviceName); + SERVICE_MAP.put(serviceName, service); log.info("Add service: {} and interfaces:{}", serviceName, service.getClass().getInterfaces()); } @Override public Object getServiceProvider(String serviceName) { - Object service = serviceMap.get(serviceName); + Object service = SERVICE_MAP.get(serviceName); if (null == service) { - throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND); + throw new RpcException(RpcErrorMessage.SERVICE_CAN_NOT_BE_FOUND); } return service; } + + public void publishService(Object service) { + try { + String host = InetAddress.getLocalHost().getHostAddress(); + Class anInterface = service.getClass().getInterfaces()[0]; + this.addServiceProvider(service, anInterface); + serviceRegistry.registerService(anInterface.getCanonicalName(), new InetSocketAddress(host, NettyServer.PORT)); + } catch (UnknownHostException e) { + log.error("occur exception when getHostAddress", e); + } + } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java similarity index 79% rename from rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java rename to rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java index a7db820..4484bc7 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceDiscovery.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceDiscovery.java @@ -1,9 +1,11 @@ -package github.javaguide.registry; +package github.javaguide.registry.zk; import github.javaguide.loadbalance.LoadBalance; import github.javaguide.loadbalance.RandomLoadBalance; -import github.javaguide.utils.zk.CuratorUtils; +import github.javaguide.registry.ServiceDiscovery; +import github.javaguide.registry.zk.util.CuratorUtils; import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; import java.net.InetSocketAddress; import java.util.List; @@ -25,7 +27,8 @@ public class ZkServiceDiscovery implements ServiceDiscovery { @Override public InetSocketAddress lookupService(String serviceName) { // 这里直接去了第一个找到的服务地址,eg:127.0.0.1:9999 - List serviceUrlList = CuratorUtils.getChildrenNodes(serviceName); + CuratorFramework zkClient = CuratorUtils.getZkClient(); + List serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, serviceName); // 负载均衡 String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList); log.info("成功找到服务地址:[{}]", targetServiceUrl); diff --git a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceRegistry.java similarity index 61% rename from rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java rename to rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceRegistry.java index 7beeb6f..764a04f 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/registry/ZkServiceRegistry.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/ZkServiceRegistry.java @@ -1,7 +1,9 @@ -package github.javaguide.registry; +package github.javaguide.registry.zk; -import github.javaguide.utils.zk.CuratorUtils; +import github.javaguide.registry.ServiceRegistry; +import github.javaguide.registry.zk.util.CuratorUtils; import lombok.extern.slf4j.Slf4j; +import org.apache.curator.framework.CuratorFramework; import java.net.InetSocketAddress; @@ -18,6 +20,7 @@ public class ZkServiceRegistry implements ServiceRegistry { public void registerService(String serviceName, InetSocketAddress inetSocketAddress) { //根节点下注册子节点:服务 String servicePath = CuratorUtils.ZK_REGISTER_ROOT_PATH + "/" + serviceName + inetSocketAddress.toString(); - CuratorUtils.createPersistentNode(servicePath); + CuratorFramework zkClient = CuratorUtils.getZkClient(); + CuratorUtils.createPersistentNode(zkClient, servicePath); } } diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorUtils.java b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/util/CuratorUtils.java similarity index 60% rename from rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorUtils.java rename to rpc-framework-simple/src/main/java/github/javaguide/registry/zk/util/CuratorUtils.java index 31c2d60..33794e9 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/utils/zk/CuratorUtils.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/registry/zk/util/CuratorUtils.java @@ -1,10 +1,13 @@ -package github.javaguide.utils.zk; +package github.javaguide.registry.zk.util; +import github.javaguide.enumeration.RpcProperties; import github.javaguide.exception.RpcException; +import github.javaguide.utils.file.PropertiesFileUtils; import lombok.extern.slf4j.Slf4j; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.imps.CuratorFrameworkState; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; @@ -12,6 +15,7 @@ import org.apache.zookeeper.CreateMode; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -23,17 +27,14 @@ import java.util.concurrent.ConcurrentHashMap; */ @Slf4j public final class CuratorUtils { + private static final int BASE_SLEEP_TIME = 1000; private static final int MAX_RETRIES = 3; - private static final String CONNECT_STRING = "127.0.0.1:2181"; + private static String defaultZookeeperAddress = "127.0.0.1:2181"; public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc"; - private static final Map> serviceAddressMap = new ConcurrentHashMap<>(); - private static final Set registeredPathSet = ConcurrentHashMap.newKeySet(); - private static final CuratorFramework zkClient; - - static { - zkClient = getZkClient(); - } + private static final Map> SERVICE_ADDRESS_MAP = new ConcurrentHashMap<>(); + private static final Set REGISTERED_PATH_SET = ConcurrentHashMap.newKeySet(); + private static CuratorFramework zkClient; private CuratorUtils() { } @@ -43,16 +44,16 @@ public final class CuratorUtils { * * @param path 节点路径 */ - public static void createPersistentNode(String path) { + public static void createPersistentNode(CuratorFramework zkClient, String path) { try { - if (registeredPathSet.contains(path) || zkClient.checkExists().forPath(path) != null) { + if (REGISTERED_PATH_SET.contains(path) || zkClient.checkExists().forPath(path) != null) { log.info("节点已经存在,节点为:[{}]", path); } else { //eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999 zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); log.info("节点创建成功,节点为:[{}]", path); } - registeredPathSet.add(path); + REGISTERED_PATH_SET.add(path); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } @@ -64,16 +65,16 @@ public final class CuratorUtils { * @param serviceName 服务对象接口名 eg:github.javaguide.HelloService * @return 指定字节下的所有子节点 */ - public static List getChildrenNodes(String serviceName) { - if (serviceAddressMap.containsKey(serviceName)) { - return serviceAddressMap.get(serviceName); + public static List getChildrenNodes(CuratorFramework zkClient, String serviceName) { + if (SERVICE_ADDRESS_MAP.containsKey(serviceName)) { + return SERVICE_ADDRESS_MAP.get(serviceName); } List result; String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName; try { result = zkClient.getChildren().forPath(servicePath); - serviceAddressMap.put(serviceName, result); - registerWatcher(serviceName); + SERVICE_ADDRESS_MAP.put(serviceName, result); + registerWatcher(serviceName, zkClient); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } @@ -83,27 +84,36 @@ public final class CuratorUtils { /** * 清空注册中心的数据 */ - public static void clearRegistry() { - registeredPathSet.stream().parallel().forEach(p -> { + public static void clearRegistry(CuratorFramework zkClient) { + REGISTERED_PATH_SET.stream().parallel().forEach(p -> { try { zkClient.delete().forPath(p); } catch (Exception e) { throw new RpcException(e.getMessage(), e.getCause()); } }); - log.info("服务端(Provider)所有注册的服务都被清空:[{}]", registeredPathSet.toString()); + log.info("服务端(Provider)所有注册的服务都被清空:[{}]", REGISTERED_PATH_SET.toString()); } - private static CuratorFramework getZkClient() { - // 重试策略。重试3次,并且会增加重试之间的睡眠时间。 + public static CuratorFramework getZkClient() { + // check if user has set zk address + Properties properties = PropertiesFileUtils.readPropertiesFile(RpcProperties.RPC_CONFIG_PATH.getPropertyValue()); + if (properties != null) { + defaultZookeeperAddress = properties.getProperty(RpcProperties.ZK_ADDRESS.getPropertyValue()); + } + // if zkClient has been started, return directly + if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) { + return zkClient; + } + // Retry strategy. Retry 3 times, and will increase the sleep time between retries. RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES); - CuratorFramework curatorFramework = CuratorFrameworkFactory.builder() - //要连接的服务器(可以是服务器列表) - .connectString(CONNECT_STRING) + zkClient = CuratorFrameworkFactory.builder() + // the server to connect to (can be a server list) + .connectString(defaultZookeeperAddress) .retryPolicy(retryPolicy) .build(); - curatorFramework.start(); - return curatorFramework; + zkClient.start(); + return zkClient; } /** @@ -111,12 +121,12 @@ public final class CuratorUtils { * * @param serviceName 服务对象接口名 eg:github.javaguide.HelloService */ - private static void registerWatcher(String serviceName) { + private static void registerWatcher(String serviceName, CuratorFramework zkClient) { String servicePath = ZK_REGISTER_ROOT_PATH + "/" + serviceName; - PathChildrenCache pathChildrenCache = new PathChildrenCache(CuratorUtils.zkClient, servicePath, true); + PathChildrenCache pathChildrenCache = new PathChildrenCache(zkClient, servicePath, true); PathChildrenCacheListener pathChildrenCacheListener = (curatorFramework, pathChildrenCacheEvent) -> { List serviceAddresses = curatorFramework.getChildren().forPath(servicePath); - serviceAddressMap.put(serviceName, serviceAddresses); + SERVICE_ADDRESS_MAP.put(serviceName, serviceAddresses); }; pathChildrenCache.getListenable().addListener(pathChildrenCacheListener); try { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessageChecker.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessageChecker.java index 620acff..d4146fe 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessageChecker.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcMessageChecker.java @@ -1,7 +1,7 @@ package github.javaguide.remoting.dto; -import github.javaguide.enumeration.RpcErrorMessageEnum; +import github.javaguide.enumeration.RpcErrorMessage; import github.javaguide.enumeration.RpcResponseCode; import github.javaguide.exception.RpcException; import lombok.extern.slf4j.Slf4j; @@ -21,15 +21,15 @@ public final class RpcMessageChecker { public static void check(RpcResponse rpcResponse, RpcRequest rpcRequest) { if (rpcResponse == null) { - throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); + throw new RpcException(RpcErrorMessage.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); } if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) { - throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); + throw new RpcException(RpcErrorMessage.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); } if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCode.SUCCESS.getCode())) { - throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); + throw new RpcException(RpcErrorMessage.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName()); } } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcRequest.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcRequest.java index ae833a4..a3a7781 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcRequest.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/dto/RpcRequest.java @@ -1,6 +1,6 @@ package github.javaguide.remoting.dto; -import github.javaguide.enumeration.RpcMessageTypeEnum; +import github.javaguide.enumeration.RpcMessageType; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; @@ -25,5 +25,5 @@ public class RpcRequest implements Serializable { private String methodName; private Object[] parameters; private Class[] paramTypes; - private RpcMessageTypeEnum rpcMessageTypeEnum; + private RpcMessageType rpcMessageType; } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/handler/RpcRequestHandler.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/handler/RpcRequestHandler.java index fe70e4e..8dc1817 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/handler/RpcRequestHandler.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/handler/RpcRequestHandler.java @@ -1,5 +1,6 @@ package github.javaguide.remoting.handler; +import github.javaguide.factory.SingletonFactory; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; import github.javaguide.enumeration.RpcResponseCode; @@ -19,7 +20,11 @@ import java.lang.reflect.Method; */ @Slf4j public class RpcRequestHandler { - private static ServiceProvider serviceProvider = new ServiceProviderImpl(); + private final ServiceProvider serviceProvider; + + public RpcRequestHandler() { + serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class); + } /** * 处理 rpcRequest :调用对应的方法,然后返回方法执行结果 diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/ChannelProvider.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/ChannelProvider.java index cf31e5a..ab54bf0 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/ChannelProvider.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/ChannelProvider.java @@ -15,36 +15,37 @@ import java.util.concurrent.ConcurrentHashMap; * @createTime 2020年05月29日 16:36:00 */ @Slf4j -public final class ChannelProvider { +public class ChannelProvider { - private static final Map channels = new ConcurrentHashMap<>(); - private static final NettyClient nettyClient = SingletonFactory.getInstance(NettyClient.class); - - private ChannelProvider() { + private final Map channelMap; + private final NettyClient nettyClient; + public ChannelProvider() { + channelMap = new ConcurrentHashMap<>(); + nettyClient = SingletonFactory.getInstance(NettyClient.class); } - public static Channel get(InetSocketAddress inetSocketAddress) { + public Channel get(InetSocketAddress inetSocketAddress) { String key = inetSocketAddress.toString(); // determine if there is a connection for the corresponding address - if (channels.containsKey(key)) { - Channel channel = channels.get(key); + if (channelMap.containsKey(key)) { + Channel channel = channelMap.get(key); // if so, determine if the connection is available, and if so, get it directly if (channel != null && channel.isActive()) { return channel; } else { - channels.remove(key); + channelMap.remove(key); } } // otherwise, reconnect to get the Channel Channel channel = nettyClient.doConnect(inetSocketAddress); - channels.put(key, channel); + channelMap.put(key, channel); return channel; } - public static void remove(InetSocketAddress inetSocketAddress) { + public void remove(InetSocketAddress inetSocketAddress) { String key = inetSocketAddress.toString(); - channels.remove(key); - log.info("Channel map size :[{}]", channels.size()); + channelMap.remove(key); + log.info("Channel map size :[{}]", channelMap.size()); } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClient.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClient.java index 9c7b612..d987f37 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClient.java @@ -32,11 +32,11 @@ import java.util.concurrent.TimeUnit; */ @Slf4j public final class NettyClient { - private static final Bootstrap bootstrap; - private static final EventLoopGroup eventLoopGroup; + private final Bootstrap bootstrap; + private final EventLoopGroup eventLoopGroup; // initialize resources such as EventLoopGroup, Bootstrap - static { + public NettyClient() { eventLoopGroup = new NioEventLoopGroup(); bootstrap = new Bootstrap(); KryoSerializer kryoSerializer = new KryoSerializer(); @@ -63,6 +63,7 @@ public final class NettyClient { }); } + /** * connect server and get the channel ,so that you can send rpc message to server * diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientHandler.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientHandler.java index 6cc041e..c7dac8a 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientHandler.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientHandler.java @@ -1,6 +1,6 @@ package github.javaguide.remoting.transport.netty.client; -import github.javaguide.enumeration.RpcMessageTypeEnum; +import github.javaguide.enumeration.RpcMessageType; import github.javaguide.factory.SingletonFactory; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; @@ -17,7 +17,7 @@ import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; /** - * 自定义客户端 ChannelHandler 来处理服务端发过来的数据 + * Customize the client ChannelHandler to process the data sent by the server * *

* 如果继承自 SimpleChannelInboundHandler 的话就不要考虑 ByteBuf 的释放 ,{@link SimpleChannelInboundHandler} 内部的 @@ -29,21 +29,24 @@ import java.net.InetSocketAddress; @Slf4j public class NettyClientHandler extends ChannelInboundHandlerAdapter { private final UnprocessedRequests unprocessedRequests; + private final ChannelProvider channelProvider; public NettyClientHandler() { this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class); + this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class); } /** - * 读取服务端传输的消息 + * Read the message transmitted by the server */ - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { try { log.info("client receive msg: [{}]", msg); - RpcResponse rpcResponse = (RpcResponse) msg; - unprocessedRequests.complete(rpcResponse); + if (msg instanceof RpcResponse) { + RpcResponse rpcResponse = (RpcResponse) msg; + unprocessedRequests.complete(rpcResponse); + } } finally { ReferenceCountUtil.release(msg); } @@ -55,8 +58,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.WRITER_IDLE) { log.info("write idle happen [{}]", ctx.channel().remoteAddress()); - Channel channel = ChannelProvider.get((InetSocketAddress) ctx.channel().remoteAddress()); - RpcRequest rpcRequest = RpcRequest.builder().rpcMessageTypeEnum(RpcMessageTypeEnum.HEART_BEAT).build(); + Channel channel = channelProvider.get((InetSocketAddress) ctx.channel().remoteAddress()); + RpcRequest rpcRequest = RpcRequest.builder().rpcMessageType(RpcMessageType.HEART_BEAT).build(); channel.writeAndFlush(rpcRequest).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } } else { @@ -65,7 +68,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { } /** - * 处理客户端消息发生异常的时候被调用 + * Called when an exception occurs in processing a client message */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientTransport.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientTransport.java index 4279fa9..cde3aac 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientTransport.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/NettyClientTransport.java @@ -2,7 +2,7 @@ package github.javaguide.remoting.transport.netty.client; import github.javaguide.factory.SingletonFactory; import github.javaguide.registry.ServiceDiscovery; -import github.javaguide.registry.ZkServiceDiscovery; +import github.javaguide.registry.zk.ZkServiceDiscovery; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; import github.javaguide.remoting.transport.ClientTransport; @@ -14,7 +14,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; /** - * 基于 Netty 传输 RpcRequest。 + * transport rpcRequest based on netty. * * @author shuang.kou * @createTime 2020年05月29日 11:34:00 @@ -23,20 +23,22 @@ import java.util.concurrent.CompletableFuture; public class NettyClientTransport implements ClientTransport { private final ServiceDiscovery serviceDiscovery; private final UnprocessedRequests unprocessedRequests; + private final ChannelProvider channelProvider; public NettyClientTransport() { this.serviceDiscovery = new ZkServiceDiscovery(); this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class); + this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class); } @Override - public CompletableFuture sendRpcRequest(RpcRequest rpcRequest) { - // 构建返回值 - CompletableFuture resultFuture = new CompletableFuture<>(); + public CompletableFuture> sendRpcRequest(RpcRequest rpcRequest) { + // build return value + CompletableFuture> resultFuture = new CompletableFuture<>(); InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName()); - Channel channel = ChannelProvider.get(inetSocketAddress); + Channel channel = channelProvider.get(inetSocketAddress); if (channel != null && channel.isActive()) { - // 放入未处理的请求 + // put unprocessed request unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture); channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/UnprocessedRequests.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/UnprocessedRequests.java index 2416793..2bee2cc 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/UnprocessedRequests.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/client/UnprocessedRequests.java @@ -7,20 +7,20 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; /** - * 未处理的请求。 + * unprocessed requests by the server. * * @author shuang.kou * @createTime 2020年06月04日 17:30:00 */ public class UnprocessedRequests { - private static Map> unprocessedResponseFutures = new ConcurrentHashMap<>(); + private static final Map>> UNPROCESSED_RESPONSE_FUTURES = new ConcurrentHashMap<>(); - public void put(String requestId, CompletableFuture future) { - unprocessedResponseFutures.put(requestId, future); + public void put(String requestId, CompletableFuture> future) { + UNPROCESSED_RESPONSE_FUTURES.put(requestId, future); } - public void complete(RpcResponse rpcResponse) { - CompletableFuture future = unprocessedResponseFutures.remove(rpcResponse.getRequestId()); + public void complete(RpcResponse rpcResponse) { + CompletableFuture> future = UNPROCESSED_RESPONSE_FUTURES.remove(rpcResponse.getRequestId()); if (null != future) { future.complete(rpcResponse); } else { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/kyro/NettyKryoDecoder.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/kyro/NettyKryoDecoder.java index 817fe3e..cf65589 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/kyro/NettyKryoDecoder.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/kyro/NettyKryoDecoder.java @@ -19,8 +19,8 @@ import java.util.List; @Slf4j public class NettyKryoDecoder extends ByteToMessageDecoder { - private Serializer serializer; - private Class genericClass; + private final Serializer serializer; + private final Class genericClass; /** * Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部 diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/kyro/NettyKryoEncoder.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/kyro/NettyKryoEncoder.java index 20cec1a..4dc1843 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/kyro/NettyKryoEncoder.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/codec/kyro/NettyKryoEncoder.java @@ -16,8 +16,8 @@ import lombok.AllArgsConstructor; */ @AllArgsConstructor public class NettyKryoEncoder extends MessageToByteEncoder { - private Serializer serializer; - private Class genericClass; + private final Serializer serializer; + private final Class genericClass; /** * 将对象转换为字节码然后写入到 ByteBuf 对象中 diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServer.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServer.java index 7a84d41..71b6e24 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServer.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServer.java @@ -1,11 +1,6 @@ package github.javaguide.remoting.transport.netty.server; -import github.javaguide.annotation.RpcService; import github.javaguide.config.CustomShutdownHook; -import github.javaguide.provider.ServiceProvider; -import github.javaguide.provider.ServiceProviderImpl; -import github.javaguide.registry.ServiceRegistry; -import github.javaguide.registry.ZkServiceRegistry; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; import github.javaguide.remoting.transport.netty.codec.kyro.NettyKryoDecoder; @@ -22,17 +17,12 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.BeansException; import org.springframework.beans.factory.InitializingBean; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.ApplicationContext; -import org.springframework.context.ApplicationContextAware; -import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; -import java.net.InetSocketAddress; -import java.util.Map; +import java.net.InetAddress; import java.util.concurrent.TimeUnit; /** @@ -44,23 +34,14 @@ import java.util.concurrent.TimeUnit; */ @Slf4j @Component -@PropertySource("classpath:rpc.properties") -public class NettyServer implements InitializingBean, ApplicationContextAware { - @Value("${rpc.server.host}") - private String host; - @Value("${rpc.server.port}") - private int port; +public class NettyServer implements InitializingBean { private final KryoSerializer kryoSerializer = new KryoSerializer(); - private final ServiceRegistry serviceRegistry = new ZkServiceRegistry(); - private final ServiceProvider serviceProvider = new ServiceProviderImpl(); - - public void publishService(Object service, Class serviceClass) { - serviceProvider.addServiceProvider(service, serviceClass); - serviceRegistry.registerService(serviceClass.getCanonicalName(), new InetSocketAddress(host, port)); - } + public static final int PORT = 9998; + @SneakyThrows public void start() { + String host = InetAddress.getLocalHost().getHostAddress(); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { @@ -87,7 +68,7 @@ public class NettyServer implements InitializingBean, ApplicationContextAware { }); // 绑定端口,同步等待绑定成功 - ChannelFuture f = b.bind(host, port).sync(); + ChannelFuture f = b.bind(host, PORT).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } catch (InterruptedException e) { @@ -107,10 +88,4 @@ public class NettyServer implements InitializingBean, ApplicationContextAware { CustomShutdownHook.getCustomShutdownHook().clearAll(); } - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - // 获得所有被 RpcService 注解的类 - Map registeredBeanMap = applicationContext.getBeansWithAnnotation(RpcService.class); - registeredBeanMap.values().forEach(o -> publishService(o, o.getClass().getInterfaces()[0])); - } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java index 8f35d87..22378fd 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java @@ -1,6 +1,6 @@ package github.javaguide.remoting.transport.netty.server; -import github.javaguide.enumeration.RpcMessageTypeEnum; +import github.javaguide.enumeration.RpcMessageType; import github.javaguide.factory.SingletonFactory; import github.javaguide.remoting.handler.RpcRequestHandler; import github.javaguide.remoting.dto.RpcRequest; @@ -37,7 +37,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter { try { log.info("server receive msg: [{}] ", msg); RpcRequest rpcRequest = (RpcRequest) msg; - if (rpcRequest.getRpcMessageTypeEnum() == RpcMessageTypeEnum.HEART_BEAT) { + if (rpcRequest.getRpcMessageType() == RpcMessageType.HEART_BEAT) { log.info("receive heat beat msg from client"); return; } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/SpringBeanPostProcessor.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/SpringBeanPostProcessor.java new file mode 100644 index 0000000..105591e --- /dev/null +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/SpringBeanPostProcessor.java @@ -0,0 +1,40 @@ +package github.javaguide.remoting.transport.netty.server; + +import github.javaguide.annotation.RpcService; +import github.javaguide.factory.SingletonFactory; +import github.javaguide.provider.ServiceProvider; +import github.javaguide.provider.ServiceProviderImpl; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.stereotype.Component; + +/** + * call this method before creating the bean to see if the class is annotated + * + * @author shuang.kou + * @createTime 2020年07月14日 16:42:00 + */ +@Component +@Slf4j +public class SpringBeanPostProcessor implements BeanPostProcessor { + + + private final ServiceProvider serviceProvider; + + public SpringBeanPostProcessor() { + serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class); + } + + @SneakyThrows + @Override + public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { + if (bean.getClass().isAnnotationPresent(RpcService.class)) { + log.info("[{}] is annotated with [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName()); + serviceProvider.publishService(bean); + } + return bean; + } + +} diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java index 43128f3..3d1fd0a 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcClient.java @@ -2,7 +2,7 @@ package github.javaguide.remoting.transport.socket; import github.javaguide.exception.RpcException; import github.javaguide.registry.ServiceDiscovery; -import github.javaguide.registry.ZkServiceDiscovery; +import github.javaguide.registry.zk.ZkServiceDiscovery; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.transport.ClientTransport; import lombok.AllArgsConstructor; diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcRequestHandlerRunnable.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcRequestHandlerRunnable.java index 1d9fd03..35bdf9e 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcRequestHandlerRunnable.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcRequestHandlerRunnable.java @@ -17,8 +17,8 @@ import java.net.Socket; */ @Slf4j public class SocketRpcRequestHandlerRunnable implements Runnable { - private Socket socket; - private RpcRequestHandler rpcRequestHandler; + private final Socket socket; + private final RpcRequestHandler rpcRequestHandler; public SocketRpcRequestHandlerRunnable(Socket socket) { diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcServer.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcServer.java index 5259a2a..a0f87f5 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcServer.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/socket/SocketRpcServer.java @@ -4,7 +4,7 @@ import github.javaguide.config.CustomShutdownHook; import github.javaguide.provider.ServiceProvider; import github.javaguide.provider.ServiceProviderImpl; import github.javaguide.registry.ServiceRegistry; -import github.javaguide.registry.ZkServiceRegistry; +import github.javaguide.registry.zk.ZkServiceRegistry; import github.javaguide.utils.concurrent.threadpool.ThreadPoolFactoryUtils; import lombok.extern.slf4j.Slf4j; diff --git a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java index 08adaa8..a7507df 100644 --- a/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java +++ b/rpc-framework-simple/src/test/java/github/javaguide/registry/ZkServiceRegistryTest.java @@ -1,5 +1,7 @@ package github.javaguide.registry; +import github.javaguide.registry.zk.ZkServiceDiscovery; +import github.javaguide.registry.zk.ZkServiceRegistry; import org.junit.jupiter.api.Test; import java.net.InetSocketAddress; @@ -17,9 +19,9 @@ class ZkServiceRegistryTest { void should_register_service_successful_and_lookup_service_by_service_name() { ServiceRegistry zkServiceRegistry = new ZkServiceRegistry(); InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333); - zkServiceRegistry.registerService("github.javaguide.registry.ZkServiceRegistry", givenInetSocketAddress); + zkServiceRegistry.registerService("github.javaguide.registry.zk.ZkServiceRegistry", givenInetSocketAddress); ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery(); - InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.ZkServiceRegistry"); + InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.zk.ZkServiceRegistry"); assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString()); } } -- GitLab