diff --git a/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/TestMessageCodec.java b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/TestMessageCodec.java new file mode 100644 index 0000000000000000000000000000000000000000..b4b4c0c2fc3f769fb1d7a75e3b82ab226666adb7 --- /dev/null +++ b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/TestMessageCodec.java @@ -0,0 +1,62 @@ +package com.kwan.shuyu.advance_02_protocol.custom; + +import com.kwan.shuyu.advance_02_protocol.custom.message.LoginRequestMessage; +import com.kwan.shuyu.advance_02_protocol.custom.protocol.MessageCodec; +import com.kwan.shuyu.advance_02_protocol.custom.protocol.MessageCodecSharable; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.logging.LoggingHandler; + +/** + * netty做了个注解标记 @Sharable 可以被共享,可以在多线程下安全使用 【一般解码器类 是不能被共享的】 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/4/27 19:52 + */ +public class TestMessageCodec { + /** + * LengthFieldBasedFrameDecoder 不能抽离 + * 1. 两个 Eventloop 线程, 使用同一个对象,会产生线程安全的共享资源 + * 2. 抽离出来的 LengthFieldBasedFrameDecoder对象 主要记录了多次消息之间的状态就是线程不安全的 + * 【半包黏包那样保存上一个信息】 ,就不能在多个Eventloop下使用同一个Handler + */ + public static void main(String[] args) throws Exception { + /** + * 这里 注意顺序 + * 帧解码器 处理完,【是完整信息 才会向下一个 handler传递】 + * 抽离出来的 LOGGING_HANDLER 是没有状态信息的Handler, 不会出现这样的问题,来多少数据 就打印多少数据 + */ + final LoggingHandler LOGGIN_HANDLER = new LoggingHandler(); + final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); + final SimpleChannelInboundHandler channelInboundHandler = new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + // 这里的 Object msg 已经 【被 MessageCodecSharable 解码成了 Message类型了】 + LoginRequestMessage loginRequestMessage = (LoginRequestMessage) msg; + System.out.println(loginRequestMessage.getNickname() + "==================" + loginRequestMessage.getPassword()); + } + }; + final EmbeddedChannel channel = new EmbeddedChannel( + LOGGIN_HANDLER, // 【移动到流水线的最上方 可以 打印出 半包情况】 + new LengthFieldBasedFrameDecoder( + 1024, 12, 4, 0, 0), + MESSAGE_CODEC, + channelInboundHandler + ); + LoginRequestMessage message = new LoginRequestMessage("张三", "123456", "zs"); + final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); // 新建一个buf + new MessageCodec().encode(null, message, buf); // 【编码 入站】 + // 半包测试 + final ByteBuf s1 = buf.slice(0, 100); + final ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100); + // 【引用计数 + 1】 才能 调用两次 writeInbound() 保证不会释放内容 + s1.retain(); // retain + 1 = 2 + channel.writeInbound(s1); // release 1 + channel.writeInbound(s2); // release 0 + } +} \ No newline at end of file diff --git a/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/message/LoginRequestMessage.java b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/message/LoginRequestMessage.java new file mode 100644 index 0000000000000000000000000000000000000000..e659c8bd59da7dcb2d3f6cd6e98b8bcd7c662605 --- /dev/null +++ b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/message/LoginRequestMessage.java @@ -0,0 +1,32 @@ +package com.kwan.shuyu.advance_02_protocol.custom.message; + +import lombok.Data; +import lombok.ToString; + +/** + * 消息的 === 基类 + */ +@Data +@ToString(callSuper = true) +public class LoginRequestMessage extends Message { + private String username; + private String password; + private String nickname; + + public LoginRequestMessage() { + } + + public LoginRequestMessage(String username, String password, String nickname) { + this.username = username; + this.password = password; + this.nickname = nickname; + } + + @Override + public int getMessageType() { + return LoginRequestMessage; + } + +} + + diff --git a/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/message/Message.java b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/message/Message.java new file mode 100644 index 0000000000000000000000000000000000000000..19a96c158b857a6bbda080f1139dc6d0c401940f --- /dev/null +++ b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/message/Message.java @@ -0,0 +1,54 @@ +package com.kwan.shuyu.advance_02_protocol.custom.message; + +import lombok.Data; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +@Data +public abstract class Message implements Serializable { + + public static Class getMessageClass(int messageType) { + return messageClasses.get(messageType); + } + + private int sequenceId; + + private int messageType; + + public abstract int getMessageType(); + + public static final int LoginRequestMessage = 0; + public static final int LoginResponseMessage = 1; + public static final int ChatRequestMessage = 2; + public static final int ChatResponseMessage = 3; + public static final int GroupCreateRequestMessage = 4; + public static final int GroupCreateResponseMessage = 5; + public static final int GroupJoinRequestMessage = 6; + public static final int GroupJoinResponseMessage = 7; + public static final int GroupQuitRequestMessage = 8; + public static final int GroupQuitResponseMessage = 9; + public static final int GroupChatRequestMessage = 10; + public static final int GroupChatResponseMessage = 11; + public static final int GroupMembersRequestMessage = 12; + public static final int GroupMembersResponseMessage = 13; + private static final Map> messageClasses = new HashMap<>(); + + static { + messageClasses.put(LoginRequestMessage, LoginRequestMessage.class); +/* messageClasses.put(LoginResponseMessage, LoginResponseMessage.class); + messageClasses.put(ChatRequestMessage, ChatRequestMessage.class); + messageClasses.put(ChatResponseMessage, ChatResponseMessage.class); + messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class); + messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class); + messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class); + messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class); + messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class); + messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class); + messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class); + messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class); + messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class); + messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);*/ + } +} diff --git a/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/protocol/MessageCodec.java b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/protocol/MessageCodec.java new file mode 100644 index 0000000000000000000000000000000000000000..f1dd3bcacb3c14e56919a888f07fddd976b2deab --- /dev/null +++ b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/protocol/MessageCodec.java @@ -0,0 +1,69 @@ +package com.kwan.shuyu.advance_02_protocol.custom.protocol; + +import com.kwan.shuyu.advance_02_protocol.custom.message.Message; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageCodec; +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; + +/** + * 【自定义】消息 编解码 类 【 不支持@Sharable 】 + * 不支持 @Sharable注解 因为进来的消息 可能会是不完整的 + * 相当于两个handler合二为一,既能入站 也能做出站处理 + * 魔数 ,用来在第一时间判定是否是无效数据包 + * 版本号 ,可以支持协议的升级 + * 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk + * 指令类型 ,是登录、注册、单聊、群聊... 跟业务相关 + * 请求序号 ,为了双工通信,提供异步能力 + * 正文长度 + * 消息正文 + */ +@Slf4j +public class MessageCodec extends ByteToMessageCodec { + @Override + public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception { + out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数 + out.writeByte(1); // 1字节的 版本 + out.writeByte(0); // 1字节的 序列化方式 0-jdk,1-json + out.writeByte(msg.getMessageType()); // 1字节的 指令类型 + out.writeInt(msg.getSequenceId()); // 4字节的 请求序号 + out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍 + // 处理内容 用对象流包装字节数组 并写入 + ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 访问数组 + ObjectOutputStream oos = new ObjectOutputStream(bos); // 用对象流 包装 + oos.writeObject(msg); + byte[] bytes = bos.toByteArray(); + // 写入内容 长度 + out.writeInt(bytes.length); + // 写入内容 + out.writeBytes(bytes); + } + + @Override + public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + int magicNum = in.readInt(); // 大端4字节的 魔数 + byte version = in.readByte(); // 版本 + byte serializerType = in.readByte(); + byte messageType = in.readByte(); + int sequenceId = in.readInt(); + in.readByte(); + int length = in.readInt(); + final byte[] bytes = new byte[length]; + in.readBytes(bytes, 0, length); + // 处理内容 + final ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + final ObjectInputStream ois = new ObjectInputStream(bis); + // 转成 Message类型 + Message message = (Message) ois.readObject(); + log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length); + log.debug("{}", message); + // 将message给下一个handler使用 + out.add(message); + } +} \ No newline at end of file diff --git a/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/protocol/MessageCodecSharable.java b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/protocol/MessageCodecSharable.java new file mode 100644 index 0000000000000000000000000000000000000000..2ee543318c2fa7f710fdd79e20af14e30c471add --- /dev/null +++ b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/protocol/MessageCodecSharable.java @@ -0,0 +1,93 @@ +package com.kwan.shuyu.advance_02_protocol.custom.protocol; + +import com.kwan.shuyu.advance_02_protocol.custom.message.Message; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.MessageToMessageCodec; +import lombok.extern.slf4j.Slf4j; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.List; + +/** + * 【自定义】消息 编解码 类 【 支持@Sharable 】 + * ########## 父类 MessageToMessageCodec 认为是完整的信息 【所以必须保证上一个处理器时 帧解码器】 ######## + * 相当于两个handler合二为一,既能入站 也能做出站处理 + * 魔数 ,用来在第一时间判定是否是无效数据包 + * 版本号 ,可以支持协议的升级 + * 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk + * 指令类型 ,是登录、注册、单聊、群聊... 跟业务相关 + * 请求序号 ,为了双工通信,提供异步能力 + * 正文长度 + * 消息正文 + */ +// 写这个类 肯定的认为 上一个处理器 是 帧解码器,所以不用考虑半包黏包问题,直接解码拿数据 +@Slf4j +@ChannelHandler.Sharable +public class MessageCodecSharable extends MessageToMessageCodec { // 注意这里Message是自定义类 + + @Override + protected void encode(ChannelHandlerContext ctx, Message msg, List outList) throws Exception { + + ByteBuf out = ctx.alloc().buffer(); + + out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数 + out.writeByte(1); // 1字节的 版本 + out.writeByte(0); // 1字节的 序列化方式 0-jdk,1-json + out.writeByte(msg.getMessageType()); // 1字节的 指令类型 + out.writeInt(msg.getSequenceId()); // 4字节的 请求序号 【大端】 + out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍 + + // 处理内容 用对象流包装字节数组 并写入 + ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 访问数组 + ObjectOutputStream oos = new ObjectOutputStream(bos); // 用对象流 包装 + oos.writeObject(msg); + + byte[] bytes = bos.toByteArray(); + + // 写入内容 长度 + out.writeInt(bytes.length); + // 写入内容 + out.writeBytes(bytes); + + /** + * 加入List 方便传递给 下一个Handler + */ + outList.add(out); + } + + @Override + protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { + int magicNum = in.readInt(); // 大端4字节的 魔数 + byte version = in.readByte(); // 版本 + byte serializerType = in.readByte(); + byte messageType = in.readByte(); + int sequenceId = in.readInt(); + in.readByte(); + + int length = in.readInt(); + final byte[] bytes = new byte[length]; + in.readBytes(bytes, 0, length); + + // 处理内容 + final ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + final ObjectInputStream ois = new ObjectInputStream(bis); + + // 转成 Message类型 + Message message = (Message) ois.readObject(); + + log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length); + log.debug("{}", message); + + + /** + * 加入List 方便传递给 下一个Handler + */ + out.add(message); + + } +} diff --git a/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/server/ChatServer.java b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/server/ChatServer.java new file mode 100644 index 0000000000000000000000000000000000000000..9b6f88519d33af200efc48e315fd560978401648 --- /dev/null +++ b/netty-03-jinjie/src/main/java/com/kwan/shuyu/advance_02_protocol/custom/server/ChatServer.java @@ -0,0 +1,51 @@ +package com.kwan.shuyu.advance_02_protocol.custom.server; + +import com.kwan.shuyu.advance_02_protocol.custom.protocol.MessageCodecSharable; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import lombok.extern.slf4j.Slf4j; + +/** + * + * 聊天--服务端 + *@version : 2.2.0 + *@author : qinyingjie + *@date : 2023/4/27 19:51 + */ +@Slf4j +public class ChatServer { + public static final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); + public static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); + + public static void main(String[] args) throws InterruptedException { + final NioEventLoopGroup boss = new NioEventLoopGroup(); + final NioEventLoopGroup worker = new NioEventLoopGroup(); + try { + final ServerBootstrap bs = new ServerBootstrap(); + bs.channel(NioServerSocketChannel.class); + bs.group(boss, worker); + bs.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0)); + ch.pipeline().addLast(LOGGING_HANDLER); + ch.pipeline().addLast(MESSAGE_CODEC); + } + }); + final ChannelFuture channelFuture = bs.bind(8080).sync(); + channelFuture.channel().closeFuture().sync(); + } catch (InterruptedException e) { + log.error("server error", e); + } finally { + boss.shutdownGracefully(); + worker.shutdownGracefully(); + } + } +} \ No newline at end of file