fix:粘包和分包

上级 15d128cf
......@@ -10,6 +10,14 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>netty-03-jinjie</artifactId>
<dependencies>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>RELEASE</version>
<scope>compile</scope>
</dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
......
package com.kwan.shuyu.c1;
package com.kwan.shuyu.advance_01_sticky_bag.c1;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
......
package com.kwan.shuyu.c1;
package com.kwan.shuyu.advance_01_sticky_bag.c1;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
......
package com.kwan.shuyu.c2;
package com.kwan.shuyu.advance_01_sticky_bag.c2;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
......
package com.kwan.shuyu.c2;
package com.kwan.shuyu.advance_01_sticky_bag.c2;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
......
package com.kwan.shuyu.c3;
package com.kwan.shuyu.advance_01_sticky_bag.c3;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
......
package com.kwan.shuyu.c3;
package com.kwan.shuyu.advance_01_sticky_bag.c3;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
......
package com.kwan.shuyu.c4;
package com.kwan.shuyu.advance_01_sticky_bag.c4;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
......
package com.kwan.shuyu.c4;
package com.kwan.shuyu.advance_01_sticky_bag.c4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
......
package com.kwan.shuyu.advance_01_sticky_bag.c5;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.Random;
/**
* 客户端
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/27 18:40
*/
@Slf4j
public class Client {
public static void main(String[] args) {
send();
System.out.println("send......finish......");
}
private static void send() {
final NioEventLoopGroup worker = new NioEventLoopGroup();
try {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channel连接建立好之后 出发 channelActive() 时间
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
final ByteBuf buf = ctx.alloc().buffer();
final Random r = new Random();
char c = 'a';
for (int i = 0; i < 5; i++) {
final StringBuilder tmpStr = getStr(c, r.nextInt(256) + 1); //随机生成 1-257 长度的 字符串
c++;
buf.writeBytes(tmpStr.toString().getBytes());
}
ctx.writeAndFlush(buf);
}
});
}
});
final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("Client error", e);
} finally {
worker.shutdownGracefully();
}
}
public static StringBuilder getStr(char c, int len) {
final StringBuilder s = new StringBuilder(len + 2);
for (int i = 0; i < len; i++) {
s.append(c);
}
s.append('\n');
return s;
}
}
\ No newline at end of file
package com.kwan.shuyu.advance_01_sticky_bag.c5;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
/**
*
* 固定分隔符
*@version : 2.2.0
*@author : qinyingjie
*@date : 2023/4/27 18:59
*/
@Slf4j
public class Server {
void start() {
final NioEventLoopGroup boss = new NioEventLoopGroup();
final NioEventLoopGroup worker = new NioEventLoopGroup();
try {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
final ByteBuf buf = Unpooled.wrappedBuffer(new byte[]{'\r', '\n'});// 客户端分隔符必须 "\r\n"
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); // 超出1024报错
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
final ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new Server().start();
}
}
\ No newline at end of file
package com.kwan.shuyu.advance_01_sticky_bag.c6;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 客户端
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/27 19:08
*/
public class Client {
static final Logger log = LoggerFactory.getLogger(Client.class);
public static void main(String[] args) {
send();
System.out.println("send......finish......");
}
private static void send() {
final NioEventLoopGroup worker = new NioEventLoopGroup();
try {
final Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// channel连接建立好之后 出发 channelActive() 时间
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
final ByteBuf buf = ctx.alloc().buffer();
send(buf, "hello, world");
send(buf, "hi");
ctx.writeAndFlush(buf);
}
});
}
});
final ChannelFuture channelFuture = bootstrap.connect("localhost", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("Client error", e);
} finally {
worker.shutdownGracefully();
}
}
private static void send(ByteBuf buf, String s) {
byte[] bytes = s.getBytes(); // 实际内容
final int length = bytes.length; // 实际内容长度
buf.writeInt(length); // 指定长度 和 存储模式为: 大端模式 【writeInt本身长度 4字节】 【服务端 lengthFieldLength设为4】
buf.writeBytes(new byte[]{'1', '0', '0'}); // 版本号 100, 【服务端 lengthAdjustment 设为 3】 【注意:如果设0 会报错(除非主体内容最后四位都是0)】
buf.writeBytes(bytes);
}
}
\ No newline at end of file
package com.kwan.shuyu.advance_01_sticky_bag.c6;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
/**
* 服务端 基于长度字段的 帧解码器
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/27 19:08
* LengthFieldBasedFrameDecoder(
* int maxFrameLength, 限制最大长度
* int lengthFieldOffset, 长度字段 的 偏移量
* int lengthFieldLength, 长度字段 本身长度
* int lengthAdjustment, 长度字段 为基准,跳过几个字节 才是内容
* int initialBytesToStrip 从头剥离 几个字节,解析后将不出现
* )
*/
@Slf4j
public class Server {
void start() {
final NioEventLoopGroup boss = new NioEventLoopGroup();
final NioEventLoopGroup worker = new NioEventLoopGroup();
try {
final ServerBootstrap bs = new ServerBootstrap();
bs.channel(NioServerSocketChannel.class);
bs.group(boss, worker);
bs.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 解码器 【以下参数 要和 客户端约定好 】
ch.pipeline().addLast(
new LengthFieldBasedFrameDecoder(
1024, 0, 4, 3, 7)
);
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("有信息进来了。。。。");
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
});
final ChannelFuture channelFuture = bs.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new Server().start();
}
}
\ No newline at end of file
package com.kwan.shuyu.advance_01_sticky_bag.c6;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.junit.jupiter.api.Test;
/**
* ################################################
* ###### 基于长度字段的 帧解码器 ########
* ################################################
* <p>
* #################### 案例1 ###################
* 注意:内容读完后,再读到字节就当成 Length 如果不规范会报错
* <p>
* * <pre>
* * <b>lengthFieldOffset</b> = <b>0</b>
* * <b>lengthFieldLength</b> = <b>2</b> (000c 两个字节)
* * lengthAdjustment = 0
* * initialBytesToStrip = 0 (= do not strip header)
* *
* * BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
* * +--------+----------------+ +--------+----------------+
* * | Length | Actual Content |----->| Length | Actual Content |
* * | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
* * +--------+----------------+ +--------+----------------+
* 服务器:获取内容长度,从0(lengthFieldOffset=0)开始,先读两(lengthFieldLength=2)个字节,000c = 12,知道内容长度是12
* 再读 12个字节
* * </pre>
* LengthFieldBasedFrameDecoder(
* int maxFrameLength, 限制最大长度,超过他没找到分隔符报错
* int lengthFieldOffset, 长度字段 偏移量
* int lengthFieldLength, 长度字段 本身长度
* int lengthAdjustment, 长度字段 为基准,跳过几个字节 才是内容
* int initialBytesToStrip 从头玻璃 几个字节,解析后将不出现
* )
* <p>
* #################### 案例2 【从头剥离几个字节】 ############
* * <pre>
* * lengthFieldOffset = 0
* * lengthFieldLength = 2
* * lengthAdjustment = 0
* * <b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field) 从头剥离几个字节,剩下的都是内容
* *
* * BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
* * +--------+----------------+ +----------------+
* * | Length | Actual Content |----->| Actual Content |
* * | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
* * +--------+----------------+ +----------------+
* * </pre>
* <p>
* #################### 案例3 【带消息头】 ###################
* <h3>3字节长度字段位于5字节头的末尾,不带头</h3>
* <p>
* * <pre>
* * <b>lengthFieldOffset</b> = <b>2</b> (= the length of Header 1) 长度字段偏移量 长度= 2 = 头部长度
* * <b>lengthFieldLength</b> = <b>3</b> 指定长度字段本身 为 3字节
* * lengthAdjustment = 0
* * initialBytesToStrip = 0 不剥离字节长度,解析过后还是17字节
* *
* * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* * +-- 2字节 --+-- 3字节 --+---- 12字节 -----+ +----------+----------+----------------+
* * | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
* * | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
* * +----------+----------+----------------+ +----------+----------+----------------+
* * </pre>
* <p>
* #################### 案例4 【指定长度和长度调整(头部长度)跳过头部内容】 ###################
* <h3>3字节长度字段位于5字节头的开头,不带头</h3>
* <p>
* * <pre>
* * lengthFieldOffset = 0
* * lengthFieldLength = 3
* * <b>lengthAdjustment</b> = <b>2</b> (= the length of Header 1)
* * initialBytesToStrip = 0
* *
* * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
* * +-- 3字节 --+-- 2字节 --+---- 12字节 ----+ +----------+----------+---------------+
* * | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
* * | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
* * +----------+----------+----------------+ +----------+----------+----------------+
* Length = 12 表示内容长度
* lengthAdjustment = 2 指跳过两个字节长度的头部内容后 ,有 Length = 12 字节长度的主体消息内容
* * </pre>
* <p>
* #################### 案例5 【偏移1后的长度字段 调整1字节后 才是内容字段 在丢掉从头开始数3字节】 ########
* <h3>在4字节头部中间偏移1的2字节长度字段,带头头字段和长度字段</h3>
* <p>
* * <pre>
* * lengthFieldOffset = 1 (= the length of HDR1)
* * lengthFieldLength = 2
* * <b>lengthAdjustment</b> = <b>1</b> (= the length of HDR2)
* * <b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN)
* *
* * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
* * + 1字节 +- 2字节 -+ 1字节 +--- 12字节 ----+ +------+----------------+
* * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
* * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
* * +------+--------+------+----------------+ +------+----------------+
* lengthFieldOffset = 1 :HDR1长度是1, 偏移1后才是长度字段Length
* lengthFieldLength = 2 : 长度字段Length长2 ,内容长度=12
* lengthAdjustment = 1 : 长度字节Length调整1个字节,过后才是内容字段
* initialBytesToStrip = 3 : 要从头开始 剥离的字节长度,头3个字节不想要了
* * </pre>
*/
public class TestLengthFieldDecoder {
public static void main(String[] args) {
//当成 服务器端
final EmbeddedChannel ch = new EmbeddedChannel(
// 注意:解码器 放debug上面
new LengthFieldBasedFrameDecoder(
1024, 0, 4, 3, 0),
new LoggingHandler(LogLevel.DEBUG)
);
//客户端
// 指定:长度字段-4字节;
final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
send(buf, "hello, world");
send(buf, "hi");
// 将消息 写入 Channel
ch.writeInbound(buf);
}
private static void send(ByteBuf buf, String s) {
byte[] bytes = s.getBytes(); // 实际内容
final int length = bytes.length; // 实际内容长度
buf.writeInt(length); // 指定长度 和 存储模式为: 大端模式 【writeInt本身长度 4字节】 【服务端 lengthFieldLength设为4】
buf.writeBytes(new byte[]{'1', '0', '0'}); // 版本号 100, 【服务端 lengthAdjustment 设为 3】 【注意:如果设0 会报错(除非主体内容最后四位都是0)】
buf.writeBytes(bytes);
}
/**
* 高低位测试
*/
@Test
public void test1() {
int num = 0x12345678;
// "%d\n", num
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册