未验证 提交 0bc99898 编写于 作者: S SnailClimb 提交者: GitHub

Merge pull request #21 from ChineseTony/master

update protocol and add compress
package github.javaguide.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* @author wangtao .
* @createTime on 2020/10/2
*/
@AllArgsConstructor
@Getter
public enum CompressTypeEnum {
GZIP((byte) 0x01, "gzip");
private final byte code;
private final String name;
public static String getName(byte code) {
for (CompressTypeEnum c : CompressTypeEnum.values()) {
if (c.getCode() == code) {
return c.name;
}
}
return null;
}
}
package github.javaguide.compress;
import github.javaguide.extension.SPI;
/**
* @author wangtao .
* @createTime on 2020/10/3
*/
@SPI
public interface Compress {
byte[] compress(byte[] bytes);
byte[] decompress(byte[] bytes);
}
package github.javaguide.compress.gzip;
import github.javaguide.compress.Compress;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
/**
* @author wangtao .
* @createTime on 2020/10/3
*/
public class GzipCompress implements Compress {
private static final int BUFFER_SIZE = 1024 * 4;
@Override
public byte[] compress(byte[] bytes) {
if (bytes == null) {
throw new NullPointerException("bytes is null");
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
gzip.write(bytes);
gzip.flush();
gzip.finish();
return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException("gzip compress error", e);
}
}
@Override
public byte[] decompress(byte[] bytes) {
if (bytes == null) {
throw new NullPointerException("bytes is null");
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
try (GZIPInputStream gunzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
byte[] buffer = new byte[BUFFER_SIZE];
int n;
while ((n = gunzip.read(buffer)) > -1) {
out.write(buffer, 0, n);
}
return out.toByteArray();
} catch (IOException e) {
throw new RuntimeException("gzip decompress error", e);
}
}
}
......@@ -17,14 +17,14 @@ public class RpcConstants {
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
//version information
public static final byte VERSION = 1;
public static final byte TOTAL_LENGTH = 15;
public static final byte TOTAL_LENGTH = 16;
public static final byte REQUEST_TYPE = 1;
public static final byte RESPONSE_TYPE = 2;
//ping
public static final byte HEARTBEAT_REQUEST_TYPE = 3;
//pong
public static final byte HEARTBEAT_RESPONSE_TYPE = 4;
public static final int HEAD_LENGTH = 15;
public static final int HEAD_LENGTH = 16;
public static final String PING = "ping";
public static final String PONG = "pong";
public static final int MAX_FRAME_LENGTH = 8 * 1024 * 1024;
......
......@@ -24,6 +24,8 @@ public class RpcMessage {
private byte messageType;
//serialization type
private byte codec;
//compress type
private byte compress;
//request id
private int requestId;
//request data
......
......@@ -52,7 +52,7 @@ public final class NettyClient {
p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
ch.pipeline().addLast(new NettyClientHandler());
p.addLast(new NettyClientHandler());
}
});
}
......
package github.javaguide.remoting.transport.netty.client;
import github.javaguide.compress.Compress;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
......@@ -68,6 +70,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
Channel channel = channelProvider.get((InetSocketAddress) ctx.channel().remoteAddress());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
rpcMessage.setData(RpcConstants.PING);
channel.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
......
package github.javaguide.remoting.transport.netty.client;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.registry.ServiceDiscovery;
......@@ -50,6 +51,7 @@ public class NettyClientTransport implements ClientTransport {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setData(rpcRequest);
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.REQUEST_TYPE);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
......
package github.javaguide.remoting.transport.netty.codec;
import github.javaguide.compress.Compress;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.enums.SerializationTypeEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.remoting.constants.RpcConstants;
......@@ -104,6 +106,7 @@ public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {
// build RpcMessage object
byte messageType = in.readByte();
byte codecType = in.readByte();
byte compressType = in.readByte();
int requestId = in.readInt();
RpcMessage rpcMessage = RpcMessage.builder()
.codec(codecType)
......@@ -121,6 +124,10 @@ public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
String compressName = CompressTypeEnum.getName(compressType);
Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
.getExtension(compressName);
bs = compress.decompress(bs);
if (messageType == RpcConstants.REQUEST_TYPE) {
RpcRequest tmpValue = serializer.deserialize(bs, RpcRequest.class);
rpcMessage.setData(tmpValue);
......
package github.javaguide.remoting.transport.netty.codec;
import github.javaguide.compress.Compress;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.enums.SerializationTypeEnum;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.remoting.constants.RpcConstants;
......@@ -19,9 +21,9 @@ import java.util.concurrent.atomic.AtomicInteger;
* custom protocol decoder
* <p>
* <pre>
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+-----------+-----+-----+-----+
* | magic code |version | full length | messageType| codec| RequestId |
* 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
* +-----+-----+-----+-----+--------+----+----+----+------+-----------+-------+----- --+-----+-----+-------+
* | magic code |version | full length | messageType| codec|compress| RequestId |
* +-----------------------+--------+---------------------+-----------+-----------+-----------+------------+
* | |
* | body |
......@@ -29,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
* | ... ... |
* +-------------------------------------------------------------------------------------------------------+
* 4B magic code(魔法数) 1B version(版本) 4B full length(消息长度) 1B messageType(消息类型)
* 1B codec(序列化类型) 4B requestId(请求的Id)
* 1B compress(压缩类型) 1B codec(序列化类型) 4B requestId(请求的Id)
* body(object类型数据)
* </pre>
*
......@@ -52,7 +54,8 @@ public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {
byte messageType = rpcMessage.getMessageType();
out.writeByte(messageType);
out.writeByte(rpcMessage.getCodec());
out.writeInt(ATOMIC_INTEGER.getAndDecrement());
out.writeByte(CompressTypeEnum.GZIP.getCode());
out.writeInt(ATOMIC_INTEGER.getAndIncrement());
// build full length
byte[] bodyBytes = null;
int fullLength = RpcConstants.HEAD_LENGTH;
......@@ -63,6 +66,10 @@ public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
bodyBytes = serializer.serialize(rpcMessage.getData());
String compressName = CompressTypeEnum.getName(rpcMessage.getCompress());
Compress compress = ExtensionLoader.getExtensionLoader(Compress.class)
.getExtension(compressName);
bodyBytes = compress.compress(bodyBytes);
fullLength += bodyBytes.length;
}
......
package github.javaguide.remoting.transport.netty.server;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.remoting.constants.RpcConstants;
......@@ -44,6 +45,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
rpcMessage.setData(RpcConstants.PONG);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
......@@ -56,6 +58,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
rpcMessage.setData(rpcResponse);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
......@@ -63,6 +66,7 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
rpcMessage.setData(rpcResponse);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册