提交 7090a428 编写于 作者: S shuang.kou

[v3.0]add CustomShutdownHook used for do something when system shutdown

上级 467b2d8d
......@@ -12,6 +12,7 @@ import org.apache.zookeeper.CreateMode;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
......@@ -25,6 +26,7 @@ public final class CuratorUtils {
private static final String CONNECT_STRING = "127.0.0.1:2181";
public static final String ZK_REGISTER_ROOT_PATH = "/my-rpc";
private static Map<String, List<String>> serviceAddressMap = new ConcurrentHashMap<>();
private static Set<String> registeredPathSet = ConcurrentHashMap.newKeySet();
private static CuratorFramework zkClient;
static {
......@@ -41,13 +43,14 @@ public final class CuratorUtils {
*/
public static void createPersistentNode(String path) {
try {
if (zkClient.checkExists().forPath(path) == null) {
if (registeredPathSet.contains(path) || zkClient.checkExists().forPath(path) != null) {
log.info("节点已经存在,节点为:[{}]", path);
} else {
//eg: /my-rpc/github.javaguide.HelloService/127.0.0.1:9999
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);
log.info("节点创建成功,节点为:[{}]", path);
} else {
log.info("节点已经存在,节点为:[{}]", path);
}
registeredPathSet.add(path);
} catch (Exception e) {
throw new RpcException(e.getMessage(), e.getCause());
}
......@@ -75,6 +78,20 @@ public final class CuratorUtils {
return result;
}
/**
* 清空注册中心的数据
*/
public static void clearRegistry() {
registeredPathSet.stream().parallel().forEach(p -> {
try {
zkClient.delete().forPath(p);
} catch (Exception e) {
throw new RpcException(e.getMessage(), e.getCause());
}
});
log.info("服务端(Provider)所有注册的服务都被清空:[{}]", registeredPathSet.toString());
}
private static CuratorFramework getZkClient() {
// 重试策略。重试3次,并且会增加重试之间的睡眠时间。
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME, MAX_RETRIES);
......
package github.javaguide.config;
import github.javaguide.utils.concurrent.ThreadPoolFactoryUtils;
import github.javaguide.utils.zk.CuratorUtils;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
/**
* 当服务端(provider)关闭的时候做一些事情比如取消注册所有服务
*
* @author shuang.kou
* @createTime 2020年06月04日 13:11:00
*/
@Slf4j
public class CustomShutdownHook {
private final ExecutorService threadPool = ThreadPoolFactoryUtils.createDefaultThreadPool("custom-shutdown-hook-rpc-pool");
private static final CustomShutdownHook CUSTOM_SHUTDOWN_HOOK = new CustomShutdownHook();
public static CustomShutdownHook getCustomShutdownHook() {
return CUSTOM_SHUTDOWN_HOOK;
}
public void clearAll() {
log.info("addShutdownHook for clearAll");
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
CuratorUtils.clearRegistry();
threadPool.shutdown();
}));
}
}
......@@ -17,7 +17,7 @@ public class ZkServiceDiscovery implements ServiceDiscovery {
@Override
public InetSocketAddress lookupService(String serviceName) {
// TODO(shuang.kou):feat: 负载均衡
// 这里直接去了第一个找到的服务地址,eg:127.0.0.1:99990000000017
// 这里直接去了第一个找到的服务地址,eg:127.0.0.1:9999
String serviceAddress = CuratorUtils.getChildrenNodes(serviceName).get(0);
log.info("成功找到服务地址:{}", serviceAddress);
String[] socketAddressArray = serviceAddress.split(":");
......
package github.javaguide.remoting.transport.netty.server;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.config.CustomShutdownHook;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.serialize.kyro.KryoSerializer;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.netty.codec.kyro.NettyKryoDecoder;
import github.javaguide.remoting.transport.netty.codec.kyro.NettyKryoEncoder;
import github.javaguide.serialize.kyro.KryoSerializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
......@@ -76,6 +77,7 @@ public class NettyServer {
// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(host, port).sync();
CustomShutdownHook.getCustomShutdownHook().clearAll();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
......
package github.javaguide.remoting.transport.socket;
import github.javaguide.config.CustomShutdownHook;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.registry.ServiceRegistry;
......@@ -44,10 +45,10 @@ public class SocketRpcServer {
private void start() {
try (ServerSocket server = new ServerSocket()) {
server.bind(new InetSocketAddress(host, port));
log.info("server starts...");
CustomShutdownHook.getCustomShutdownHook().clearAll();
Socket socket;
while ((socket = server.accept()) != null) {
log.info("client connected");
log.info("client connected [{}]", socket.getInetAddress());
threadPool.execute(new SocketRpcRequestHandlerRunnable(socket));
}
threadPool.shutdown();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册