fix:聊天室业务

上级 0286a841
...@@ -48,7 +48,7 @@ public class TestMessageCodec { ...@@ -48,7 +48,7 @@ public class TestMessageCodec {
MESSAGE_CODEC, MESSAGE_CODEC,
channelInboundHandler channelInboundHandler
); );
LoginRequestMessage message = new LoginRequestMessage("张三", "123456", "zs"); LoginRequestMessage message = new LoginRequestMessage("张三", "123456");
final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); // 新建一个buf final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); // 新建一个buf
new MessageCodec().encode(null, message, buf); // 【编码 入站】 new MessageCodec().encode(null, message, buf); // 【编码 入站】
// 半包测试 // 半包测试
......
...@@ -16,10 +16,9 @@ public class LoginRequestMessage extends Message { ...@@ -16,10 +16,9 @@ public class LoginRequestMessage extends Message {
public LoginRequestMessage() { public LoginRequestMessage() {
} }
public LoginRequestMessage(String username, String password, String nickname) { public LoginRequestMessage(String username, String password) {
this.username = username; this.username = username;
this.password = password; this.password = password;
this.nickname = nickname;
} }
@Override @Override
......
...@@ -10,7 +10,6 @@ ...@@ -10,7 +10,6 @@
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<artifactId>netty-04-chat</artifactId> <artifactId>netty-04-chat</artifactId>
<properties> <properties>
<maven.compiler.source>8</maven.compiler.source> <maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target> <maven.compiler.target>8</maven.compiler.target>
......
package com.kwan.shuyu.client;
import com.kwan.shuyu.message.*;
import com.kwan.shuyu.protocol.MessageCodecSharable;
import com.kwan.shuyu.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class ChatClient {
public static void main(String[] args) {
final NioEventLoopGroup group = new NioEventLoopGroup();
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
// 倒计时锁,【主次线程之间 通信】
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1); // 初始基数1,减为零才继续往下运行,否则等待
// 登录状态 初始值 false 【主次线程之间 共享变量】
AtomicBoolean LOGIN = new AtomicBoolean(false);
AtomicBoolean EXIT = new AtomicBoolean(false);
try {
Bootstrap bs = new Bootstrap();
bs.channel(NioSocketChannel.class);
bs.group(group);
bs.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
// ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
// 用来判断 是不是读 空闲时间过长,或写空闲时间过长 (读,写,读写空闲时间限制) 0表示不关心
ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
/*
################################################################
###### ChannelDuplexHandler 可以同时作为 入站 和 出站处理器 #######
##### 没读到数据 触发 IdleState.READER_IDLE #######
##### 10 秒内 写 触发 IdleState.WRITER_IDLE ####### 【写要比服务端读的频率高些】
##### 读写 触发 IdleState.ALL_IDLE #######
################################################################
*/
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 【用来处理 读写之外的 特殊的事件】
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// 是否 读超时
if (event.state() == IdleState.WRITER_IDLE) {
// log.debug(" 10秒 没写数据了 ,发送心跳包 ===================");
ctx.writeAndFlush(new PingMessage());
}
}
});
/**
* 【创建入站处理器 写入内容会触发出站 操作】 【流水线 会向上执行出站Handler, 到 ProcotolFrameDecoder(入站停止)】
* 1. 登录操作
* 2. 另起线程:菜单里进行 收发消息操作
*/
ch.pipeline().addLast("ChatClient handler", new ChannelInboundHandlerAdapter() {
// ###################### [ 3 ] ######################
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 1. 处理登录 [登录成功 登录状态=true]
if ((msg instanceof LoginResponseMessage)) {
LoginResponseMessage responseMessage = (LoginResponseMessage) msg;
if (responseMessage.isSuccess()) LOGIN.set(true);
WAIT_FOR_LOGIN.countDown(); // 减一 唤醒 线程:system in
}
}
// ###################### [ 1 ] ######################
@Override // 【 连接建立后触发一次 】
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 另起线程(不然会被主线程阻塞) 接受用户输入消息 【登录】
new Thread(() -> {
final Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名");
final String username = scanner.nextLine();
System.out.println("请输入密码");
final String password = scanner.nextLine();
// 构造消息对象
final LoginRequestMessage message = new LoginRequestMessage(username, password);
// 发送消息
ctx.writeAndFlush(message);
// ###################### [ 2 ] ######################
log.debug("等待后续操作......");
try {
WAIT_FOR_LOGIN.await(); // 【 阻塞住,等 channelRead响应回来时 继续运行 】
} catch (InterruptedException e) {
e.printStackTrace();
}
// ###################### [ 4 ] ######################
// 登录失败 停止运行
if (!LOGIN.get()) {
ctx.channel().close(); // 触发 【channel.closeFuture().sync(); 向下运行】
return;
}
// 打印菜单
while (true) {
System.out.println("============ 功能菜单 ============");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
String command = scanner.nextLine();
final String[] s = command.split(" ");
switch (s[0]) {
case "send": // 发送消息
ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
break;
case "gsend": // 群里 发送消息
ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
break;
case "gcreate": // 创建群
final Set<String> set = new HashSet(Arrays.asList(s[2].split(",")));
set.add(username);
ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
break;
case "gmembers": // 查看群列表
ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
break;
case "gjoin":
ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
break;
case "gquit":
ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
break;
case "quit":
ctx.channel().close(); // 触发 【channel.closeFuture().sync(); 向下运行】
return;
}
}
}, "system in").start();
}
// 在连接断开时触发
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("Client...主动-连接已经断开,按任意键退出..");
EXIT.set(true);
}
// 在出现异常时触发
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.debug("Client...异常-连接已经断开,按任意键退出..{}", cause.getMessage());
EXIT.set(true);
}
});
}
});
Channel channel = bs.connect("localhost", 8080).sync().channel();
// ... 这个位置 : 连接已经建立好了 【可以写 登录 , 也可以在 channelActive(连接建立后触发此事件) 里写】
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("Client error", e);
} finally {
group.shutdownGracefully();
}
}
}
\ No newline at end of file
package com.kwan.shuyu.config;
import com.kwan.shuyu.protocol.MySerializer;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* 使用配置文件 获取 编解码方法
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:13
*/
public abstract class Config {
/**
* 获取配置的属性值
*/
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
/**
* 获取端口号
*
* @return
*/
public static int getServerPort() {
final String value = properties.getProperty("server.port");
if (value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}
/**
* 动态获取序列化算法
*
* @return
*/
public static MySerializer.Algorithm getMySerializerAlgorithm() {
final String value = properties.getProperty("mySerializer.algorithm");
if (value == null) {
return MySerializer.Algorithm.Java;
} else {
// 拼接成 MySerializer.Algorithm.Java 或 MySerializer.Algorithm.Json
return MySerializer.Algorithm.valueOf(value);
}
}
}
\ No newline at end of file
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public abstract class AbstractResponseMessage extends Message {
private boolean success;
private String reason;
public AbstractResponseMessage() {
}
public AbstractResponseMessage(boolean success, String reason) {
this.success = success;
this.reason = reason;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class ChatRequestMessage extends Message {
private String content;
private String to;
private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
@Override
public int getMessageType() {
return ChatRequestMessage;
}
@Override
public String toString() {
return "ChatRequestMessage{" +
"content='" + content + '\'' +
", to='" + to + '\'' +
", from='" + from + '\'' +
'}';
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return ChatResponseMessage;
}
@Override
public String toString() {
return "ChatResponseMessage{" +
"from='" + from + '\'' +
", content='" + content + '\'' +
'}';
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupChatRequestMessage extends Message {
private String content;
private String groupName;
private String from;
public GroupChatRequestMessage(String from, String groupName, String content) {
this.content = content;
this.groupName = groupName;
this.from = from;
}
@Override
public int getMessageType() {
return GroupChatRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public GroupChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public GroupChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return GroupChatResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
import java.util.Set;
@Data
@ToString(callSuper = true)
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set<String> members;
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
public int getMessageType() {
return GroupCreateRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupCreateResponseMessage extends AbstractResponseMessage {
public GroupCreateResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupCreateResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupJoinRequestMessage extends Message {
private String groupName;
private String username;
public GroupJoinRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
@Override
public int getMessageType() {
return GroupJoinRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupJoinResponseMessage extends AbstractResponseMessage {
public GroupJoinResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupJoinResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupMembersRequestMessage extends Message {
private String groupName;
public GroupMembersRequestMessage(String groupName) {
this.groupName = groupName;
}
@Override
public int getMessageType() {
return GroupMembersRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
import java.util.Set;
@Data
@ToString(callSuper = true)
public class GroupMembersResponseMessage extends Message {
private Set<String> members;
public GroupMembersResponseMessage(Set<String> members) {
this.members = members;
}
@Override
public int getMessageType() {
return GroupMembersResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupQuitRequestMessage extends Message {
private String groupName;
private String username;
public GroupQuitRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
@Override
public int getMessageType() {
return GroupQuitRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupQuitResponseMessage extends AbstractResponseMessage {
public GroupQuitResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupQuitResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class LoginResponseMessage extends AbstractResponseMessage {
public LoginResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return LoginResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
public abstract class Message implements Serializable {
public static Class<?> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
// messageClasses.put(RPC_MESSAGE_TYPE_RE QUEST, RpcRequestMessage.class);
// messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
\ No newline at end of file
package com.kwan.shuyu.message;
public class PingMessage extends Message {
@Override
public int getMessageType() {
return PingMessage;
}
}
package com.kwan.shuyu.message;
public class PongMessage extends Message {
@Override
public int getMessageType() {
return PongMessage;
}
}
package com.kwan.shuyu.protocol;
import com.kwan.shuyu.config.Config;
import com.kwan.shuyu.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* 自定义消息编解码类
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:15
*/
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数
out.writeByte(1); // 1字节的 版本
out.writeByte(Config.getMySerializerAlgorithm().ordinal()); // 1字节的 序列化方式 0-jdk,1-json
out.writeByte(msg.getMessageType()); // 1字节的 指令类型
out.writeInt(msg.getSequenceId()); // 4字节的 请求序号 【大端】
out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍
final byte[] bytes = Config.getMySerializerAlgorithm().serializ(msg);
// 写入内容 长度
out.writeInt(bytes.length);
// 写入内容
out.writeBytes(bytes);
/**
* 加入List 方便传递给 下一个Handler
*/
outList.add(out);
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt(); // 大端4字节的 魔数
byte version = in.readByte(); // 版本
byte serializerType = in.readByte();// 0 Java 1 Json
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
final byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length); // 读取进来,下面再进行 解码
// 1. 找到反序列化算法
final MySerializer.Algorithm algorithm = MySerializer.Algorithm.values()[serializerType];
// 2. 找到消息具体类型
final Object message = algorithm.deserializer(Message.getMessageClass(messageType), bytes);
log.debug("{}", message);
/**
* 加入List 方便传递给 下一个Handler
*/
out.add(message);
}
}
\ No newline at end of file
package com.kwan.shuyu.protocol;
import com.google.gson.*;
import java.io.*;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
/**
* 自定义序列化和反序列化
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:12
*/
public interface MySerializer {
/**
* 序列化
*
* @param object
* @param <T>
* @return
*/
<T> byte[] serializ(T object);
/**
* 反序列化
*
* @param clazz
* @param bytes
* @param <T>
* @return
*/
<T> T deserializer(Class<T> clazz, byte[] bytes);
/**
* 多个算法
* 枚举对象.ordinal() 获取顺序int, 第一个0 第二个1 ...
*/
enum Algorithm implements MySerializer {
Java {
@Override
public <T> T deserializer(Class<T> clazz, byte[] bytes) {
try {
// 处理内容
final ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
final ObjectInputStream ois = new ObjectInputStream(bis);
// 转成 Message类型
T message = (T) ois.readObject();
return message;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化算法失败!");
}
}
@Override
public <T> byte[] serializ(T object) {
try {
// 处理内容 用对象流包装字节数组 并写入
ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 访问数组
ObjectOutputStream oos = new ObjectOutputStream(bos); // 用对象流 包装
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化算法失败!", e);
}
}
},
Json {
@Override
public <T> T deserializer(Class<T> clazz, byte[] bytes) {
// 指定 下面自定义处理类 生成 对象
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
String json = new String(bytes, StandardCharsets.UTF_8);
return gson.fromJson(json, clazz);
}
@Override
public <T> byte[] serializ(T object) {
// 指定 下面自定义处理类 生成 对象
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
String json = gson.toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
}
// 针对之前报出:不支持 Class类转json的异常 做处理
class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
@Override
public Class<?> deserialize(JsonElement json, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
try {
String str = json.getAsString();
return Class.forName(str);
} catch (ClassNotFoundException e) {
throw new JsonParseException(e);
}
}
@Override
public JsonElement serialize(Class<?> src, Type type, JsonSerializationContext jsonSerializationContext) {
// JsonPrimitive 转化基本数据类型
return new JsonPrimitive(src.getName());
}
}
}
\ No newline at end of file
package com.kwan.shuyu.protocol;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* 自定义帧解码器
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:11
*/
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(
int maxFrameLength,
int lengthFieldOffset,
int lengthFieldLength,
int lengthAdjustment,
int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
\ No newline at end of file
package com.kwan.shuyu.server;
import com.kwan.shuyu.protocol.MessageCodecSharable;
import com.kwan.shuyu.protocol.ProcotolFrameDecoder;
import com.kwan.shuyu.server.handler.*;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
/**
* 聊天--服务端
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:25
*/
@Slf4j
public class ChatServer {
public static void main(String[] args) throws InterruptedException {
final NioEventLoopGroup boss = new NioEventLoopGroup();
final NioEventLoopGroup worker = new NioEventLoopGroup();
// 局部变量
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler(); //--登录---处理器
final ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();//--单聊---处理器
final GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();//--创建群聊---处理器
final GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler(); //--群聊---处理器
final QuitHandler QUIT_HANDLER = new QuitHandler(); //--断开连接---处理器
try {
final ServerBootstrap bs = new ServerBootstrap();
bs.channel(NioServerSocketChannel.class);
bs.group(boss, worker);
bs.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 用来判断 是不是读 空闲时间过长,或写空闲时间过长 (读,写,读写空闲时间限制) 0表示不关心
ch.pipeline().addLast(new IdleStateHandler(12, 0, 0));
/*
################################################################
##### ChannelDuplexHandler 可以同时作为 入站 和 出站处理器 #######
##### 12 秒内 没读到数据 触发 IdleState.READER_IDLE #######
##### 写 触发 IdleState.WRITER_IDLE #######
##### 读写 触发 IdleState.ALL_IDLE #######
################################################################
*/
ch.pipeline().addLast(new ChannelDuplexHandler() {
// 【用来处理 读写之外的 特殊的事件】
@Override //-- 触发的用户事件 --
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
// 是否 读超时
if (event.state() == IdleState.READER_IDLE) {
log.debug("==============================已经12秒没读到数据了!====================================");
ctx.channel().close();
}
}
});
ch.pipeline().addLast(QUIT_HANDLER); //--断开连接---处理器
ch.pipeline().addLast(new ProcotolFrameDecoder()); // 帧解码器 【与自定义编解码器 MessageCodecSharable一起配置参数】
ch.pipeline().addLast(LOGGING_HANDLER); // 日志
ch.pipeline().addLast(MESSAGE_CODEC); // 出站入站的 自定义编解码器 【 解析消息类型 】
// simple处理器 【针对性的对登录进行处理】 【流水线 会向上执行出站Handler, 到 ProcotolFrameDecoder(入站停止)】
ch.pipeline().addLast(LOGIN_HANDLER); //--登录---处理器
ch.pipeline().addLast(CHAT_HANDLER); //--单聊---处理器
ch.pipeline().addLast(GROUP_CREATE_HANDLER); //--创建群聊---处理器
ch.pipeline().addLast(GROUP_CHAT_HANDLER); //--群聊---处理器
}
});
ChannelFuture channelFuture = bs.bind(8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
package com.kwan.shuyu.server.handler;
import com.kwan.shuyu.message.ChatRequestMessage;
import com.kwan.shuyu.message.ChatResponseMessage;
import com.kwan.shuyu.server.session.SessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 单聊---管理器
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:23
*/
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
final String to = msg.getTo();
final Channel channel = SessionFactory.getSession().getChannel(to);
// 1. 在线
if (channel != null) {
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
}
// 2. 不在线 给发送者发送 用户不在线
else {
ctx.writeAndFlush(new ChatResponseMessage(false, "用户不在线!"));
}
}
}
\ No newline at end of file
package com.kwan.shuyu.server.handler;
import com.kwan.shuyu.message.GroupChatRequestMessage;
import com.kwan.shuyu.message.GroupChatResponseMessage;
import com.kwan.shuyu.server.session.GroupSession;
import com.kwan.shuyu.server.session.GroupSessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.List;
/**
* 群聊---管理器
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:23
*/
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
final GroupSession groupSession = GroupSessionFactory.getGroupSession();
final List<Channel> channelList = groupSession.getMembersChannel(msg.getGroupName());
for (Channel channel : channelList) {
channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
}
}
}
\ No newline at end of file
package com.kwan.shuyu.server.handler;
import com.kwan.shuyu.message.GroupCreateRequestMessage;
import com.kwan.shuyu.message.GroupCreateResponseMessage;
import com.kwan.shuyu.server.session.Group;
import com.kwan.shuyu.server.session.GroupSession;
import com.kwan.shuyu.server.session.GroupSessionFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.List;
import java.util.Set;
/**
* 创建群---管理器
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:23
*/
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
final String groupName = msg.getGroupName();
final Set<String> members = msg.getMembers();
// 群管理器
final GroupSession groupSession = GroupSessionFactory.getGroupSession();
final Group group = groupSession.createGroup(groupName, members);
// 创建成功
if (group == null) {
// 发送成功消息
ctx.writeAndFlush(new GroupCreateResponseMessage(true, "成功创建群聊:" + groupName));
// 发送拉群消息
final List<Channel> channels = groupSession.getMembersChannel(groupName);
for (Channel ch : channels) {
ch.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入群聊:" + groupName));
}
}
// 创建失败
else {
ctx.writeAndFlush(new GroupCreateResponseMessage(false, "已存在群: " + groupName));
}
}
}
\ No newline at end of file
package com.kwan.shuyu.server.handler;
import com.kwan.shuyu.message.LoginRequestMessage;
import com.kwan.shuyu.message.LoginResponseMessage;
import com.kwan.shuyu.server.service.UserServiceFactory;
import com.kwan.shuyu.server.session.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* 登录请求消息处理器,没有共享变量、没有状态信息
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:22
*/
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
final String username = msg.getUsername();
final String password = msg.getPassword();
final boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage responseMessage;
if (login) {
SessionFactory.getSession().bind(ctx.channel(), username); // 用户 、channel 简历关系
responseMessage = new LoginResponseMessage(true, "登录成功!");
} else {
responseMessage = new LoginResponseMessage(false, "用户名或密码错误!");
}
// 登录结果 返回
ctx.writeAndFlush(responseMessage); // 【 【当前节点】 开始向上 找 "出站Handler" (ch.writeAndFlush 和 ctx.channel().write(msg) 从尾部向上查找)】
}
}
\ No newline at end of file
package com.kwan.shuyu.server.handler;
import com.kwan.shuyu.server.session.SessionFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 退出---处理器,只关心 异常事件 和 ChannelInActive事件
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:22
*/
@ChannelHandler.Sharable
@Slf4j
public class QuitHandler extends ChannelInboundHandlerAdapter {
/**
* 连接 断开时 触发
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 1. 解绑 channel
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} ///////////////主动断开了", ctx.channel());
}
/**
* 异常断开 disconnect() 不会触发
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// 1. 解绑 channel
SessionFactory.getSession().unbind(ctx.channel());
log.debug("{} ///////////////异常断开了,异常是 {}", ctx.channel(), cause);
}
/**
* 新连接
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
log.debug("======================= handlerAdded---------------------");
}
/**
* 断开连接 disconnect 会触发
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
log.debug("======================= handlerRemoved---------------------");
}
}
\ No newline at end of file
package com.kwan.shuyu.server.service;
/**
* 用户管理接口
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:21
*/
public interface UserService {
/**
* 登录
*
* @param username 用户名
* @param password 密码
* @return 登录成功返回 true, 否则返回 false
*/
boolean login(String username, String password);
}
package com.kwan.shuyu.server.service;
/**
* 用户接口工厂
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:21
*/
public abstract class UserServiceFactory {
private static UserService userService = new UserServiceMemoryImpl();
public static UserService getUserService() {
return userService;
}
}
\ No newline at end of file
package com.kwan.shuyu.server.service;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 用户缓存信息
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:21
*/
public class UserServiceMemoryImpl implements UserService {
@Override
public boolean login(String username, String password) {
final String pass = allUserMap.get(username);
if (pass == null) {
return false;
}
return pass.equals(password);
}
private Map<String, String> allUserMap = new ConcurrentHashMap<>();
{
allUserMap.put("zhangsan", "123");
allUserMap.put("lisi", "123");
allUserMap.put("wangwu", "123");
allUserMap.put("zhaoliu", "123");
allUserMap.put("qianqi", "123");
}
}
\ No newline at end of file
package com.kwan.shuyu.server.session;
import lombok.Data;
import java.util.Collections;
import java.util.Set;
/**
* 聊天组,即聊天室
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:24
*/
@Data
public class Group {
/**
* 聊天室名称
*/
private String name;
/**
* 聊天室成员
*/
private Set<String> members;
public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
public Group(String name, Set<String> members) {
this.name = name;
this.members = members;
}
}
\ No newline at end of file
package com.kwan.shuyu.server.session;
import io.netty.channel.Channel;
import java.util.List;
import java.util.Set;
/**
*
* 聊天组会话管理接口
*@version : 2.2.0
*@author : qinyingjie
*@date : 2023/4/28 11:24
*/
public interface GroupSession {
/**
* 创建一个聊天组, 如果不存在才能创建成功
* @param name 组名
* @param members 成员
* @return 成功时返回 null , 失败返回 原来的value
*/
Group createGroup(String name, Set<String> members);
/**
* 加入聊天组
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group joinMember(String name, String member);
/**
* 移除组成员
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeMember(String name, String member);
/**
* 移除聊天组
* @param name 组名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeGroup(String name);
/**
* 获取组成员
* @param name 组名
* @return 成员集合, 没有成员会返回 empty set
*/
Set<String> getMembers(String name);
/**
* 获取组成员的 channel 集合, 只有在线的 channel 才会返回
* @param name 组名
* @return 成员 channel 集合
*/
List<Channel> getMembersChannel(String name);
}
package com.kwan.shuyu.server.session;
/**
* 群组session接口
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:24
*/
public abstract class GroupSessionFactory {
private static GroupSession session = new GroupSessionMemoryImpl();
public static GroupSession getGroupSession() {
return session;
}
}
package com.kwan.shuyu.server.session;
import io.netty.channel.Channel;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
/**
* 群组session缓存
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:24
*/
public class GroupSessionMemoryImpl implements GroupSession {
private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
@Override
public Group createGroup(String name, Set<String> members) {
Group group = new Group(name, members);
return groupMap.putIfAbsent(name, group); // 没有则放入
}
@Override
public Group joinMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().add(member); // 指定 key 的值进行重新计算,前提是该 key 存在于 hashMap 中
return value;
});
}
@Override
public Group removeMember(String name, String member) {
return groupMap.computeIfPresent(name, (key, value) -> {
value.getMembers().remove(member);
return value;
});
}
@Override
public Group removeGroup(String name) {
return groupMap.remove(name);
}
@Override
public Set<String> getMembers(String name) {
return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
}
// 根据 【群聊名称】 -> 【用户名Set】 -> map遍历 -> 【用户名获取到 所有对应的 channel】 -> 【channel List】
@Override
public List<Channel> getMembersChannel(String name) {
return getMembers(name).stream()
.map(member -> SessionFactory.getSession().getChannel(member)) // 根据成员名 获得Channel
.filter(Objects::nonNull) // 不是 null 才会 被下面收集
.collect(Collectors.toList());
}
}
package com.kwan.shuyu.server.session;
import io.netty.channel.Channel;
/**
* 会话管理接口
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:25
*/
public interface Session {
/**
* 绑定会话
*
* @param channel 哪个 channel 要绑定会话
* @param username 会话绑定用户
*/
void bind(Channel channel, String username);
/**
* 解绑会话
*
* @param channel 哪个 channel 要解绑会话
*/
void unbind(Channel channel);
/**
* 获取属性
*
* @param channel 哪个 channel
* @param name 属性名
* @return 属性值
*/
Object getAttribute(Channel channel, String name);
/**
* 设置属性
*
* @param channel 哪个 channel
* @param name 属性名
* @param value 属性值
*/
void setAttribute(Channel channel, String name, Object value);
/**
* 根据用户名获取 channel
*
* @param username 用户名
* @return channel
*/
Channel getChannel(String username);
}
package com.kwan.shuyu.server.session;
/**
* session工厂
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:25
*/
public abstract class SessionFactory {
private static Session session = new SessionMemoryImpl();
public static Session getSession() {
return session;
}
}
package com.kwan.shuyu.server.session;
import io.netty.channel.Channel;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* session缓存
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:25
*/
public class SessionMemoryImpl implements Session {
private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
private final Map<Channel, Map<String, Object>> channelAttributesMap = new ConcurrentHashMap<>();// 每个 channel 包含的属性
@Override
public void bind(Channel channel, String username) {
usernameChannelMap.put(username, channel);
channelUsernameMap.put(channel, username);
channelAttributesMap.put(channel, new ConcurrentHashMap<>());
}
@Override
public void unbind(Channel channel) {
String username = channelUsernameMap.remove(channel);
usernameChannelMap.remove(username);
channelAttributesMap.remove(channel);
}
@Override
public Object getAttribute(Channel channel, String name) {
return channelAttributesMap.get(channel).get(name);
}
@Override
public void setAttribute(Channel channel, String name, Object value) {
channelAttributesMap.get(channel).put(name, value);
}
@Override
public Channel getChannel(String username) {
return usernameChannelMap.get(username);
}
@Override
public String toString() {
return usernameChannelMap.toString();
}
}
#端口号
server.port = 6000
# Java 选择对象流 序列化;Json 选择Json序列化
mySerializer.algorithm=Json
com.rpc.server.service.HelloService=com.rpc.server.service.HelloServiceImpl2
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册