提交 e8973e2c 编写于 作者: GreyZeng's avatar GreyZeng

update

上级 a497a69c
package snippet.chat;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.netty.channel.Channel;
import snippet.chat.protocol.Attributes;
import snippet.chat.protocol.Session;
public class SessionUtil {
private static final Map<String, Channel> userIdChannelMap = new ConcurrentHashMap<>();
public static void bindSession(Session session, Channel channel) {
userIdChannelMap.put(session.getUserId(), channel);
channel.attr(Attributes.SESSION).set(session);
}
public static void unBindSession(Channel channel) {
if (hasLogin(channel)) {
userIdChannelMap.remove(getSession(channel).getUserId());
channel.attr(Attributes.SESSION).set(null);
}
}
public static boolean hasLogin(Channel channel) {
return channel.hasAttr(Attributes.SESSION);
}
public static Session getSession(Channel channel) {
return channel.attr(Attributes.SESSION).get();
}
public static Channel getChannel(String userId) {
return userIdChannelMap.get(userId);
}
}
package snippet.chat.client;
import snippet.chat.LoginUtil;
import snippet.chat.protocol.LoginRequestPacket;
import snippet.chat.protocol.LoginResponsePacket;
import snippet.chat.protocol.MessageResponsePacket;
import snippet.chat.protocol.Packet;
import snippet.chat.protocol.PacketCodeC;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.buffer.ByteBuf;
import java.util.Date;
import java.util.UUID;
/**
* @author <a href="mailto:410486047@qq.com">Grey</a>
* @date 2022/9/15
* @since
*/
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println(new Date() + ": 客户端开始登录");
// 创建登录对象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(UUID.randomUUID().toString());
loginRequestPacket.setUsername("flash");
loginRequestPacket.setPassword("pwd");
// 编码
ByteBuf buffer = PacketCodeC.INSTANCE.encode(ctx.alloc(), loginRequestPacket);
// 写数据
ctx.channel().writeAndFlush(buffer);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf byteBuf = (ByteBuf) msg;
Packet packet = PacketCodeC.INSTANCE.decode(byteBuf);
if (packet instanceof LoginResponsePacket loginResponsePacket) {
if (loginResponsePacket.isSuccess()) {
LoginUtil.markAsLogin(ctx.channel());
System.out.println(new Date() + ": 客户端登录成功");
} else {
System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason());
}
} else if (packet instanceof MessageResponsePacket messageResponsePacket) {
System.out.println(new Date() + ": 收到服务端的消息:" + messageResponsePacket.getMessage());
}
}
}
package snippet.chat.client;
import snippet.chat.LoginUtil;
import snippet.chat.protocol.LoginRequestPacket;
import snippet.chat.protocol.LoginResponsePacket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Date;
import java.util.UUID;
import snippet.chat.SessionUtil;
import snippet.chat.protocol.LoginResponsePacket;
import snippet.chat.protocol.Session;
/**
* @author <a href="mailto:410486047@qq.com">Grey</a>
......@@ -16,25 +13,21 @@ import java.util.UUID;
*/
public class LoginResponseHandler extends SimpleChannelInboundHandler<LoginResponsePacket> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 创建登录对象
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
loginRequestPacket.setUserId(UUID.randomUUID().toString());
loginRequestPacket.setUsername("flash");
loginRequestPacket.setPassword("pwd");
// 写数据
ctx.channel().writeAndFlush(loginRequestPacket);
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginResponsePacket loginResponsePacket) {
String userId = loginResponsePacket.getUserId();
String userName = loginResponsePacket.getUserName();
if (loginResponsePacket.isSuccess()) {
System.out.println(new Date() + ": 客户端登录成功");
LoginUtil.markAsLogin(ctx.channel());
System.out.println("[" + userName + "]登录成功,userId 为: " + loginResponsePacket.getUserId());
SessionUtil.bindSession(new Session(userId, userName), ctx.channel());
} else {
System.out.println(new Date() + ": 客户端登录失败,原因:" + loginResponsePacket.getReason());
System.out.println("[" + userName + "]登录失败,原因:" + loginResponsePacket.getReason());
}
}
}
\ No newline at end of file
@Override
public void channelInactive(ChannelHandlerContext ctx) {
System.out.println("客户端连接被关闭!");
}
}
......@@ -14,6 +14,9 @@ import java.util.Date;
public class MessageResponseHandler extends SimpleChannelInboundHandler<MessageResponsePacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageResponsePacket messageResponsePacket) {
System.out.println(new Date() + ": 收到服务端的消息: " + messageResponsePacket.getMessage());
String fromUserId = messageResponsePacket.getFromUserId();
String fromUserName = messageResponsePacket.getFromUserName();
System.out.println(fromUserId + ":" + fromUserName + " -> " + messageResponsePacket
.getMessage());
}
}
\ No newline at end of file
}
package snippet.chat.client;
import snippet.chat.LoginUtil;
import snippet.chat.protocol.MessageRequestPacket;
import snippet.chat.protocol.PacketDecoder;
import snippet.chat.protocol.PacketEncoder;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import snippet.chat.protocol.Splitter;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
......@@ -14,10 +12,12 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.Date;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;
import snippet.chat.SessionUtil;
import snippet.chat.protocol.LoginRequestPacket;
import snippet.chat.protocol.MessageRequestPacket;
import snippet.chat.protocol.PacketDecoder;
import snippet.chat.protocol.PacketEncoder;
import snippet.chat.protocol.Splitter;
/**
* @author <a href="mailto:410486047@qq.com">Grey</a>
......@@ -71,16 +71,36 @@ public class NettyClient {
// 启动控制台客户端,给服务端发送消息
private static void startConsoleThread(Channel channel) {
Scanner sc = new Scanner(System.in);
LoginRequestPacket loginRequestPacket = new LoginRequestPacket();
new Thread(() -> {
while (!Thread.interrupted()) {
if (LoginUtil.hasLogin(channel)) {
System.out.println("输入消息发送至服务端: ");
Scanner sc = new Scanner(System.in);
String line = sc.nextLine();
if (!SessionUtil.hasLogin(channel)) {
System.out.print("输入用户名登录: ");
String username = sc.nextLine();
loginRequestPacket.setUserName(username);
// 密码使用默认的
loginRequestPacket.setPassword("pwd");
channel.writeAndFlush(new MessageRequestPacket(line));
// 发送登录数据包
channel.writeAndFlush(loginRequestPacket);
waitForLoginResponse();
} else {
String toUserId = sc.next();
String message = sc.next();
channel.writeAndFlush(new MessageRequestPacket(toUserId, message));
}
}
}).start();
}
private static void waitForLoginResponse() {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
}
}
\ No newline at end of file
......@@ -8,5 +8,6 @@ import io.netty.util.AttributeKey;
* @since
*/
public interface Attributes {
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
AttributeKey<Boolean> LOGIN = AttributeKey.newInstance("login");
AttributeKey<Session> SESSION = AttributeKey.newInstance("session");
}
......@@ -2,19 +2,18 @@ package snippet.chat.protocol;
import lombok.Data;
import static snippet.chat.protocol.Command.LOGIN_REQUEST;
import static snippet.chat.protocol.Command.LOGIN_REQUEST;
@Data
public class LoginRequestPacket extends Packet {
private String userId;
private String username;
private String userName;
private String password;
@Override
public Byte getCommand() {
return LOGIN_REQUEST;
}
}
}
\ No newline at end of file
package snippet.chat.protocol;
import lombok.Data;
import lombok.Data;
import static snippet.chat.protocol.Command.LOGIN_RESPONSE;
import static snippet.chat.protocol.Command.LOGIN_RESPONSE;
@Data
public class LoginResponsePacket extends Packet {
private String userId;
private String userName;
private boolean success;
private String reason;
......
......@@ -12,8 +12,11 @@ import lombok.NoArgsConstructor;
@NoArgsConstructor
public class MessageRequestPacket extends Packet {
private String message;
private String toUserId;
public MessageRequestPacket(String message) {
public MessageRequestPacket(String toUserId, String message) {
this.toUserId = toUserId;
this.message = message;
}
......
package snippet.chat.protocol;
import lombok.Data;
import lombok.Data;
import static snippet.chat.protocol.Command.MESSAGE_RESPONSE;
import static snippet.chat.protocol.Command.MESSAGE_RESPONSE;
/**
* @author <a href="mailto:410486047@qq.com">Grey</a>
......@@ -11,10 +11,16 @@ import static snippet.chat.protocol.Command.MESSAGE_RESPONSE;
*/
@Data
public class MessageResponsePacket extends Packet {
private String fromUserId;
private String fromUserName;
private String message;
@Override
public Byte getCommand() {
return MESSAGE_RESPONSE;
}
}
package snippet.chat.protocol;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class Session {
// 用户唯一性标识
private String userId;
private String userName;
public Session(String userId, String userName) {
this.userId = userId;
this.userName = userName;
}
@Override
public String toString() {
return userId + ":" + userName;
}
}
package snippet.chat.server;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import snippet.chat.SessionUtil;
public class AuthHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!SessionUtil.hasLogin(ctx.channel())) {
ctx.channel().close();
} else {
ctx.pipeline().remove(this);
super.channelRead(ctx, msg);
}
}
}
package snippet.chat.server;
import snippet.chat.SessionUtil;
import snippet.chat.protocol.LoginRequestPacket;
import snippet.chat.protocol.LoginResponsePacket;
import snippet.chat.protocol.Session;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Date;
import java.util.UUID;
/**
* @author <a href="mailto:410486047@qq.com">Grey</a>
......@@ -15,13 +18,16 @@ import java.util.Date;
public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestPacket loginRequestPacket) {
System.out.println(new Date() + ": 收到客户端登录请求……");
LoginResponsePacket loginResponsePacket = new LoginResponsePacket();
loginResponsePacket.setVersion(loginRequestPacket.getVersion());
loginResponsePacket.setUserName(loginRequestPacket.getUserName());
if (valid(loginRequestPacket)) {
loginResponsePacket.setSuccess(true);
System.out.println(new Date() + ": 登录成功!");
String userId = randomUserId();
loginResponsePacket.setUserId(userId);
System.out.println("[" + loginRequestPacket.getUserName() + "]登录成功");
SessionUtil.bindSession(new Session(userId, loginRequestPacket.getUserName()), ctx.channel());
} else {
loginResponsePacket.setReason("账号密码校验失败");
loginResponsePacket.setSuccess(false);
......@@ -35,4 +41,13 @@ public class LoginRequestHandler extends SimpleChannelInboundHandler<LoginReques
private boolean valid(LoginRequestPacket loginRequestPacket) {
return true;
}
private static String randomUserId() {
return UUID.randomUUID().toString().split("-")[0];
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
SessionUtil.unBindSession(ctx.channel());
}
}
package snippet.chat.server;
import snippet.chat.SessionUtil;
import snippet.chat.protocol.MessageRequestPacket;
import snippet.chat.protocol.MessageResponsePacket;
import snippet.chat.protocol.Session;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
......@@ -14,11 +17,24 @@ import java.util.Date;
*/
public class MessageRequestHandler extends SimpleChannelInboundHandler<MessageRequestPacket> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket msg) {
protected void channelRead0(ChannelHandlerContext ctx, MessageRequestPacket messageRequestPacket) {
// 1.拿到消息发送方的会话信息
Session session = SessionUtil.getSession(ctx.channel());
// 2.通过消息发送方的会话信息构造要发送的消息
MessageResponsePacket messageResponsePacket = new MessageResponsePacket();
System.out.println(new Date() + ": 收到客户端消息: " + msg.getMessage());
messageResponsePacket.setMessage("服务端回复【" + msg.getMessage() + "】");
messageResponsePacket.setFromUserId(session.getUserId());
messageResponsePacket.setFromUserName(session.getUserName());
messageResponsePacket.setMessage(messageRequestPacket.getMessage());
// 3.拿到消息接收方的 channel
Channel toUserChannel = SessionUtil.getChannel(messageRequestPacket.getToUserId());
ctx.channel().writeAndFlush(messageResponsePacket);
// 4.将消息发送给消息接收方
if (toUserChannel != null && SessionUtil.hasLogin(toUserChannel)) {
toUserChannel.writeAndFlush(messageResponsePacket);
} else {
System.err.println("[" + messageRequestPacket.getToUserId() + "] 不在线,发送失败!");
}
}
}
package snippet.chat.server;
import snippet.chat.client.LifeCycleTestHandler;
import snippet.chat.protocol.PacketDecoder;
import snippet.chat.protocol.PacketEncoder;
import snippet.chat.protocol.Splitter;
......@@ -10,54 +9,51 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import snippet.chat.server.inbound.InHandlerA;
import snippet.chat.server.inbound.InHandlerB;
import snippet.chat.server.inbound.InHandlerC;
import snippet.chat.server.outbound.OutHandlerA;
import snippet.chat.server.outbound.OutHandlerB;
import snippet.chat.server.outbound.OutHandlerC;
import java.util.Date;
public class NettyServer {
private static final int PORT = 8000;
private static final int PORT = 8000;
public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
public static void main(String[] args) {
NioEventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
// ch.pipeline().addLast(new ServerHandler());
//ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,7,4));
// ch.pipeline().addLast(new InHandlerA());
// ch.pipeline().addLast(new InHandlerB());
// ch.pipeline().addLast(new InHandlerC());
// ch.pipeline().addLast(new OutHandlerA());
// ch.pipeline().addLast(new OutHandlerB());
// ch.pipeline().addLast(new OutHandlerC());
//ch.pipeline().addLast(new LifeCycleTestHandler());
ch.pipeline().addLast(new Splitter());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
bind(serverBootstrap, PORT);
}
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
}
});
}
// ch.pipeline().addLast(new
// LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,7,4));
// ch.pipeline().addLast(new InHandlerA());
// ch.pipeline().addLast(new InHandlerB());
// ch.pipeline().addLast(new InHandlerC());
// ch.pipeline().addLast(new OutHandlerA());
// ch.pipeline().addLast(new OutHandlerB());
// ch.pipeline().addLast(new OutHandlerC());
// ch.pipeline().addLast(new LifeCycleTestHandler());
ch.pipeline().addLast(new Splitter());
ch.pipeline().addLast(new PacketDecoder());
ch.pipeline().addLast(new LoginRequestHandler());
ch.pipeline().addLast(new AuthHandler());
ch.pipeline().addLast(new MessageRequestHandler());
ch.pipeline().addLast(new PacketEncoder());
}
});
bind(serverBootstrap, PORT);
}
private static void bind(final ServerBootstrap serverBootstrap, final int port) {
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
}
});
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册