提交 a6028b87 编写于 作者: S shuang.kou

[v2.0]refractor optimize package and add perfect comment

上级 f256704b
# 最简易版本的 RPC 框架 # 最简易版本的 RPC 框架
> 为了能让更多人能够看懂框架源码,我添加了很详细的注释。
>
> 实际项目上实际是不推荐这样的!不需要注释的地方不要添加注释,能用代码表达意思的地方就不要用注释来说。
这个包是最简易版本的 RPC 框架的实现,主要使用: 这个包是最简易版本的 RPC 框架的实现,主要使用:
1. 1.
...@@ -10,6 +10,8 @@ import java.util.Set; ...@@ -10,6 +10,8 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
/** /**
* 默认的服务注册中心实现,通过 Map 保存服务信息,可以通过 zookeeper 来改进
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 11:23:00 * @createTime 2020年05月13日 11:23:00
*/ */
......
package github.javaguide.registry; package github.javaguide.registry;
/** /**
* 服务注册中心接口
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 08:39:00 * @createTime 2020年05月13日 08:39:00
*/ */
......
package github.javaguide.serialize; package github.javaguide.serialize;
/** /**
* 序列化接口,所有序列化类都要实现这个接口
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 19:29:00 * @createTime 2020年05月13日 19:29:00
*/ */
...@@ -17,7 +19,7 @@ public interface Serializer { ...@@ -17,7 +19,7 @@ public interface Serializer {
* 反序列化 * 反序列化
* *
* @param bytes 序列化后的字节数组 * @param bytes 序列化后的字节数组
* @param clazz 类 * @param clazz 目标
* @param <T> * @param <T>
* @return 反序列化的对象 * @return 反序列化的对象
*/ */
......
...@@ -7,7 +7,6 @@ import github.javaguide.dto.RpcRequest; ...@@ -7,7 +7,6 @@ import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.exception.SerializeException; import github.javaguide.exception.SerializeException;
import github.javaguide.serialize.Serializer; import github.javaguide.serialize.Serializer;
import github.javaguide.transport.netty.NettyClientHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -15,6 +14,8 @@ import java.io.ByteArrayInputStream; ...@@ -15,6 +14,8 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
/** /**
* Kryo序列化类,Kryo序列化效率很高,但是只兼容 Java 语言
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 19:29:00 * @createTime 2020年05月13日 19:29:00
*/ */
...@@ -25,7 +26,7 @@ public class KryoSerializer implements Serializer { ...@@ -25,7 +26,7 @@ public class KryoSerializer implements Serializer {
* 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。 * 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。
* 所以,使用 ThreadLocal 存放 Kryo 对象 * 所以,使用 ThreadLocal 存放 Kryo 对象
*/ */
private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> { private final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo(); Kryo kryo = new Kryo();
kryo.register(RpcResponse.class); kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class); kryo.register(RpcRequest.class);
......
package github.javaguide.transport.netty; package github.javaguide.transport.netty.client;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
...@@ -9,21 +9,29 @@ import org.slf4j.Logger; ...@@ -9,21 +9,29 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* 自定义客户端 ChannelHandler 来处理服务端发过来的数据
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 20:50:00 * @createTime 2020年05月25日 20:50:00
*/ */
public class NettyClientHandler extends ChannelInboundHandlerAdapter { public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class); private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
/**
* 读取服务端传输的消息
*/
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
try { try {
logger.info(String.format("client receive msg: %s", msg));
RpcResponse rpcResponse = (RpcResponse) msg; RpcResponse rpcResponse = (RpcResponse) msg;
logger.info(String.format("client receive msg: %s", rpcResponse)); // 声明一个 AttributeKey 对象,类似于 Map 中的 key
// 声明一个 AttributeKey 对象
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse"); AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
// 将服务端的返回结果保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源 /*
// AttributeMap的key是AttributeKey,value是Attribute * AttributeMap 可以看作是一个Channel的共享数据源
* AttributeMap 的 key 是 AttributeKey,value 是 Attribute
*/
// 将服务端的返回结果保存到 AttributeMap 上
ctx.channel().attr(key).set(rpcResponse); ctx.channel().attr(key).set(rpcResponse);
ctx.channel().close(); ctx.channel().close();
} finally { } finally {
...@@ -31,9 +39,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -31,9 +39,12 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
} }
} }
/**
* 处理客户端消息发生异常的时候被调用
*/
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("client catch exception"); logger.error("client catch exception:", cause);
cause.printStackTrace(); cause.printStackTrace();
ctx.close(); ctx.close();
} }
......
package github.javaguide.transport.netty; package github.javaguide.transport.netty.client;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.serialize.kyro.KryoSerializer; import github.javaguide.serialize.kyro.KryoSerializer;
import github.javaguide.transport.RpcClient; import github.javaguide.transport.RpcClient;
import github.javaguide.transport.netty.codec.NettyKryoDecoder;
import github.javaguide.transport.netty.codec.NettyKryoEncoder;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
......
package github.javaguide.transport.netty; package github.javaguide.transport.netty.codec;
import github.javaguide.serialize.Serializer; import github.javaguide.serialize.Serializer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
...@@ -11,8 +11,10 @@ import org.slf4j.LoggerFactory; ...@@ -11,8 +11,10 @@ import org.slf4j.LoggerFactory;
import java.util.List; import java.util.List;
/** /**
* 自定义解码器。负责处理"入站"消息,将消息格式转换为我们需要的业务对象
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 19:42:00 * @createTime 2020年05月25日 19:42:00
*/ */
@AllArgsConstructor @AllArgsConstructor
public class NettyKryoDecoder extends ByteToMessageDecoder { public class NettyKryoDecoder extends ByteToMessageDecoder {
...@@ -26,31 +28,40 @@ public class NettyKryoDecoder extends ByteToMessageDecoder { ...@@ -26,31 +28,40 @@ public class NettyKryoDecoder extends ByteToMessageDecoder {
*/ */
private static final int BODY_LENGTH = 4; private static final int BODY_LENGTH = 4;
/**
* 解码 ByteBuf 对象
*
* @param ctx 解码器关联的 ChannelHandlerContext 对象
* @param in "入站"数据,也就是 ByteBuf 对象
* @param out 解码之后的数据对象需要添加到 out 对象里面
*/
@Override @Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) { protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
//1.byteBuf中写入的消息长度所占的字节数已经是4了,所以 byteBuf 的可读字节必须大于 4, //1.byteBuf中写入的消息长度所占的字节数已经是4了,所以 byteBuf 的可读字节必须大于 4,
if (byteBuf.readableBytes() >= BODY_LENGTH) { if (in.readableBytes() >= BODY_LENGTH) {
//2.标记当前readIndex的位置,以便后面重置readIndex 的时候使用 //2.标记当前readIndex的位置,以便后面重置readIndex 的时候使用
byteBuf.markReaderIndex(); in.markReaderIndex();
//3.读取消息的长度 //3.读取消息的长度
//注意: 消息长度是encode的时候我们自己写入的,参见 NettyKryoEncoder 的encode方法 //注意: 消息长度是encode的时候我们自己写入的,参见 NettyKryoEncoder 的encode方法
int dataLength = byteBuf.readInt(); int dataLength = in.readInt();
//4.遇到不合理的情况直接 return //4.遇到不合理的情况直接 return
if (dataLength < 0 || byteBuf.readableBytes() < 0) { if (dataLength < 0 || in.readableBytes() < 0) {
logger.error("data length or byteBuf readableBytes is not valid");
return; return;
} }
//5.如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex //5.如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex
if (byteBuf.readableBytes() < dataLength) { if (in.readableBytes() < dataLength) {
byteBuf.resetReaderIndex(); in.resetReaderIndex();
return; return;
} }
// 6.走到这里说明没什么问题了,可以序列化了 // 6.走到这里说明没什么问题了,可以序列化了
byte[] body = new byte[dataLength]; byte[] body = new byte[dataLength];
byteBuf.readBytes(body); in.readBytes(body);
// 将bytes数组转换为我们需要的对象 // 将bytes数组转换为我们需要的对象
Object obj = serializer.deserialize(body, genericClass); Object obj = serializer.deserialize(body, genericClass);
list.add(obj); out.add(obj);
logger.info("successful decode ByteBuf to Object");
} }
} }
} }
package github.javaguide.transport.netty; package github.javaguide.transport.netty.codec;
import github.javaguide.serialize.Serializer; import github.javaguide.serialize.Serializer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
...@@ -7,8 +7,12 @@ import io.netty.handler.codec.MessageToByteEncoder; ...@@ -7,8 +7,12 @@ import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
/** /**
* 自定义编码器。负责处理"出站"消息,将消息格式转换字节数组然后写入到字节数据的容日 ByteBuf 对象中。
* <p>
* 网络传输需要通过字节流来实现,ByteBuf 可以看作是 Netty 提供的字节数据的容器,使用它会让我们更加方便地处理字节数据。
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 19:43:00 * @createTime 2020年05月25日 19:43:00
*/ */
@AllArgsConstructor @AllArgsConstructor
public class NettyKryoEncoder extends MessageToByteEncoder<Object> { public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
......
package github.javaguide.transport.netty; package github.javaguide.transport.netty.server;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.serialize.kyro.KryoSerializer; import github.javaguide.serialize.kyro.KryoSerializer;
import github.javaguide.transport.netty.codec.NettyKryoDecoder;
import github.javaguide.transport.netty.codec.NettyKryoEncoder;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
......
package github.javaguide.transport.netty; package github.javaguide.transport.netty.server;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
...@@ -14,18 +14,22 @@ import org.slf4j.Logger; ...@@ -14,18 +14,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/** /**
* 自定义服务端的 ChannelHandler 来处理客户端发过来的数据
*
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月13日 20:44:00 * @createTime 2020年05月25日 20:44:00
*/ */
public class NettyServerHandler extends ChannelInboundHandlerAdapter { public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class); private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static RpcRequestHandler rpcRequestHandler; private static RpcRequestHandler rpcRequestHandler;
private static ServiceRegistry serviceRegistry; private static ServiceRegistry serviceRegistry;
static { static {
rpcRequestHandler=new RpcRequestHandler(); rpcRequestHandler = new RpcRequestHandler();
serviceRegistry = new DefaultServiceRegistry(); serviceRegistry = new DefaultServiceRegistry();
} }
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
try { try {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册