fix:自定义传输协议

上级 cf90766b
package com.kwan.shuyu.advance_02_protocol.custom;
import com.kwan.shuyu.advance_02_protocol.custom.message.LoginRequestMessage;
import com.kwan.shuyu.advance_02_protocol.custom.protocol.MessageCodec;
import com.kwan.shuyu.advance_02_protocol.custom.protocol.MessageCodecSharable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LoggingHandler;
/**
* netty做了个注解标记 @Sharable 可以被共享,可以在多线程下安全使用 【一般解码器类 是不能被共享的】
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/27 19:52
*/
public class TestMessageCodec {
/**
* LengthFieldBasedFrameDecoder 不能抽离
* 1. 两个 Eventloop 线程, 使用同一个对象,会产生线程安全的共享资源
* 2. 抽离出来的 LengthFieldBasedFrameDecoder对象 主要记录了多次消息之间的状态就是线程不安全的
* 【半包黏包那样保存上一个信息】 ,就不能在多个Eventloop下使用同一个Handler
*/
public static void main(String[] args) throws Exception {
/**
* 这里 注意顺序
* 帧解码器 处理完,【是完整信息 才会向下一个 handler传递】
* 抽离出来的 LOGGING_HANDLER 是没有状态信息的Handler, 不会出现这样的问题,来多少数据 就打印多少数据
*/
final LoggingHandler LOGGIN_HANDLER = new LoggingHandler();
final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
final SimpleChannelInboundHandler<LoginRequestMessage> channelInboundHandler = new SimpleChannelInboundHandler() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// 这里的 Object msg 已经 【被 MessageCodecSharable 解码成了 Message类型了】
LoginRequestMessage loginRequestMessage = (LoginRequestMessage) msg;
System.out.println(loginRequestMessage.getNickname() + "==================" + loginRequestMessage.getPassword());
}
};
final EmbeddedChannel channel = new EmbeddedChannel(
LOGGIN_HANDLER, // 【移动到流水线的最上方 可以 打印出 半包情况】
new LengthFieldBasedFrameDecoder(
1024, 12, 4, 0, 0),
MESSAGE_CODEC,
channelInboundHandler
);
LoginRequestMessage message = new LoginRequestMessage("张三", "123456", "zs");
final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); // 新建一个buf
new MessageCodec().encode(null, message, buf); // 【编码 入站】
// 半包测试
final ByteBuf s1 = buf.slice(0, 100);
final ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
// 【引用计数 + 1】 才能 调用两次 writeInbound() 保证不会释放内容
s1.retain(); // retain + 1 = 2
channel.writeInbound(s1); // release 1
channel.writeInbound(s2); // release 0
}
}
\ No newline at end of file
package com.kwan.shuyu.advance_02_protocol.custom.message;
import lombok.Data;
import lombok.ToString;
/**
* 消息的 === 基类
*/
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {
private String username;
private String password;
private String nickname;
public LoginRequestMessage() {
}
public LoginRequestMessage(String username, String password, String nickname) {
this.username = username;
this.password = password;
this.nickname = nickname;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
package com.kwan.shuyu.advance_02_protocol.custom.message;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
public abstract class Message implements Serializable {
public static Class<?> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
/* messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);*/
}
}
package com.kwan.shuyu.advance_02_protocol.custom.protocol;
import com.kwan.shuyu.advance_02_protocol.custom.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
* 【自定义】消息 编解码 类 【 不支持@Sharable 】
* 不支持 @Sharable注解 因为进来的消息 可能会是不完整的
* 相当于两个handler合二为一,既能入站 也能做出站处理
* <b>魔数 </b>,用来在第一时间判定是否是无效数据包
* <b>版本号 </b>,可以支持协议的升级
* <b>序列化算法</b>,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
* <b>指令类型 </b>,是登录、注册、单聊、群聊... 跟业务相关
* <b>请求序号 </b>,为了双工通信,提供异步能力
* <b>正文长度 </b>
* <b>消息正文 </b>
*/
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数
out.writeByte(1); // 1字节的 版本
out.writeByte(0); // 1字节的 序列化方式 0-jdk,1-json
out.writeByte(msg.getMessageType()); // 1字节的 指令类型
out.writeInt(msg.getSequenceId()); // 4字节的 请求序号
out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍
// 处理内容 用对象流包装字节数组 并写入
ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 访问数组
ObjectOutputStream oos = new ObjectOutputStream(bos); // 用对象流 包装
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 写入内容 长度
out.writeInt(bytes.length);
// 写入内容
out.writeBytes(bytes);
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt(); // 大端4字节的 魔数
byte version = in.readByte(); // 版本
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
final byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
// 处理内容
final ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
final ObjectInputStream ois = new ObjectInputStream(bis);
// 转成 Message类型
Message message = (Message) ois.readObject();
log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
// 将message给下一个handler使用
out.add(message);
}
}
\ No newline at end of file
package com.kwan.shuyu.advance_02_protocol.custom.protocol;
import com.kwan.shuyu.advance_02_protocol.custom.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
/**
* 【自定义】消息 编解码 类 【 支持@Sharable 】
* ########## 父类 MessageToMessageCodec 认为是完整的信息 【所以必须保证上一个处理器时 帧解码器】 ########
* 相当于两个handler合二为一,既能入站 也能做出站处理
* <b>魔数 </b>,用来在第一时间判定是否是无效数据包
* <b>版本号 </b>,可以支持协议的升级
* <b>序列化算法</b>,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
* <b>指令类型 </b>,是登录、注册、单聊、群聊... 跟业务相关
* <b>请求序号 </b>,为了双工通信,提供异步能力
* <b>正文长度 </b>
* <b>消息正文 </b>
*/
// 写这个类 肯定的认为 上一个处理器 是 帧解码器,所以不用考虑半包黏包问题,直接解码拿数据
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> { // 注意这里Message是自定义类
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数
out.writeByte(1); // 1字节的 版本
out.writeByte(0); // 1字节的 序列化方式 0-jdk,1-json
out.writeByte(msg.getMessageType()); // 1字节的 指令类型
out.writeInt(msg.getSequenceId()); // 4字节的 请求序号 【大端】
out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍
// 处理内容 用对象流包装字节数组 并写入
ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 访问数组
ObjectOutputStream oos = new ObjectOutputStream(bos); // 用对象流 包装
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 写入内容 长度
out.writeInt(bytes.length);
// 写入内容
out.writeBytes(bytes);
/**
* 加入List 方便传递给 下一个Handler
*/
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt(); // 大端4字节的 魔数
byte version = in.readByte(); // 版本
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
final byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
// 处理内容
final ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
final ObjectInputStream ois = new ObjectInputStream(bis);
// 转成 Message类型
Message message = (Message) ois.readObject();
log.debug("{},{},{},{},{},{}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
/**
* 加入List 方便传递给 下一个Handler
*/
out.add(message);
}
}
package com.kwan.shuyu.advance_02_protocol.custom.server;
import com.kwan.shuyu.advance_02_protocol.custom.protocol.MessageCodecSharable;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
/**
*
* 聊天--服务端
*@version : 2.2.0
*@author : qinyingjie
*@date : 2023/4/27 19:51
*/
@Slf4j
public class ChatServer {
public static final MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
public static final LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
public static void main(String[] args) throws InterruptedException {
final NioEventLoopGroup boss = new NioEventLoopGroup();
final NioEventLoopGroup worker = new NioEventLoopGroup();
try {
final ServerBootstrap bs = new ServerBootstrap();
bs.channel(NioServerSocketChannel.class);
bs.group(boss, worker);
bs.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
final ChannelFuture channelFuture = bs.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册