diff --git a/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java b/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java index 4ba4f7e516a3b86e18059344601338d897f021d5..1786f2c345ce376bea0866714fdfd347bd829f62 100644 --- a/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java +++ b/rpc-framework-common/src/main/java/github/javaguide/utils/concurrent/threadpool/ThreadPoolFactoryUtils.java @@ -7,6 +7,8 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -64,7 +66,7 @@ public final class ThreadPoolFactoryUtils { log.info("shut down thread pool [{}] [{}]", entry.getKey(), executorService.isTerminated()); try { executorService.awaitTermination(10, TimeUnit.SECONDS); - } catch (InterruptedException ie) { + } catch (InterruptedException e) { log.error("Thread pool never terminated"); executorService.shutdownNow(); } @@ -98,4 +100,20 @@ public final class ThreadPoolFactoryUtils { return Executors.defaultThreadFactory(); } + /** + * 打印线程池的状态 + * + * @param threadPool 线程池对象 + */ + public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) { + ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-thread-pool-status", false)); + scheduledExecutorService.scheduleAtFixedRate(() -> { + log.info("============ThreadPool Status============="); + log.info("ThreadPool Size: [{}]", threadPool.getPoolSize()); + log.info("Active Threads: [{}]", threadPool.getActiveCount()); + log.info("Number of Tasks : [{}]", threadPool.getCompletedTaskCount()); + log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size()); + log.info("==========================================="); + }, 0, 1, TimeUnit.SECONDS); + } } diff --git a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java index 7f0b2f9879da2c94dac7a8f9dd3d1636bd7aacdd..b09cbc185ea0c8b55fe17e84d20a6d82e88f6d2e 100644 --- a/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java +++ b/rpc-framework-simple/src/main/java/github/javaguide/remoting/transport/netty/server/NettyServerHandler.java @@ -4,8 +4,6 @@ import github.javaguide.factory.SingletonFactory; import github.javaguide.handler.RpcRequestHandler; import github.javaguide.remoting.dto.RpcRequest; import github.javaguide.remoting.dto.RpcResponse; -import github.javaguide.utils.concurrent.threadpool.CustomThreadPoolConfig; -import github.javaguide.utils.concurrent.threadpool.ThreadPoolFactoryUtils; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -13,8 +11,6 @@ import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.ReferenceCountUtil; import lombok.extern.slf4j.Slf4j; -import java.util.concurrent.ExecutorService; - /** * 自定义服务端的 ChannelHandler 来处理客户端发过来的数据。 *

@@ -27,39 +23,31 @@ import java.util.concurrent.ExecutorService; @Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { - private static final String THREAD_NAME_PREFIX = "netty-server-handler-rpc-pool"; private final RpcRequestHandler rpcRequestHandler; - private final ExecutorService threadPool; public NettyServerHandler() { this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class); - CustomThreadPoolConfig customThreadPoolConfig = new CustomThreadPoolConfig(); - customThreadPoolConfig.setCorePoolSize(6); - this.threadPool = ThreadPoolFactoryUtils.createCustomThreadPoolIfAbsent(THREAD_NAME_PREFIX, customThreadPoolConfig); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - threadPool.execute(() -> { - try { - log.info("server receive msg: [{}] ", msg); - RpcRequest rpcRequest = (RpcRequest) msg; - //执行目标方法(客户端需要执行的方法)并且返回方法结果 - Object result = rpcRequestHandler.handle(rpcRequest); - log.info(String.format("server get result: %s", result.toString())); - if (ctx.channel().isActive() && ctx.channel().isWritable()) { - //返回方法执行结果给客户端 - RpcResponse rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId()); - ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); - } else { - log.error("not writable now, message dropped"); - } - } finally { - //确保 ByteBuf 被释放,不然可能会有内存泄露问题 - ReferenceCountUtil.release(msg); + try { + log.info("server receive msg: [{}] ", msg); + RpcRequest rpcRequest = (RpcRequest) msg; + //执行目标方法(客户端需要执行的方法)并且返回方法结果 + Object result = rpcRequestHandler.handle(rpcRequest); + log.info(String.format("server get result: %s", result.toString())); + if (ctx.channel().isActive() && ctx.channel().isWritable()) { + //返回方法执行结果给客户端 + RpcResponse rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId()); + ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + } else { + log.error("not writable now, message dropped"); } - }); - + } finally { + //确保 ByteBuf 被释放,不然可能会有内存泄露问题 + ReferenceCountUtil.release(msg); + } } @Override