fix:整理协议部分

上级 4cebbfce
...@@ -15,26 +15,36 @@ import lombok.extern.slf4j.Slf4j; ...@@ -15,26 +15,36 @@ import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset; import java.nio.charset.Charset;
/** /**
* 以redis协议 为例 【首先开启redis】 * 以redis协议 为例 【首先开启redis】
* <b>命令</b> :set key value *
* <b>命令内容</b> :set name zhangsan * @author : qinyingjie
* 首先 把整个看成一个数组 * @version : 2.2.0
* <content> * @date : 2023/4/28 10:25
* 协议内容:
* ```
* 3 数组个数
* $3 key命令长度
* set
* $4 key值 内容长度
* name
* $8 value值 内容长度
* zhangsan
* ```
* </content>
*/ */
@Slf4j @Slf4j
public class TestRedisClient { public class TestRedisClient {
/**
* <b>命令</b> :set key value
* <b>命令内容</b> :set name zhangsan
* 首先 把整个看成一个数组
* <content>
* 协议内容:
* ```
* 3 数组长度
* $3 key命令长度
* set
* $4 key值 内容长度
* name
* $8 value值 内容长度
* zhangsan
* ```
* </content>
*
* @param args
*/
public static void main(String[] args) { public static void main(String[] args) {
final byte[] LINE = {'\r', '\n'}; final byte[] LINE = {'\r', '\n'};
final NioEventLoopGroup worker = new NioEventLoopGroup(); final NioEventLoopGroup worker = new NioEventLoopGroup();
...@@ -68,6 +78,7 @@ public class TestRedisClient { ...@@ -68,6 +78,7 @@ public class TestRedisClient {
buf.writeBytes(LINE); buf.writeBytes(LINE);
ctx.writeAndFlush(buf); ctx.writeAndFlush(buf);
} }
// read事件 接受redis的返回结果 // read事件 接受redis的返回结果
@Override @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
......
...@@ -61,8 +61,8 @@ public class MessageCodec extends ByteToMessageCodec<Message> { ...@@ -61,8 +61,8 @@ public class MessageCodec extends ByteToMessageCodec<Message> {
final ObjectInputStream ois = new ObjectInputStream(bis); final ObjectInputStream ois = new ObjectInputStream(bis);
// 转成 Message类型 // 转成 Message类型
Message message = (Message) ois.readObject(); Message message = (Message) ois.readObject();
log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length); log.info("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message); log.info("{}", message);
// 将message给下一个handler使用 // 将message给下一个handler使用
out.add(message); out.add(message);
} }
......
...@@ -15,7 +15,7 @@ import java.util.List; ...@@ -15,7 +15,7 @@ import java.util.List;
/** /**
* 【自定义】消息 编解码 类 【 支持@Sharable 】 * 【自定义】消息 编解码 类 【 支持@Sharable 】
* ########## 父类 MessageToMessageCodec 认为是完整的信息 【所以必须保证上一个处理器时 帧解码器】 ######## * 父类 MessageToMessageCodec 认为是完整的信息 【所以必须保证上一个处理器是 帧解码器】
* 相当于两个handler合二为一,既能入站 也能做出站处理 * 相当于两个handler合二为一,既能入站 也能做出站处理
* <b>魔数 </b>,用来在第一时间判定是否是无效数据包 * <b>魔数 </b>,用来在第一时间判定是否是无效数据包
* <b>版本号 </b>,可以支持协议的升级 * <b>版本号 </b>,可以支持协议的升级
...@@ -25,35 +25,27 @@ import java.util.List; ...@@ -25,35 +25,27 @@ import java.util.List;
* <b>正文长度 </b> * <b>正文长度 </b>
* <b>消息正文 </b> * <b>消息正文 </b>
*/ */
// 写这个类 肯定的认为 上一个处理器 是 帧解码器,所以不用考虑半包黏包问题,直接解码拿数据
@Slf4j @Slf4j
@ChannelHandler.Sharable @ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> { // 注意这里Message是自定义类 public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> { // 注意这里Message是自定义类
@Override @Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception { protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer(); ByteBuf out = ctx.alloc().buffer();
out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数 out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数
out.writeByte(1); // 1字节的 版本 out.writeByte(1); // 1字节的 版本
out.writeByte(0); // 1字节的 序列化方式 0-jdk,1-json out.writeByte(0); // 1字节的 序列化方式 0-jdk,1-json
out.writeByte(msg.getMessageType()); // 1字节的 指令类型 out.writeByte(msg.getMessageType()); // 1字节的 指令类型
out.writeInt(msg.getSequenceId()); // 4字节的 请求序号 【大端】 out.writeInt(msg.getSequenceId()); // 4字节的 请求序号 【大端】
out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍 out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍
// 处理内容 用对象流包装字节数组 并写入 // 处理内容 用对象流包装字节数组 并写入
ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 访问数组 ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 访问数组
ObjectOutputStream oos = new ObjectOutputStream(bos); // 用对象流 包装 ObjectOutputStream oos = new ObjectOutputStream(bos); // 用对象流 包装
oos.writeObject(msg); oos.writeObject(msg);
byte[] bytes = bos.toByteArray(); byte[] bytes = bos.toByteArray();
// 写入内容 长度 // 写入内容 长度
out.writeInt(bytes.length); out.writeInt(bytes.length);
// 写入内容 // 写入内容
out.writeBytes(bytes); out.writeBytes(bytes);
/** /**
* 加入List 方便传递给 下一个Handler * 加入List 方便传递给 下一个Handler
*/ */
...@@ -68,26 +60,19 @@ public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message ...@@ -68,26 +60,19 @@ public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message
byte messageType = in.readByte(); byte messageType = in.readByte();
int sequenceId = in.readInt(); int sequenceId = in.readInt();
in.readByte(); in.readByte();
int length = in.readInt(); int length = in.readInt();
final byte[] bytes = new byte[length]; final byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length); in.readBytes(bytes, 0, length);
// 处理内容 // 处理内容
final ByteArrayInputStream bis = new ByteArrayInputStream(bytes); final ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
final ObjectInputStream ois = new ObjectInputStream(bis); final ObjectInputStream ois = new ObjectInputStream(bis);
// 转成 Message类型 // 转成 Message类型
Message message = (Message) ois.readObject(); Message message = (Message) ois.readObject();
log.info("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length); log.info("{}", message);
log.debug("{}", message);
/** /**
* 加入List 方便传递给 下一个Handler * 加入List 方便传递给 下一个Handler
*/ */
out.add(message); out.add(message);
} }
} }
\ No newline at end of file
...@@ -13,16 +13,16 @@ import io.netty.handler.logging.LoggingHandler; ...@@ -13,16 +13,16 @@ import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
/** /**
*
* 聊天--服务端 * 聊天--服务端
*@version : 2.2.0 *
*@author : qinyingjie * @author : qinyingjie
*@date : 2023/4/27 19:51 * @version : 2.2.0
* @date : 2023/4/27 19:51
*/ */
@Slf4j @Slf4j
public class ChatServer { public class ChatServer {
public static final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); public static final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
public static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); public static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.INFO);
public static void main(String[] args) throws InterruptedException { public static void main(String[] args) throws InterruptedException {
final NioEventLoopGroup boss = new NioEventLoopGroup(); final NioEventLoopGroup boss = new NioEventLoopGroup();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册