fix:rpc的样例

上级 07007321
......@@ -22,6 +22,7 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@SuppressWarnings("ALL")
@Slf4j
public class ChatClient {
public static void main(String[] args) {
......@@ -72,11 +73,13 @@ public class ChatClient {
*/
ch.pipeline().addLast("ChatClient handler", new ChannelInboundHandlerAdapter() {
// ###################### [ 3 ] ######################
@SuppressWarnings("AliControlFlowStatementWithoutBraces")
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 1. 处理登录 [登录成功 登录状态=true]
if ((msg instanceof LoginResponseMessage)) {
LoginResponseMessage responseMessage = (LoginResponseMessage) msg;
//noinspection AliControlFlowStatementWithoutBraces
if (responseMessage.isSuccess()) LOGIN.set(true);
WAIT_FOR_LOGIN.countDown(); // 减一 唤醒 线程:system in
}
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>netty-demo</artifactId>
<groupId>com.kwan</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>netty-05-rpc</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
\ No newline at end of file
package com.kwan.shuyu.client;
import com.kwan.shuyu.handler.RpcResponseMessageHandler;
import com.kwan.shuyu.message.RpcRequestMessage;
import com.kwan.shuyu.protocol.MessageCodecSharable;
import com.kwan.shuyu.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RpcClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); // 【使用 asm包方法】
// rpc 响应消息处理器,待实现
RpcResponseMessageHandler RPC_RESPONSE_HANDLER = new RpcResponseMessageHandler();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder()); // 【使用 asm包方法】
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_RESPONSE_HANDLER);
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
final ChannelFuture future = channel.writeAndFlush(
new RpcRequestMessage(
1,
"com.rpc.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"helloworld!"}
)
// 发送不成功 【打印错误信息】
).addListener(promise -> {
if (!promise.isSuccess()) {
Throwable cause = promise.cause();
log.error("error : ", cause);
}
});
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
\ No newline at end of file
package com.kwan.shuyu.client;
import com.kwan.shuyu.handler.RpcResponseMessageHandler;
import com.kwan.shuyu.message.RpcRequestMessage;
import com.kwan.shuyu.protocol.MessageCodecSharable;
import com.kwan.shuyu.protocol.ProcotolFrameDecoder;
import com.kwan.shuyu.protocol.SequenceIdGenerator;
import com.kwan.shuyu.server.service.HelloService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultPromise;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.Proxy;
/**
* 使用代理类调用方法
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/5 13:46
*/
@Slf4j
public class RpcClientPlus {
private static Channel channel = null;
private static final Object LOCK = new Object();
public static void main(String[] args) {
final HelloService service = getProxyService(HelloService.class);
log.debug("11111111111111111111111111111111111111111111111111111111111111111111111111");
service.sayHello("在不在!");
}
/**
* 创建代理类,内部 把代理对象转为消息发送
*
* @param serviceClass
* @param <T>
* @return
*/
public static <T> T getProxyService(Class<T> serviceClass) {
// 类加载器, 代理的实现接口的数组,
ClassLoader loader = serviceClass.getClassLoader();
Class[] interfaces = {serviceClass};
log.debug("AAAAAAAAAAAAAAAAAAAAAAAAAAAAA");
final int nextId = SequenceIdGenerator.nextId();
// sayHello "你好!"
final Object o = Proxy.newProxyInstance(loader, interfaces, (proxy, method, args) -> {
// 1. 将方法调用 转为 消息对象
final RpcRequestMessage msg = new RpcRequestMessage(
nextId,
serviceClass.getName(),
method.getName(),
method.getReturnType(),
method.getParameterTypes(),
args
);
log.debug("22222222222222222222222222222222222222222222222222222222222222222222222222");
// 2. 将消息对象 发送 出去 【但是 一时半会 结果不过来】
getChannel().writeAndFlush(msg);
// 3. 准备一个Promise对象 来接收结果 【指定 Promise对象 【异步】接受结果的线程】 【这里不会阻塞住,所以下面得阻塞等待结果】
DefaultPromise<Object> promise = new DefaultPromise<>(getChannel().eventLoop());
RpcResponseMessageHandler.PROMISES.put(nextId, promise);
log.debug("33333333333333333333333333333333333333333333333333333333333333333333333333");
// 4. await()不会抛异常, 【【【 同步阻塞 等待promise的结果(成功or失败) 】】】
promise.await();
log.debug("55555555555555555555555555555555555555555555555555555555555555555555555555");
if (promise.isSuccess()) {
// 调用正常
return promise.getNow();
} else {
// 调用失败
throw new RuntimeException(promise.cause());
}
});
log.debug("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB");
// 返回 被代理的对象
return (T) o;
}
/**
* 初始化获取Channel 单例 + 【双重检查锁】
*
* @return
*/
public static Channel getChannel() {
if (channel != null) {
return channel;
}
synchronized (LOCK) {
/**
* t1 和 t2 一起到这里
* t1 进来 执行完了,锁放开
* t2 被唤醒才进来
* 此时没有这行检查 将会再初始化一次
*/
if (channel != null) {
return channel;
}
initChannel();
return channel;
}
}
/**
* 初始化 channel
*/
private static void initChannel() {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable messageCodec = new MessageCodecSharable(); // 【使用 asm包方法】
// rpc 响应消息处理器,待实现
RpcResponseMessageHandler rpcResponseHandler = new RpcResponseMessageHandler();
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder()); // 【使用 asm包方法】
ch.pipeline().addLast(loggingHandler);
ch.pipeline().addLast(messageCodec);
ch.pipeline().addLast(rpcResponseHandler);
}
});
try {
channel = bootstrap.connect("localhost", 8080).sync().channel();
/**
* >>>>>>>>>>>>>>>>>>>>>>>>>>>>
* channel.closeFuture().sync() 【同步将等待 结束连接才往下执行,将不会返回channel】
* 改造 ========================
* 异步监听 指导 结束的动作
* <<<<<<<<<<<<<<<<<<<<<<<<<<<<
*/
channel.closeFuture().addListener(future -> {
group.shutdownGracefully();
});
} catch (Exception e) {
log.error("client error", e);
}
}
}
\ No newline at end of file
package com.kwan.shuyu.config;
import com.kwan.shuyu.protocol.MySerializer;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* 使用配置文件 获取 编解码方法
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:13
*/
public abstract class Config {
/**
* 获取配置的属性值
*/
static Properties properties;
static {
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
/**
* 获取端口号
*
* @return
*/
public static int getServerPort() {
final String value = properties.getProperty("server.port");
if (value == null) {
return 8080;
} else {
return Integer.parseInt(value);
}
}
/**
* 动态获取序列化算法
*
* @return
*/
public static MySerializer.Algorithm getMySerializerAlgorithm() {
final String value = properties.getProperty("mySerializer.algorithm");
if (value == null) {
return MySerializer.Algorithm.Java;
} else {
// 拼接成 MySerializer.Algorithm.Java 或 MySerializer.Algorithm.Json
return MySerializer.Algorithm.valueOf(value);
}
}
}
\ No newline at end of file
package com.kwan.shuyu.handler;
import com.kwan.shuyu.message.RpcRequestMessage;
import com.kwan.shuyu.message.RpcResponseMessage;
import com.kwan.shuyu.server.service.HelloService;
import com.kwan.shuyu.server.service.ServiceFactory;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
@Slf4j
@ChannelHandler.Sharable
public class RpcRequestMessageHandler extends SimpleChannelInboundHandler<RpcRequestMessage> {
/**
* 处理调用,最后返回 RpcResponseMessage对象
*
* @param ctx
* @param message
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcRequestMessage message) {
final RpcResponseMessage response = new RpcResponseMessage();
response.setSequenceId(message.getSequenceId());
try {
// 上面对象里 获取【接口类】全限定名
final Class<?> interfaceClazz = Class.forName(message.getInterfaceName());
// 根据接口类 获取 【接口实现类】
final HelloService service = (HelloService) ServiceFactory.getService(interfaceClazz);
// clazz 根据 方法名和参数类型 确定 【具体方法】
final Method method = service.getClass().getMethod(message.getMethodName(), message.getParameterTypes());
// 根据 具体方法 使用代理 【执行方法】
final Object invoke = method.invoke(service, message.getParameterValue());
response.setReturnValue(invoke);
} catch (Exception e) {
log.debug("xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 出异常了 xxxxxxxxxxxxxxxxxxxxxxxxxx");
e.printStackTrace();
response.setExceptionValue(new Exception("远程调用出错:" + e.getCause().getMessage()));
}
ctx.writeAndFlush(response);
}
public static void main(String[] args) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
final RpcRequestMessage requestMsg = new RpcRequestMessage(
1,
"com.rpc.server.service.HelloService",
"sayHello",
String.class,
new Class[]{String.class},
new Object[]{"helloworld!"}
);
// 上面对象里 获取【接口类】全限定名
final Class<?> interfaceClazz = Class.forName(requestMsg.getInterfaceName());
// 根据接口类 获取 【接口实现类】
final HelloService service = (HelloService) ServiceFactory.getService(interfaceClazz);
// 根据 方法名和参数类型 确定 【具体方法】
final Method method = service.getClass().getMethod(requestMsg.getMethodName(), requestMsg.getParameterTypes());
// 根据 具体方法 使用代理 【执行方法】
final Object invoke = method.invoke(service, requestMsg.getParameterValue());
System.out.println(invoke);
}
}
\ No newline at end of file
package com.kwan.shuyu.handler;
import com.kwan.shuyu.message.RpcResponseMessage;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.Promise;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 泛型通配符 Promise<?> 只能 从泛型容器里获取值,不能从泛型容器中设置值
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/5 13:44
*/
@Slf4j
@ChannelHandler.Sharable
public class RpcResponseMessageHandler extends SimpleChannelInboundHandler<RpcResponseMessage> {
/**
* 这个状态 算有状态算共享信息,但是是使用了 ConcurrentHashMap 和 下面 remove()单步操作 所以没有线程安全问题
*/
public static final Map<Integer, Promise<Object>> PROMISES = new ConcurrentHashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponseMessage msg) throws Exception {
//获取 并销毁值
final Promise<Object> promise = PROMISES.remove(msg.getSequenceId()); //
if (promise != null) {
final Object returnValue = msg.getReturnValue(); // 正常结果
final Exception exceptionValue = msg.getExceptionValue();// 异常结果 【约定 为 null才是正常的】
if (exceptionValue != null) {
promise.setFailure(exceptionValue);
} else {
promise.setSuccess(returnValue);
}
}
log.debug("44444444444444444444444444444444444444444444444444444444444444444444444444");
log.debug("{}", msg);
}
}
\ No newline at end of file
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public abstract class AbstractResponseMessage extends Message {
private boolean success;
private String reason;
public AbstractResponseMessage() {
}
public AbstractResponseMessage(boolean success, String reason) {
this.success = success;
this.reason = reason;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class ChatRequestMessage extends Message {
private String content;
private String to;
private String from;
public ChatRequestMessage() {
}
public ChatRequestMessage(String from, String to, String content) {
this.from = from;
this.to = to;
this.content = content;
}
@Override
public int getMessageType() {
return ChatRequestMessage;
}
@Override
public String toString() {
return "ChatRequestMessage{" +
"content='" + content + '\'' +
", to='" + to + '\'' +
", from='" + from + '\'' +
'}';
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class ChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public ChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public ChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return ChatResponseMessage;
}
@Override
public String toString() {
return "ChatResponseMessage{" +
"from='" + from + '\'' +
", content='" + content + '\'' +
'}';
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupChatRequestMessage extends Message {
private String content;
private String groupName;
private String from;
public GroupChatRequestMessage(String from, String groupName, String content) {
this.content = content;
this.groupName = groupName;
this.from = from;
}
@Override
public int getMessageType() {
return GroupChatRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupChatResponseMessage extends AbstractResponseMessage {
private String from;
private String content;
public GroupChatResponseMessage(boolean success, String reason) {
super(success, reason);
}
public GroupChatResponseMessage(String from, String content) {
this.from = from;
this.content = content;
}
@Override
public int getMessageType() {
return GroupChatResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import java.util.Set;
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper=false)
public class GroupCreateRequestMessage extends Message {
private String groupName;
private Set<String> members;
public GroupCreateRequestMessage(String groupName, Set<String> members) {
this.groupName = groupName;
this.members = members;
}
@Override
public int getMessageType() {
return GroupCreateRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupCreateResponseMessage extends AbstractResponseMessage {
public GroupCreateResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupCreateResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupJoinRequestMessage extends Message {
private String groupName;
private String username;
public GroupJoinRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
@Override
public int getMessageType() {
return GroupJoinRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupJoinResponseMessage extends AbstractResponseMessage {
public GroupJoinResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupJoinResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupMembersRequestMessage extends Message {
private String groupName;
public GroupMembersRequestMessage(String groupName) {
this.groupName = groupName;
}
@Override
public int getMessageType() {
return GroupMembersRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
import java.util.Set;
@Data
@ToString(callSuper = true)
public class GroupMembersResponseMessage extends Message {
private Set<String> members;
public GroupMembersResponseMessage(Set<String> members) {
this.members = members;
}
@Override
public int getMessageType() {
return GroupMembersResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupQuitRequestMessage extends Message {
private String groupName;
private String username;
public GroupQuitRequestMessage(String username, String groupName) {
this.groupName = groupName;
this.username = username;
}
@Override
public int getMessageType() {
return GroupQuitRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class GroupQuitResponseMessage extends AbstractResponseMessage {
public GroupQuitResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return GroupQuitResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class LoginRequestMessage extends Message {
private String username;
private String password;
public LoginRequestMessage() {
}
public LoginRequestMessage(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public int getMessageType() {
return LoginRequestMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class LoginResponseMessage extends AbstractResponseMessage {
public LoginResponseMessage(boolean success, String reason) {
super(success, reason);
}
@Override
public int getMessageType() {
return LoginResponseMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
@Data
public abstract class Message implements Serializable {
public static Class<?> getMessageClass(int messageType) {
return messageClasses.get(messageType);
}
private int sequenceId;
private int messageType;
public abstract int getMessageType();
public static final int LoginRequestMessage = 0;
public static final int LoginResponseMessage = 1;
public static final int ChatRequestMessage = 2;
public static final int ChatResponseMessage = 3;
public static final int GroupCreateRequestMessage = 4;
public static final int GroupCreateResponseMessage = 5;
public static final int GroupJoinRequestMessage = 6;
public static final int GroupJoinResponseMessage = 7;
public static final int GroupQuitRequestMessage = 8;
public static final int GroupQuitResponseMessage = 9;
public static final int GroupChatRequestMessage = 10;
public static final int GroupChatResponseMessage = 11;
public static final int GroupMembersRequestMessage = 12;
public static final int GroupMembersResponseMessage = 13;
public static final int PingMessage = 14;
public static final int PongMessage = 15;
public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
public static final int RPC_MESSAGE_TYPE_RESPONSE = 102;
private static final Map<Integer, Class<?>> messageClasses = new HashMap<>();
static {
messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
// messageClasses.put(RPC_MESSAGE_TYPE_RE QUEST, RpcRequestMessage.class);
// messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
}
}
\ No newline at end of file
package com.kwan.shuyu.message;
public class PingMessage extends Message {
@Override
public int getMessageType() {
return PingMessage;
}
}
package com.kwan.shuyu.message;
public class PongMessage extends Message {
@Override
public int getMessageType() {
return PongMessage;
}
}
package com.kwan.shuyu.message;
import lombok.Getter;
import lombok.ToString;
@Getter
@ToString(callSuper = true)
public class RpcRequestMessage extends Message {
/**
* 调用的接口全限定名,服务端根据它找到实现
*/
private String interfaceName;
/**
* 调用接口中的方法名
*/
private String methodName;
/**
* 方法返回类型
*/
private Class<?> returnType;
/**
* 方法参数类型数组
*/
private Class[] parameterTypes;
/**
* 方法参数值数组
*/
private Object[] parameterValue;
public RpcRequestMessage(int sequenceId, String interfaceName, String methodName, Class<?> returnType, Class[] parameterTypes, Object[] parameterValue) {
super.setSequenceId(sequenceId);
this.interfaceName = interfaceName;
this.methodName = methodName;
this.returnType = returnType;
this.parameterTypes = parameterTypes;
this.parameterValue = parameterValue;
}
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_REQUEST;
}
}
\ No newline at end of file
package com.kwan.shuyu.message;
import lombok.Data;
import lombok.ToString;
@Data
@ToString(callSuper = true)
public class RpcResponseMessage extends Message {
/**
* 返回值
*/
private Object returnValue;
/**
* 异常值
*/
private Exception exceptionValue;
@Override
public int getMessageType() {
return RPC_MESSAGE_TYPE_RESPONSE;
}
}
\ No newline at end of file
package com.kwan.shuyu.onlytest;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class Client {
public static void main(String[] args) throws InterruptedException {
final NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
try {
final Channel channel = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());
}
})
.connect("localhost", 7777).sync().channel();
final ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
buf.writeBytes("abcdfmw9".getBytes());
channel.writeAndFlush(
buf
).addListener(promise -> {
if (!promise.isSuccess()) {
Throwable cause = promise.cause();
log.error("error : ", cause);
}
});
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
\ No newline at end of file
package com.kwan.shuyu.onlytest;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.Charset;
@Slf4j
public class Server {
public static void main(String[] args) {
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
log.debug(buffer.toString(Charset.defaultCharset()));
// 建议使用 ctx.alloc() 创建 ByteBuf
final ByteBuf response = ctx.alloc().buffer(); // class io.netty.buffer.PooledUnsafeDirectByteBuf
response.writeBytes("服务端:".getBytes());
response.writeBytes(buffer);
ctx.writeAndFlush(response);
}
});
}
})
.bind(7777);
}
}
\ No newline at end of file
package com.kwan.shuyu.protocol;
import com.kwan.shuyu.config.Config;
import com.kwan.shuyu.message.Message;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* 自定义消息编解码类
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:15
*/
@Slf4j
@ChannelHandler.Sharable
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
out.writeBytes(new byte[]{1, 2, 3, 4}); // 4字节的 魔数
out.writeByte(1); // 1字节的 版本
out.writeByte(Config.getMySerializerAlgorithm().ordinal()); // 1字节的 序列化方式 0-jdk,1-json
out.writeByte(msg.getMessageType()); // 1字节的 指令类型
out.writeInt(msg.getSequenceId()); // 4字节的 请求序号 【大端】
out.writeByte(0xff); // 1字节的 对其填充,只为了非消息内容 是2的整数倍
final byte[] bytes = Config.getMySerializerAlgorithm().serializ(msg);
// 写入内容 长度
out.writeInt(bytes.length);
// 写入内容
out.writeBytes(bytes);
/**
* 加入List 方便传递给 下一个Handler
*/
outList.add(out);
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt(); // 大端4字节的 魔数
byte version = in.readByte(); // 版本
byte serializerType = in.readByte();// 0 Java 1 Json
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
final byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length); // 读取进来,下面再进行 解码
// 1. 找到反序列化算法
final MySerializer.Algorithm algorithm = MySerializer.Algorithm.values()[serializerType];
// 2. 找到消息具体类型
final Object message = algorithm.deserializer(Message.getMessageClass(messageType), bytes);
log.debug("{}", message);
/**
* 加入List 方便传递给 下一个Handler
*/
out.add(message);
}
}
\ No newline at end of file
package com.kwan.shuyu.protocol;
import com.google.gson.*;
import java.io.*;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
/**
* 自定义序列化和反序列化
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:12
*/
public interface MySerializer {
/**
* 序列化
*
* @param object
* @param <T>
* @return
*/
<T> byte[] serializ(T object);
/**
* 反序列化
*
* @param clazz
* @param bytes
* @param <T>
* @return
*/
<T> T deserializer(Class<T> clazz, byte[] bytes);
/**
* 多个算法
* 枚举对象.ordinal() 获取顺序int, 第一个0 第二个1 ...
*/
enum Algorithm implements MySerializer {
Java {
@Override
public <T> T deserializer(Class<T> clazz, byte[] bytes) {
try {
// 处理内容
final ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
final ObjectInputStream ois = new ObjectInputStream(bis);
// 转成 Message类型
T message = (T) ois.readObject();
return message;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化算法失败!");
}
}
@Override
public <T> byte[] serializ(T object) {
try {
// 处理内容 用对象流包装字节数组 并写入
ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 访问数组
ObjectOutputStream oos = new ObjectOutputStream(bos); // 用对象流 包装
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化算法失败!", e);
}
}
},
Json {
@Override
public <T> T deserializer(Class<T> clazz, byte[] bytes) {
// 指定 下面自定义处理类 生成 对象
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
String json = new String(bytes, StandardCharsets.UTF_8);
return gson.fromJson(json, clazz);
}
@Override
public <T> byte[] serializ(T object) {
// 指定 下面自定义处理类 生成 对象
Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
String json = gson.toJson(object);
return json.getBytes(StandardCharsets.UTF_8);
}
}
}
/**
* 针对之前报出:不支持 Class类转json的异常 做处理
*/
class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>> {
@Override
public Class<?> deserialize(JsonElement json, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
try {
String str = json.getAsString();
return Class.forName(str);
} catch (ClassNotFoundException e) {
throw new JsonParseException(e);
}
}
@Override
public JsonElement serialize(Class<?> src, Type type, JsonSerializationContext jsonSerializationContext) {
// JsonPrimitive 转化基本数据类型
return new JsonPrimitive(src.getName());
}
}
}
\ No newline at end of file
package com.kwan.shuyu.protocol;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
/**
* 自定义帧解码器
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/4/28 11:11
*/
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(
int maxFrameLength,
int lengthFieldOffset,
int lengthFieldLength,
int lengthAdjustment,
int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
\ No newline at end of file
package com.kwan.shuyu.protocol;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 抽象类 不能实例化
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/5 13:48
*/
public abstract class SequenceIdGenerator {
private static final AtomicInteger id = new AtomicInteger();
/**
* 递增 和 获取
*
* @return
*/
public static int nextId() {
return id.incrementAndGet();
}
}
\ No newline at end of file
package com.kwan.shuyu.server;
import com.kwan.shuyu.handler.RpcRequestMessageHandler;
import com.kwan.shuyu.protocol.MessageCodecSharable;
import com.kwan.shuyu.protocol.ProcotolFrameDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class RpcServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); // 【使用 asm包方法】
// rpc 请求消息处理器,待实现
RpcRequestMessageHandler RPC_REQUEST_HANDLER = new RpcRequestMessageHandler();
try {
ServerBootstrap bs = new ServerBootstrap();
bs.channel(NioServerSocketChannel.class);
bs.group(boss, worker);
bs.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder()); // 【使用 asm包方法】
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(RPC_REQUEST_HANDLER);
}
});
Channel channel = bs.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
\ No newline at end of file
package com.kwan.shuyu.server;
import com.google.gson.*;
import java.lang.reflect.Type;
public class TestJson {
public static void main(String[] args) {
final Gson gson = new GsonBuilder().registerTypeAdapter(Class.class, new ClassCodec()).create();
System.out.println(gson.toJson(String.class));
}
static class ClassCodec implements JsonSerializer<Class<?>>, JsonDeserializer<Class<?>>{
@Override
public Class<?> deserialize(JsonElement json, Type type, JsonDeserializationContext jsonDeserializationContext) throws JsonParseException {
try {
final String str = json.getAsString();
return Class.forName(str);
} catch (ClassNotFoundException e) {
throw new JsonParseException(e);
}
}
@Override
public JsonElement serialize(Class<?> src, Type type, JsonSerializationContext jsonSerializationContext) {
// JsonPrimitive 转化基本数据类型
return new JsonPrimitive(src.getName());
}
}
}
package com.kwan.shuyu.server.service;
@SuppressWarnings("ALL")
public interface HelloService {
String sayHello(String msg);
}
package com.kwan.shuyu.server.service;
@SuppressWarnings({"ALL", "AlibabaClassMustHaveAuthor"})
public class HelloService2Impl implements HelloService{
@Override
public String sayHello(String msg) {
return "啥都不知道 : " + msg;
}
}
package com.kwan.shuyu.server.service;
@SuppressWarnings("AlibabaClassMustHaveAuthor")
public class HelloServiceImpl implements HelloService {
@Override
public String sayHello(String msg) {
/*
不做处理的话:爆出:io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 1024: 11509 - discarded
处理地方:RpcRequestMessageHandler ------42行
*/
// int i = 1 / 0;
return "服务端ROBOT :你好, " + msg;
}
}
\ No newline at end of file
package com.kwan.shuyu.server.service;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@SuppressWarnings("ALL")
public class ServiceFactory {
public static Properties properties;
public static Map<Class<?>, Object> map = new ConcurrentHashMap<>();
static {
try (InputStream in = ServiceFactory.class.getResourceAsStream("/application.properties")) {
properties = new Properties();
properties.load(in);
Set<String> names = properties.stringPropertyNames();
for (String name : names) {
if (name.endsWith("Service")) {
Class<?> interfaceClass = Class.forName(name);
Class<?> instanceClass = Class.forName(properties.getProperty(name));
map.put(interfaceClass, instanceClass.newInstance());
}
}
} catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new ExceptionInInitializerError(e);
}
}
/**
* 根据 接口类 获取 实现类
*
* @param interfaceClass
* @param <T>
* @return
*/
public static <T> T getService(Class<T> interfaceClass) {
return (T) map.get(interfaceClass);
}
}
#\u7AEF\u53E3\u53F7
server.port=6000
# \u5E8F\u5217\u5316\u65B9\u5F0F Java Json
mySerializer.algorithm=Java
com.rpc.server.service.HelloService=com.rpc.server.service.HelloServiceImpl2
\ No newline at end of file
......@@ -12,6 +12,7 @@
<module>netty-02-induction</module>
<module>netty-03-jinjie</module>
<module>netty-04-chat</module>
<module>netty-05-rpc</module>
<module>netty-99-kuangshen</module>
</modules>
<dependencies>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册