fix:聊天业务

上级 f6afa2e2
package com.kwan.shuyu.server;
import com.kwan.shuyu.protocol.MessageCodecSharable;
import com.kwan.shuyu.protocol.ProcotolFrameDecoder;
import com.kwan.shuyu.server.handler.*;
......@@ -19,7 +18,6 @@ import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
/**
* 聊天--服务端
*
......@@ -29,10 +27,7 @@ import lombok.extern.slf4j.Slf4j;
*/
@Slf4j
public class ChatServer {
public static void main(String[] args) throws InterruptedException {
final NioEventLoopGroup boss = new NioEventLoopGroup();
final NioEventLoopGroup worker = new NioEventLoopGroup();
// 局部变量
......@@ -43,14 +38,11 @@ public class ChatServer {
final GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();//--创建群聊---处理器
final GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler(); //--群聊---处理器
final QuitHandler QUIT_HANDLER = new QuitHandler(); //--断开连接---处理器
try {
final ServerBootstrap bs = new ServerBootstrap();
bs.channel(NioServerSocketChannel.class);
bs.group(boss, worker);
bs.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 用来判断 是不是读 空闲时间过长,或写空闲时间过长 (读,写,读写空闲时间限制) 0表示不关心
......@@ -75,9 +67,7 @@ public class ChatServer {
}
}
});
ch.pipeline().addLast(QUIT_HANDLER); //--断开连接---处理器
ch.pipeline().addLast(new ProcotolFrameDecoder()); // 帧解码器 【与自定义编解码器 MessageCodecSharable一起配置参数】
ch.pipeline().addLast(LOGGING_HANDLER); // 日志
ch.pipeline().addLast(MESSAGE_CODEC); // 出站入站的 自定义编解码器 【 解析消息类型 】
......@@ -86,23 +76,15 @@ public class ChatServer {
ch.pipeline().addLast(CHAT_HANDLER); //--单聊---处理器
ch.pipeline().addLast(GROUP_CREATE_HANDLER); //--创建群聊---处理器
ch.pipeline().addLast(GROUP_CHAT_HANDLER); //--群聊---处理器
}
});
ChannelFuture channelFuture = bs.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
}
\ No newline at end of file
package com.kwan.shuyu.test;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class OnlyTest {
public static void main(String[] args) {
/*
// 【例题 1】
System.out.println(Config.getServerPort());
final int ordinal = Config.getMySerializerAlgorithm().ordinal();
System.out.println("ordinal = " + ordinal);
String s = "aaaaaaaaaa";
final byte[] bytes = Config.getMySerializerAlgorithm().serializ(s);
System.out.println(Arrays.toString(bytes));
*/
// 【例题 2】
// System.out.println(MySerializer.Algorithm.Json.ordinal());
// 【例题 3】 测试 集合的 computeIfPresent 如果存在key则计算
final Map<String, Set<String>> map = new HashMap<>();
final HashSet<String> set1 = new HashSet<>();
set1.add("张三");
set1.add("李四");
set1.add("王五");
map.put("ql1", set1);
final HashSet<String> set2 = new HashSet<>();
set2.add("zhangsan");
set2.add("lisi");
set2.add("wangwu");
map.put("ql1", set1);
map.put("ql2", set2);
final boolean re = map.get("ql1").remove("abc");
System.out.println(re); // true or false
//map.get("ql6").add("赵六");
map.computeIfPresent("ql6", (k, v) -> {
v.add("赵六");
return v;
});
System.out.println(map.toString());
}
}
\ No newline at end of file
package com.kwan.shuyu.test;
import com.kwan.shuyu.config.Config;
import com.kwan.shuyu.message.LoginRequestMessage;
import com.kwan.shuyu.message.Message;
import com.kwan.shuyu.protocol.MessageCodecSharable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
public class TestAlgorithm {
public static void main(String[] args) {
// 局部变量
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
final EmbeddedChannel embeddedChannel = new EmbeddedChannel(
LOGGING_HANDLER,
// new ProcotolFrameDecoder(), // 帧解码器 【与自定义编解码器 MessageCodecSharable一起配置参数】
MESSAGE_CODEC,
LOGGING_HANDLER
);
LoginRequestMessage message = new LoginRequestMessage("张三", "123456");
/* ########################################################
####### 出站测试 【出站 自动编码】 encode ##########
########################################################*/
// embeddedChannel.writeOutbound(message);
/* #########################################################
####### 入站测试 【入站 自动解码】 decode ############
#########################################################*/
final ByteBuf buf = messageToByteBuf(message);
embeddedChannel.writeInbound(buf);
}
private static ByteBuf messageToByteBuf(Message msg) {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数
out.writeByte(1); // 1字节的 版本
out.writeByte(Config.getMySerializerAlgorithm().ordinal()); // 1字节的 序列化方式 0-jdk,1-json
out.writeByte(msg.getMessageType()); // 1字节的 指令类型
out.writeInt(msg.getSequenceId()); // 4字节的 请求序号 【大端】
out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍
final byte[] bytes = Config.getMySerializerAlgorithm().serializ(msg);
// 写入内容 长度
out.writeInt(bytes.length);
// 写入内容
out.writeBytes(bytes);
return out;
}
}
\ No newline at end of file
package com.kwan.shuyu.test;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
/**
* 断点方法 控制全连接队列大小 (因为netty处理能力很强,只有来不及处理了,才放入全连接队列)
* 在 NioEventLoop.java 文件里 的 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 打断点
* <b>源码里解释:</b>
* 1. SelectionKey.OP_ACCEPT 事件 表示连接建立了, 表示三次握手成功了
* 2. unsafe.read() 一执行 就表示:服务端执行了accept() 了,进一步:在连接队列里就可以把这个连接取出来了
* 3. 这里打个断点,相当于,故意把连接信息 不取出来,放在队列里
* <b>方法:</b>
* 1. 服务端debug运行
* 2. 客户端直接运行连接
* <b>查看 backlog初始值</b>
* 1. 从 java.nio.channels.ServerSocketChannel 里(ctrl+F12)找 bind(xxx, backlog) 方法
* 2. 鼠标右键 对着 find(xxx 点击 [Find Usages] 找到netty里使用该方法的 方法
* 3. 找到 config.getBacklog()); =====> config; =====> private final ServerSocketChannelConfig config;
* 4. 找到 ServerSocketChannelConfig 的 int getBacklog(); =====> class DefaultServerSocketChannelConfig 里
* 5. 找到 private volatile int backlog = NetUtil.SOMAXCONN;
* 6. 最后发现 NetUtil 里 253行定义了默认数据
*/
public class TestBacklogServer {
public static void main(String[] args) {
final ServerBootstrap bs = new ServerBootstrap();
/**
* 调整 全连接队列大小 【注意这里和系统的取最小值为准,所以系统也要配】
*/
bs.option(ChannelOption.SO_BACKLOG, 5);// 超出报错:java.net.ConnectException: Connection refused: no further information
bs.group(new NioEventLoopGroup());
bs.channel(NioServerSocketChannel.class);
bs.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
}
}).bind(8080);
}
}
\ No newline at end of file
package com.kwan.shuyu.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestConnectionTimeout {
/*
配置参数:
1. 客户端通过 .option() 方法配置参数 给 SocketChannel 配置餐年数 (因为对于客户端来说就一个Socket)
2. 服务端
new ServerBootstrap().option() // 给 ServerSocketChannel 配置参数
new ServerBootstrap().childHandler() // 给 SocketChannel 配置参数
*/
public static void main(String[] args) {
final NioEventLoopGroup group = new NioEventLoopGroup();
try {
final Bootstrap bs = new Bootstrap()
.group(group)
// 300毫秒超时限制 (注意:最多限制2秒,否则报更底层的java.net.ConnectException: Connection refused: no further information)
/*
debug 发现,这里是创建了 定时任务,一秒后触发,如果连接成功则取消 使用promise做主次线程之间通信
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
如果超时 只执行到 future.sync() 然后 抛异常
*/
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bs.connect("localhost", 8080);
System.out.println("111111111111111111111111111111111111111111111111111111111111");
final ChannelFuture channelFuture = future.sync();
System.out.println("222222222222222222222222222222222222222222222222222222222222");
channelFuture.channel().close().sync();
} catch (Exception e) {
e.printStackTrace();
log.debug("timeout");
} finally {
group.shutdownGracefully();
}
}
}
\ No newline at end of file
package com.kwan.shuyu.utils;
import io.netty.util.internal.StringUtil;
import java.nio.ByteBuffer;
import static io.netty.util.internal.MathUtil.isOutOfBounds;
import static io.netty.util.internal.StringUtil.NEWLINE;
/**
* ByteBuffer工具类
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:48
*/
public class ByteBufferUtil {
private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16];
private static final String[] HEXDUMP_ROWPREFIXES = new String[65536 >>> 4];
private static final String[] BYTE2HEX = new String[256];
private static final String[] BYTEPADDING = new String[16];
static {
final char[] DIGITS = "0123456789abcdef".toCharArray();
for (int i = 0; i < 256; i++) {
HEXDUMP_TABLE[i << 1] = DIGITS[i >>> 4 & 0x0F];
HEXDUMP_TABLE[(i << 1) + 1] = DIGITS[i & 0x0F];
}
int i;
// Generate the lookup table for hex dump paddings
for (i = 0; i < HEXPADDING.length; i++) {
int padding = HEXPADDING.length - i;
StringBuilder buf = new StringBuilder(padding * 3);
for (int j = 0; j < padding; j++) {
buf.append(" ");
}
HEXPADDING[i] = buf.toString();
}
// Generate the lookup table for the start-offset header in each row (up to 64KiB).
for (i = 0; i < HEXDUMP_ROWPREFIXES.length; i++) {
StringBuilder buf = new StringBuilder(12);
buf.append(NEWLINE);
buf.append(Long.toHexString(i << 4 & 0xFFFFFFFFL | 0x100000000L));
buf.setCharAt(buf.length() - 9, '|');
buf.append('|');
HEXDUMP_ROWPREFIXES[i] = buf.toString();
}
// Generate the lookup table for byte-to-hex-dump conversion
for (i = 0; i < BYTE2HEX.length; i++) {
BYTE2HEX[i] = ' ' + StringUtil.byteToHexStringPadded(i);
}
// Generate the lookup table for byte dump paddings
for (i = 0; i < BYTEPADDING.length; i++) {
int padding = BYTEPADDING.length - i;
StringBuilder buf = new StringBuilder(padding);
for (int j = 0; j < padding; j++) {
buf.append(' ');
}
BYTEPADDING[i] = buf.toString();
}
// Generate the lookup table for byte-to-char conversion
for (i = 0; i < BYTE2CHAR.length; i++) {
if (i <= 0x1f || i >= 0x7f) {
BYTE2CHAR[i] = '.';
} else {
BYTE2CHAR[i] = (char) i;
}
}
}
/**
* 打印所有内容
*
* @param buffer
*/
public static void debugAll(ByteBuffer buffer) {
int oldlimit = buffer.limit();
buffer.limit(buffer.capacity());
StringBuilder origin = new StringBuilder(256);
appendPrettyHexDump(origin, buffer, 0, buffer.capacity());
System.out.println("+--------+-------------------- all ------------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), oldlimit);
System.out.println(origin);
buffer.limit(oldlimit);
}
/**
* 打印可读取内容
*
* @param buffer
*/
public static void debugRead(ByteBuffer buffer) {
StringBuilder builder = new StringBuilder(256);
appendPrettyHexDump(builder, buffer, buffer.position(), buffer.limit() - buffer.position());
System.out.println("+--------+-------------------- read -----------------------+----------------+");
System.out.printf("position: [%d], limit: [%d]\n", buffer.position(), buffer.limit());
System.out.println(builder);
}
private static void appendPrettyHexDump(StringBuilder dump, ByteBuffer buf, int offset, int length) {
if (isOutOfBounds(offset, length, buf.capacity())) {
throw new IndexOutOfBoundsException(
"expected: " + "0 <= offset(" + offset + ") <= offset + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
}
if (length == 0) {
return;
}
dump.append(
" +-------------------------------------------------+" +
NEWLINE + " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |" +
NEWLINE + "+--------+-------------------------------------------------+----------------+");
final int startIndex = offset;
final int fullRows = length >>> 4;
final int remainder = length & 0xF;
// Dump the rows which have 16 bytes.
for (int row = 0; row < fullRows; row++) {
int rowStartIndex = (row << 4) + startIndex;
// Per-row prefix.
appendHexDumpRowPrefix(dump, row, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + 16;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(" |");
// ASCII dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append('|');
}
// Dump the last row which has less than 16 bytes.
if (remainder != 0) {
int rowStartIndex = (fullRows << 4) + startIndex;
appendHexDumpRowPrefix(dump, fullRows, rowStartIndex);
// Hex dump
int rowEndIndex = rowStartIndex + remainder;
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2HEX[getUnsignedByte(buf, j)]);
}
dump.append(HEXPADDING[remainder]);
dump.append(" |");
// Ascii dump
for (int j = rowStartIndex; j < rowEndIndex; j++) {
dump.append(BYTE2CHAR[getUnsignedByte(buf, j)]);
}
dump.append(BYTEPADDING[remainder]);
dump.append('|');
}
dump.append(NEWLINE +
"+--------+-------------------------------------------------+----------------+");
}
private static void appendHexDumpRowPrefix(StringBuilder dump, int row, int rowStartIndex) {
if (row < HEXDUMP_ROWPREFIXES.length) {
dump.append(HEXDUMP_ROWPREFIXES[row]);
} else {
dump.append(NEWLINE);
dump.append(Long.toHexString(rowStartIndex & 0xFFFFFFFFL | 0x100000000L));
dump.setCharAt(dump.length() - 9, '|');
dump.append('|');
}
}
public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF);
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册