提交 58df5467 编写于 作者: S shuang.kou

[refractor]remove threadpool in NettyServerHandler

上级 405faedb
......@@ -7,6 +7,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
......@@ -64,7 +66,7 @@ public final class ThreadPoolFactoryUtils {
log.info("shut down thread pool [{}] [{}]", entry.getKey(), executorService.isTerminated());
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
} catch (InterruptedException e) {
log.error("Thread pool never terminated");
executorService.shutdownNow();
}
......@@ -98,4 +100,20 @@ public final class ThreadPoolFactoryUtils {
return Executors.defaultThreadFactory();
}
/**
* 打印线程池的状态
*
* @param threadPool 线程池对象
*/
public static void printThreadPoolStatus(ThreadPoolExecutor threadPool) {
ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, createThreadFactory("print-thread-pool-status", false));
scheduledExecutorService.scheduleAtFixedRate(() -> {
log.info("============ThreadPool Status=============");
log.info("ThreadPool Size: [{}]", threadPool.getPoolSize());
log.info("Active Threads: [{}]", threadPool.getActiveCount());
log.info("Number of Tasks : [{}]", threadPool.getCompletedTaskCount());
log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
log.info("===========================================");
}, 0, 1, TimeUnit.SECONDS);
}
}
......@@ -4,8 +4,6 @@ import github.javaguide.factory.SingletonFactory;
import github.javaguide.handler.RpcRequestHandler;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.utils.concurrent.threadpool.CustomThreadPoolConfig;
import github.javaguide.utils.concurrent.threadpool.ThreadPoolFactoryUtils;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
......@@ -13,8 +11,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
/**
* 自定义服务端的 ChannelHandler 来处理客户端发过来的数据。
* <p>
......@@ -27,39 +23,31 @@ import java.util.concurrent.ExecutorService;
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final String THREAD_NAME_PREFIX = "netty-server-handler-rpc-pool";
private final RpcRequestHandler rpcRequestHandler;
private final ExecutorService threadPool;
public NettyServerHandler() {
this.rpcRequestHandler = SingletonFactory.getInstance(RpcRequestHandler.class);
CustomThreadPoolConfig customThreadPoolConfig = new CustomThreadPoolConfig();
customThreadPoolConfig.setCorePoolSize(6);
this.threadPool = ThreadPoolFactoryUtils.createCustomThreadPoolIfAbsent(THREAD_NAME_PREFIX, customThreadPoolConfig);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
threadPool.execute(() -> {
try {
log.info("server receive msg: [{}] ", msg);
RpcRequest rpcRequest = (RpcRequest) msg;
//执行目标方法(客户端需要执行的方法)并且返回方法结果
Object result = rpcRequestHandler.handle(rpcRequest);
log.info(String.format("server get result: %s", result.toString()));
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
//返回方法执行结果给客户端
RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
log.error("not writable now, message dropped");
}
} finally {
//确保 ByteBuf 被释放,不然可能会有内存泄露问题
ReferenceCountUtil.release(msg);
try {
log.info("server receive msg: [{}] ", msg);
RpcRequest rpcRequest = (RpcRequest) msg;
//执行目标方法(客户端需要执行的方法)并且返回方法结果
Object result = rpcRequestHandler.handle(rpcRequest);
log.info(String.format("server get result: %s", result.toString()));
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
//返回方法执行结果给客户端
RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
ctx.writeAndFlush(rpcResponse).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
log.error("not writable now, message dropped");
}
});
} finally {
//确保 ByteBuf 被释放,不然可能会有内存泄露问题
ReferenceCountUtil.release(msg);
}
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册