提交 df21fdea 编写于 作者: S shuang.kou

[v2.0]refractor create a threadpool util

上级 a6028b87
......@@ -3,6 +3,7 @@ package github.javaguide.transport.netty.client;
import github.javaguide.dto.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
......@@ -11,6 +12,10 @@ import org.slf4j.LoggerFactory;
/**
* 自定义客户端 ChannelHandler 来处理服务端发过来的数据
*
* <p>
* 如果继承自 SimpleChannelInboundHandler 的话就不要考虑 ByteBuf 的释放 ,{@link SimpleChannelInboundHandler} 内部的
* channelRead 方法会替你释放 ByteBuf ,避免可能导致的内存泄露问题。详见《Netty进阶之路 跟着案例学 Netty》
*
* @author shuang.kou
* @createTime 2020年05月25日 20:50:00
*/
......
......@@ -20,6 +20,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 客户端。发送消息到服务端,并接收服务端返回的方法执行结果
*
* @author shuang.kou
* @createTime 2020年05月25日 16:43:00
*/
......
......@@ -19,6 +19,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 服务端。接收客户端消息,并且根据客户端的消息调用相应的方法,然后返回结果给客户端。
*
* @author shuang.kou
* @createTime 2020年05月25日 16:42:00
*/
......
......@@ -5,16 +5,24 @@ import github.javaguide.dto.RpcResponse;
import github.javaguide.registry.DefaultServiceRegistry;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.transport.RpcRequestHandler;
import github.javaguide.utils.concurrent.ThreadPoolFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.jvm.hotspot.debugger.Page;
import java.util.concurrent.ExecutorService;
/**
* 自定义服务端的 ChannelHandler 来处理客户端发过来的数据
* 自定义服务端的 ChannelHandler 来处理客户端发过来的数据。
* <p>
* 如果继承自 SimpleChannelInboundHandler 的话就不要考虑 ByteBuf 的释放 ,{@link SimpleChannelInboundHandler} 内部的
* channelRead 方法会替你释放 ByteBuf ,避免可能导致的内存泄露问题。详见《Netty进阶之路 跟着案例学 Netty》
*
* @author shuang.kou
* @createTime 2020年05月25日 20:44:00
......@@ -24,26 +32,36 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static RpcRequestHandler rpcRequestHandler;
private static ServiceRegistry serviceRegistry;
private static ExecutorService threadPool;
static {
rpcRequestHandler = new RpcRequestHandler();
serviceRegistry = new DefaultServiceRegistry();
threadPool = ThreadPoolFactory.createDefaultThreadPool("netty-server-handler-rpc-pool");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
RpcRequest rpcRequest = (RpcRequest) msg;
logger.info(String.format("server receive msg: %s", rpcRequest));
String interfaceName = rpcRequest.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = rpcRequestHandler.handle(rpcRequest, service);
logger.info(String.format("server get result: %s", result.toString()));
ChannelFuture f = ctx.writeAndFlush(RpcResponse.success(result));
f.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
}
threadPool.execute(() -> {
logger.info(String.format("server handle message from client by thread: %s", Thread.currentThread().getName()));
try {
logger.info(String.format("server receive msg: %s", msg));
RpcRequest rpcRequest = (RpcRequest) msg;
String interfaceName = rpcRequest.getInterfaceName();
//通过注册中心获取到目标类(客户端需要调用类)
Object service = serviceRegistry.getService(interfaceName);
//执行目标方法(客户端需要执行的方法)并且返回方法结果
Object result = rpcRequestHandler.handle(rpcRequest, service);
logger.info(String.format("server get result: %s", result.toString()));
//返回方法执行结果给客户端
ChannelFuture f = ctx.writeAndFlush(RpcResponse.success(result));
f.addListener(ChannelFutureListener.CLOSE);
} finally {
//确保 ByteBuf 被释放,不然可能会有内存泄露问题
ReferenceCountUtil.release(msg);
}
});
}
@Override
......
......@@ -33,6 +33,7 @@ public class SocketRpcRequestHandlerRunnable implements Runnable {
@Override
public void run() {
logger.info(String.format("server handle message from client by thread: %s", Thread.currentThread().getName()));
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
......
package github.javaguide.transport.socket;
import github.javaguide.utils.concurrent.ThreadPoolFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author shuang.kou
* @createTime 2020年05月10日 08:01:00
*/
public class SocketRpcServer {
/**
* 线程池参数
*/
private static final int CORE_POOL_SIZE = 10;
private static final int MAXIMUM_POOL_SIZE_SIZE = 100;
private static final int KEEP_ALIVE_TIME = 1;
private static final int BLOCKING_QUEUE_CAPACITY = 100;
private ExecutorService threadPool;
private static final Logger logger = LoggerFactory.getLogger(SocketRpcServer.class);
public SocketRpcServer() {
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
ThreadFactory threadFactory = Executors.defaultThreadFactory();
this.threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, workQueue, threadFactory);
threadPool = ThreadPoolFactory.createDefaultThreadPool("socket-server-rpc-pool");
}
public void start(int port) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册