提交 63e3b5b4 编写于 作者: G guide

[refractor]rpc request protocol

上级 e5c97fd2
package github.javaguide;
import github.javaguide.annotation.RpcScan;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.remoting.transport.ClientTransport;
import github.javaguide.remoting.transport.netty.client.NettyClientTransport;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
/**
* @author shuang.kou
......
......@@ -2,7 +2,6 @@ import github.javaguide.HelloService;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.remoting.transport.socket.SocketRpcServer;
import github.javaguide.serviceimpl.HelloServiceImpl;
import github.javaguide.serviceimpl.HelloServiceImpl2;
/**
* @author shuang.kou
......
package github.javaguide.enumeration;
/**
* @author shuang.kou
* @createTime 2020年06月16日 20:34:00
*/
public enum RpcMessageType {
HEART_BEAT
}
package github.javaguide.enumeration;
package github.javaguide.enums;
public enum RpcConfigProperties {
public enum RpcConfigPropertiesEnum {
RPC_CONFIG_PATH("rpc.properties"),
ZK_ADDRESS("rpc.zookeeper.address");
......@@ -8,7 +8,7 @@ public enum RpcConfigProperties {
private final String propertyValue;
RpcConfigProperties(String propertyValue) {
RpcConfigPropertiesEnum(String propertyValue) {
this.propertyValue = propertyValue;
}
......
package github.javaguide.enumeration;
package github.javaguide.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
......@@ -11,7 +11,7 @@ import lombok.ToString;
@AllArgsConstructor
@Getter
@ToString
public enum RpcErrorMessage {
public enum RpcErrorMessageEnum {
CLIENT_CONNECT_SERVER_FAILURE("客户端连接服务端失败"),
SERVICE_INVOCATION_FAILURE("服务调用失败"),
SERVICE_CAN_NOT_BE_FOUND("没有找到指定的服务"),
......
package github.javaguide.enumeration;
package github.javaguide.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
......@@ -11,7 +11,7 @@ import lombok.ToString;
@AllArgsConstructor
@Getter
@ToString
public enum RpcResponseCode {
public enum RpcResponseCodeEnum {
SUCCESS(200, "The remote call is successful"),
FAIL(500, "The remote call is fail");
......
package github.javaguide.remoting.transport.netty.codec.enums;
package github.javaguide.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
......@@ -9,16 +9,15 @@ import lombok.Getter;
*/
@AllArgsConstructor
@Getter
public enum MySerializableEnum {
public enum SerializationTypeEnum {
KYRO((byte) 0x01, "kyro");
private final byte code;
private final String name;
public static String getName(byte code) {
for (MySerializableEnum c : MySerializableEnum.values()) {
for (SerializationTypeEnum c : SerializationTypeEnum.values()) {
if (c.getCode() == code) {
return c.name;
}
......
package github.javaguide.exception;
import github.javaguide.enumeration.RpcErrorMessage;
import github.javaguide.enums.RpcErrorMessageEnum;
/**
* @author shuang.kou
* @createTime 2020年05月12日 16:48:00
*/
public class RpcException extends RuntimeException {
public RpcException(RpcErrorMessage rpcErrorMessage, String detail) {
super(rpcErrorMessage.getMessage() + ":" + detail);
public RpcException(RpcErrorMessageEnum rpcErrorMessageEnum, String detail) {
super(rpcErrorMessageEnum.getMessage() + ":" + detail);
}
public RpcException(String message, Throwable cause) {
super(message, cause);
}
public RpcException(RpcErrorMessage rpcErrorMessage) {
super(rpcErrorMessage.getMessage());
public RpcException(RpcErrorMessageEnum rpcErrorMessageEnum) {
super(rpcErrorMessageEnum.getMessage());
}
}
package github.javaguide.extension;
import java.lang.annotation.*;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Documented
@Retention(RetentionPolicy.RUNTIME)
......
......@@ -21,7 +21,7 @@ public final class SingletonFactory {
Object instance = null;
if (instance == null) {
synchronized (SingletonFactory.class) {
instance = OBJECT_MAP.get(key);
instance = OBJECT_MAP.get(key);
if (instance == null) {
try {
instance = c.getDeclaredConstructor().newInstance();
......
......@@ -12,6 +12,7 @@ import java.lang.annotation.Target;
* RPC reference annotation, autowire the service implementation class
*
* @author smile2coder
* @createTime 2020年09月16日 21:42:00
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
......
package github.javaguide.annotation;
import org.springframework.stereotype.Component;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
......
package github.javaguide.provider;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.enumeration.RpcErrorMessage;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.registry.ServiceRegistry;
......@@ -52,7 +52,7 @@ public class ServiceProviderImpl implements ServiceProvider {
public Object getService(RpcServiceProperties rpcServiceProperties) {
Object service = serviceMap.get(rpcServiceProperties.toRpcServiceName());
if (null == service) {
throw new RpcException(RpcErrorMessage.SERVICE_CAN_NOT_BE_FOUND);
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);
}
return service;
}
......
package github.javaguide.proxy;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.remoting.dto.RpcMessageChecker;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.ClientTransport;
......@@ -26,6 +28,9 @@ import java.util.concurrent.CompletableFuture;
*/
@Slf4j
public class RpcClientProxy implements InvocationHandler {
private static final String INTERFACE_NAME = "interfaceName";
/**
* Used to send requests to the server.And there are two implementations: socket and netty
*/
......@@ -82,7 +87,21 @@ public class RpcClientProxy implements InvocationHandler {
if (clientTransport instanceof SocketRpcClient) {
rpcResponse = (RpcResponse<Object>) clientTransport.sendRpcRequest(rpcRequest);
}
RpcMessageChecker.check(rpcResponse, rpcRequest);
this.check(rpcResponse, rpcRequest);
return rpcResponse.getData();
}
private void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {
if (rpcResponse == null) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
throw new RpcException(RpcErrorMessageEnum.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCodeEnum.SUCCESS.getCode())) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
}
}
package github.javaguide.registry.zk;
import github.javaguide.enumeration.RpcErrorMessage;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.loadbalance.LoadBalance;
import github.javaguide.loadbalance.RandomLoadBalance;
......@@ -31,7 +31,7 @@ public class ZkServiceDiscovery implements ServiceDiscovery {
CuratorFramework zkClient = CuratorUtils.getZkClient();
List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
if (serviceUrlList.size() == 0) {
throw new RpcException(RpcErrorMessage.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND, rpcServiceName);
}
// load balancing
String targetServiceUrl = loadBalance.selectServiceAddress(serviceUrlList);
......
package github.javaguide.registry.zk.util;
import github.javaguide.enumeration.RpcConfigProperties;
import github.javaguide.enums.RpcConfigPropertiesEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.utils.file.PropertiesFileUtils;
import lombok.extern.slf4j.Slf4j;
......@@ -97,9 +97,9 @@ public final class CuratorUtils {
public static CuratorFramework getZkClient() {
// check if user has set zk address
Properties properties = PropertiesFileUtils.readPropertiesFile(RpcConfigProperties.RPC_CONFIG_PATH.getPropertyValue());
Properties properties = PropertiesFileUtils.readPropertiesFile(RpcConfigPropertiesEnum.RPC_CONFIG_PATH.getPropertyValue());
if (properties != null) {
defaultZookeeperAddress = properties.getProperty(RpcConfigProperties.ZK_ADDRESS.getPropertyValue());
defaultZookeeperAddress = properties.getProperty(RpcConfigPropertiesEnum.ZK_ADDRESS.getPropertyValue());
}
// if zkClient has been started, return directly
if (zkClient != null && zkClient.getState() == CuratorFrameworkState.STARTED) {
......
......@@ -7,44 +7,26 @@ import java.nio.charset.StandardCharsets;
* @author wangtao .
* @createTime on 2020/10/2
*/
public class RpcConstants {
/**
* 魔法数 检验 RpcMessage
* guide rpc
* Magic number. Verify RpcMessage
*/
public static final byte[] MAGIC_NUMBER = {(byte) 'g', (byte) 'r', (byte) 'p', (byte) 'c'};
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
//版本信息
//version information
public static final byte VERSION = 1;
public static final byte TOTAL_LENGTH = 15;
//请求
public static final byte MSGTYPE_RESQUEST = 1;
//相应
public static final byte MSGTYPE_RESPONSE = 2;
public static final byte REQUEST_TYPE = 1;
public static final byte RESPONSE_TYPE = 2;
//ping
public static final byte MSGTYPE_HEARTBEAT_REQUEST = 3;
public static final byte HEARTBEAT_REQUEST_TYPE = 3;
//pong
public static final byte MSGTYPE_HEARTBEAT_RESPONSE = 4;
public static final byte HEARTBEAT_RESPONSE_TYPE = 4;
public static final int HEAD_LENGTH = 15;
public static final String PING = "ping";
public static final String PONG = "pong";
public static final int MAX_FRAME_LENGTH = 8 * 1024 * 1024;
}
package github.javaguide.remoting.dto;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
/**
* @author wangtao
......@@ -15,17 +20,13 @@ import lombok.*;
@ToString
public class RpcMessage {
//消息类型
//rpc message type
private byte messageType;
//序列化类型
//serialization type
private byte codec;
//请求id
//request id
private int requestId;
//数据内容
//request data
private Object data;
}
package github.javaguide.remoting.dto;
import github.javaguide.enumeration.RpcErrorMessage;
import github.javaguide.enumeration.RpcResponseCode;
import github.javaguide.exception.RpcException;
import lombok.extern.slf4j.Slf4j;
/**
* Verify RpcRequest and RpcRequest
*
* @author shuang.kou
* @createTime 2020年05月26日 18:03:00
*/
@Slf4j
public final class RpcMessageChecker {
private static final String INTERFACE_NAME = "interfaceName";
private RpcMessageChecker() {
}
public static void check(RpcResponse<Object> rpcResponse, RpcRequest rpcRequest) {
if (rpcResponse == null) {
throw new RpcException(RpcErrorMessage.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if (!rpcRequest.getRequestId().equals(rpcResponse.getRequestId())) {
throw new RpcException(RpcErrorMessage.REQUEST_NOT_MATCH_RESPONSE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
if (rpcResponse.getCode() == null || !rpcResponse.getCode().equals(RpcResponseCode.SUCCESS.getCode())) {
throw new RpcException(RpcErrorMessage.SERVICE_INVOCATION_FAILURE, INTERFACE_NAME + ":" + rpcRequest.getInterfaceName());
}
}
}
package github.javaguide.remoting.dto;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.enumeration.RpcMessageType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
......@@ -26,7 +25,6 @@ public class RpcRequest implements Serializable {
private String methodName;
private Object[] parameters;
private Class<?>[] paramTypes;
private RpcMessageType rpcMessageType;
private String version;
private String group;
......
package github.javaguide.remoting.dto;
import github.javaguide.enumeration.RpcResponseCode;
import github.javaguide.enums.RpcResponseCodeEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
......@@ -39,8 +39,8 @@ public class RpcResponse<T> implements Serializable {
public static <T> RpcResponse<T> success(T data, String requestId) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(RpcResponseCode.SUCCESS.getCode());
response.setMessage(RpcResponseCode.SUCCESS.getMessage());
response.setCode(RpcResponseCodeEnum.SUCCESS.getCode());
response.setMessage(RpcResponseCodeEnum.SUCCESS.getMessage());
response.setRequestId(requestId);
if (null != data) {
response.setData(data);
......@@ -48,10 +48,10 @@ public class RpcResponse<T> implements Serializable {
return response;
}
public static <T> RpcResponse<T> fail(RpcResponseCode rpcResponseCode) {
public static <T> RpcResponse<T> fail(RpcResponseCodeEnum rpcResponseCodeEnum) {
RpcResponse<T> response = new RpcResponse<>();
response.setCode(rpcResponseCode.getCode());
response.setMessage(rpcResponseCode.getMessage());
response.setCode(rpcResponseCodeEnum.getCode());
response.setMessage(rpcResponseCodeEnum.getMessage());
return response;
}
......
......@@ -4,7 +4,12 @@ package github.javaguide.remoting.transport.netty.client;
import github.javaguide.remoting.transport.netty.codec.RpcMessageDecoder;
import github.javaguide.remoting.transport.netty.codec.RpcMessageEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
......
......@@ -4,8 +4,12 @@ import github.javaguide.factory.SingletonFactory;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum;
import io.netty.channel.*;
import github.javaguide.enums.SerializationTypeEnum;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
......@@ -43,9 +47,9 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
if (msg instanceof RpcMessage) {
RpcMessage tmp = (RpcMessage) msg;
byte messageType = tmp.getMessageType();
if (messageType == RpcConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
log.info("heart [{}]", tmp.getData());
} else if (messageType == RpcConstants.MSGTYPE_RESPONSE) {
} else if (messageType == RpcConstants.RESPONSE_TYPE) {
RpcResponse<Object> rpcResponse = (RpcResponse<Object>) tmp.getData();
unprocessedRequests.complete(rpcResponse);
}
......@@ -63,8 +67,8 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
log.info("write idle happen [{}]", ctx.channel().remoteAddress());
Channel channel = channelProvider.get((InetSocketAddress) ctx.channel().remoteAddress());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(MySerializableEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.MSGTYPE_HEARTBEAT_REQUEST);
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
rpcMessage.setData(RpcConstants.PING);
channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
......
......@@ -8,7 +8,7 @@ import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.ClientTransport;
import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum;
import github.javaguide.enums.SerializationTypeEnum;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;
......@@ -49,8 +49,8 @@ public class NettyClientTransport implements ClientTransport {
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setData(rpcRequest);
rpcMessage.setCodec(MySerializableEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.MSGTYPE_RESQUEST);
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.REQUEST_TYPE);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
log.info("client send message: [{}]", rpcMessage);
......
package github.javaguide.remoting.transport.netty.codec;
import github.javaguide.enums.SerializationTypeEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum;
import github.javaguide.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
......@@ -15,25 +15,52 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
/**
* @author wangtao .
* custom protocol decoder
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+-----------+-----+-----+-----+
* | magic code |version | full length | messageType| codec| RequestId |
* +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* | |
* | body |
* | |
* | ... ... |
* +-------------------------------------------------------------------------------------------------------+
* 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型)
* 1B codec(序列化类型) 4B requestId(请求的Id)
* body(object类型数据)
* </pre>
* <p>
* {@link LengthFieldBasedFrameDecoder} is a length-based decoder , used to solve TCP unpacking and sticking problems.
* </p>
*
* @author wangtao
* @createTime on 2020/10/2
* @see <a href="https://zhuanlan.zhihu.com/p/95621344">LengthFieldBasedFrameDecoder解码器</a>
*/
@Slf4j
public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {
public RpcMessageDecoder() {
// default is 8M
this(RpcConstants.MAX_FRAME_LENGTH);
// lengthFieldOffset: magic code is 4B, and version is 1B, and then full length. so value is 5
// lengthFieldLength: full length is 4B. so value is 4
// lengthAdjustment: full length include all data and read 9 bytes before, so the left length is (fullLength-9). so values is -9
// initialBytesToStrip: we will check magic code and version manually, so do not strip any bytes. so values is 0
this(RpcConstants.MAX_FRAME_LENGTH, 5, 4, -9, 0);
}
public RpcMessageDecoder(int maxFrameLength) {
/*
int maxFrameLength,
int lengthFieldOffset, magic code is 4B, and version is 1B, and then FullLength. so value is 5
int lengthFieldLength, FullLength is int(4B). so values is 4
int lengthAdjustment, FullLength include all data and read 9 bytes before, so the left length is (FullLength-9). so values is -9
int initialBytesToStrip we will check magic code and version self, so do not strip any bytes. so values is 0
*/
super(maxFrameLength, 5, 4, -9, 0);
/**
* @param maxFrameLength Maximum frame length. It decide the maximum length of data that can be received.
* If it exceeds, the data will be discarded.
* @param lengthFieldOffset Length field offset. The length field is the one that skips the specified length of byte.
* @param lengthFieldLength The number of bytes in the length field.
* @param lengthAdjustment The compensation value to add to the value of the length field
* @param initialBytesToStrip Number of bytes skipped.
* If you need to receive all of the header+body data, this value is 0
* if you only want to receive the body data, then you need to skip the number of bytes consumed by the header.
*/
public RpcMessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
@Override
......@@ -57,10 +84,9 @@ public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {
}
private Object decodeFrame(ByteBuf in)
throws Exception {
// 读取前4个magic比对一下
private Object decodeFrame(ByteBuf in) {
// note: must read ByteBuf in order
// read the first 4 bit, which is the magic number, and compare
int len = RpcConstants.MAGIC_NUMBER.length;
byte[] tmp = new byte[len];
in.readBytes(tmp);
......@@ -69,33 +95,33 @@ public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {
throw new IllegalArgumentException("Unknown magic code: " + Arrays.toString(tmp));
}
}
// read the version and compare
byte version = in.readByte();
if (version != RpcConstants.VERSION) {
throw new RuntimeException("version isn't compatible" + version);
}
int fullLength = in.readInt();
//消息类型
// build RpcMessage object
byte messageType = in.readByte();
//读取序列化类型
byte codecType = in.readByte();
int requestId = in.readInt();
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setMessageType(messageType);
rpcMessage.setRequestId(requestId);
rpcMessage.setCodec(codecType);
if (messageType == RpcConstants.MSGTYPE_HEARTBEAT_REQUEST) {
RpcMessage rpcMessage = RpcMessage.builder()
.codec(codecType)
.requestId(requestId)
.messageType(messageType).build();
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
rpcMessage.setData(RpcConstants.PING);
} else if (messageType == RpcConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
} else if (messageType == RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
rpcMessage.setData(RpcConstants.PONG);
} else {
int bodyLength = fullLength - RpcConstants.HEAD_LENGTH;
if (bodyLength > 0) {
byte[] bs = new byte[bodyLength];
in.readBytes(bs);
String codecName = MySerializableEnum.getName(rpcMessage.getCodec());
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
if (messageType == RpcConstants.MSGTYPE_RESQUEST) {
if (messageType == RpcConstants.REQUEST_TYPE) {
RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class);
rpcMessage.setData(tmpValue);
} else {
......
package github.javaguide.remoting.transport.netty.codec;
import github.javaguide.enums.SerializationTypeEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum;
import github.javaguide.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
......@@ -15,58 +15,51 @@ import java.util.concurrent.atomic.AtomicInteger;
/**
* <p>
* custom protocol decoder
* <p>
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+-----------+-----+-----+-----+
* | magic code |version | full length | messageType| codec| RequestId |
* +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* | |
* | body |
* | |
* | ... ... |
* +-------------------------------------------------------------------------------------------------------+
* 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型)
* 1B codec(序列化类型) 4B requestId(请求的Id)
* body(object类型数据)
* </pre>
*
* @author WangTao
* @createTime on 2020/10/2
*
* 自定义协议解码器
*
* * <pre>
* * 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
* * +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+-----------+-----+-----+-----+
* * | magic code |version | Full length | messageType| codec| RequestId |
* * +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* * | |
* * | body |
* * | |
* * | ... ... |
* * +-------------------------------------------------------------------------------------------------------+
*
* 自定义编码器
* 4B magic code 魔法数 1B version 版本 4B full length 消息长度 1B messageType 消息类型
* 1B codec 序列化 4B requestId 请求的Id
* body object类型数据
*
* @see <a href="https://zhuanlan.zhihu.com/p/95621344">LengthFieldBasedFrameDecoder解码器</a>
*/
@Slf4j
public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {
private static final AtomicInteger ATOMIC_INTEGER = new AtomicInteger(0);
@Override
protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out)
throws Exception {
protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) {
try {
int fullLength = RpcConstants.HEAD_LENGTH;
byte messageType = rpcMessage.getMessageType();
//写入magic数字
out.writeBytes(RpcConstants.MAGIC_NUMBER);
out.writeByte(RpcConstants.VERSION);
// 留出位置写入数据包的长度
// leave a place to write the value of full length
out.writerIndex(out.writerIndex() + 4);
//设置消息类型
out.writeByte(rpcMessage.getMessageType());
//设置序列化
byte messageType = rpcMessage.getMessageType();
out.writeByte(messageType);
out.writeByte(rpcMessage.getCodec());
out.writeInt(ATOMIC_INTEGER.getAndDecrement());
// build full length
byte[] bodyBytes = null;
//不是心跳
if (messageType != RpcConstants.MSGTYPE_HEARTBEAT_REQUEST
&& messageType != RpcConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
//对象序列化
String codecName = MySerializableEnum.getName(rpcMessage.getCodec());
int fullLength = RpcConstants.HEAD_LENGTH;
// if messageType is not heartbeat message,fullLength = head length + body length
if (messageType != RpcConstants.HEARTBEAT_REQUEST_TYPE
&& messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
bodyBytes = serializer.serialize(rpcMessage.getData());
......@@ -76,21 +69,16 @@ public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {
if (bodyBytes != null) {
out.writeBytes(bodyBytes);
}
int writeIndex = out.writerIndex();
out.writerIndex(writeIndex - fullLength + RpcConstants.MAGIC_NUMBER.length + 1);
//写入长度
out.writeInt(fullLength);
//重置
out.writerIndex(writeIndex);
} catch (Exception e) {
log.error("Encode request error!", e);
}
}
}
......@@ -8,7 +8,11 @@ import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.remoting.transport.netty.codec.RpcMessageDecoder;
import github.javaguide.remoting.transport.netty.codec.RpcMessageEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
......
package github.javaguide.remoting.transport.netty.server;
import github.javaguide.enumeration.RpcResponseCode;
import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.handler.RpcRequestHandler;
import github.javaguide.remoting.transport.netty.codec.enums.MySerializableEnum;
import github.javaguide.enums.SerializationTypeEnum;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
......@@ -41,10 +41,10 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (msg instanceof RpcMessage) {
log.info("server receive msg: [{}] ", msg);
byte messageType = ((RpcMessage) msg).getMessageType();
if (messageType == RpcConstants.MSGTYPE_HEARTBEAT_REQUEST) {
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(MySerializableEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.MSGTYPE_HEARTBEAT_RESPONSE);
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
rpcMessage.setData(RpcConstants.PONG);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
......@@ -55,15 +55,15 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(MySerializableEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.MSGTYPE_RESPONSE);
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
rpcMessage.setData(rpcResponse);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCode.FAIL);
RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(MySerializableEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.MSGTYPE_RESPONSE);
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
rpcMessage.setData(rpcResponse);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
log.error("not writable now, message dropped");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册