提交 801287f4 编写于 作者: G guide

[refractor]duplicate code

上级 914bbad9
......@@ -11,7 +11,8 @@ import lombok.Getter;
@Getter
public enum SerializationTypeEnum {
KYRO((byte) 0x01, "kyro");
KYRO((byte) 0x01, "kyro"),
PROTOSTUFF((byte) 0x02, "protostuff");;
private final byte code;
private final String name;
......
......@@ -69,7 +69,7 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
log.info("write idle happen [{}]", ctx.channel().remoteAddress());
Channel channel = channelProvider.get((InetSocketAddress) ctx.channel().remoteAddress());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_REQUEST_TYPE);
rpcMessage.setData(RpcConstants.PING);
......
......@@ -50,7 +50,7 @@ public class NettyClientTransport implements ClientTransport {
unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setData(rpcRequest);
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.REQUEST_TYPE);
channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
......
......@@ -128,6 +128,7 @@ public class RpcMessageDecoder extends LengthFieldBasedFrameDecoder {
bs = compress.decompress(bs);
// deserialize the object
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
log.info("codec name: [{}] ", codecName);
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
if (messageType == RpcConstants.REQUEST_TYPE) {
......
......@@ -64,6 +64,7 @@ public class RpcMessageEncoder extends MessageToByteEncoder<RpcMessage> {
&& messageType != RpcConstants.HEARTBEAT_RESPONSE_TYPE) {
// serialize the object
String codecName = SerializationTypeEnum.getName(rpcMessage.getCodec());
log.info("codec name: [{}] ", codecName);
Serializer serializer = ExtensionLoader.getExtensionLoader(Serializer.class)
.getExtension(codecName);
bodyBytes = serializer.serialize(rpcMessage.getData());
......
......@@ -2,13 +2,13 @@ package github.javaguide.remoting.transport.netty.server;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.enums.SerializationTypeEnum;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.handler.RpcRequestHandler;
import github.javaguide.enums.SerializationTypeEnum;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
......@@ -42,38 +42,28 @@ public class NettyServerHandler extends ChannelInboundHandlerAdapter {
if (msg instanceof RpcMessage) {
log.info("server receive msg: [{}] ", msg);
byte messageType = ((RpcMessage) msg).getMessageType();
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.PROTOSTUFF.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
if (messageType == RpcConstants.HEARTBEAT_REQUEST_TYPE) {
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.HEARTBEAT_RESPONSE_TYPE);
rpcMessage.setData(RpcConstants.PONG);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
RpcRequest rpcRequest = (RpcRequest) ((RpcMessage) msg).getData();
// Execute the target method (the method the client needs to execute) and return the method result
Object result = rpcRequestHandler.handle(rpcRequest);
log.info(String.format("server get result: %s", result.toString()));
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
if (ctx.channel().isActive() && ctx.channel().isWritable()) {
RpcResponse<Object> rpcResponse = RpcResponse.success(result, rpcRequest.getRequestId());
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
rpcMessage.setData(rpcResponse);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
RpcResponse<Object> rpcResponse = RpcResponse.fail(RpcResponseCodeEnum.FAIL);
RpcMessage rpcMessage = new RpcMessage();
rpcMessage.setCodec(SerializationTypeEnum.KYRO.getCode());
rpcMessage.setCompress(CompressTypeEnum.GZIP.getCode());
rpcMessage.setMessageType(RpcConstants.RESPONSE_TYPE);
rpcMessage.setData(rpcResponse);
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
log.error("not writable now, message dropped");
}
}
ctx.writeAndFlush(rpcMessage).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} finally {
//Ensure that ByteBuf is released, otherwise there may be memory leaks
......
package github.javaguide.serialize.protostuff;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.serialize.Serializer;
import io.protostuff.LinkedBuffer;
import io.protostuff.ProtostuffIOUtil;
......@@ -16,18 +15,17 @@ public class ProtostuffSerializer implements Serializer {
/**
* Avoid re applying buffer space every time serialization
*/
private static LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
private static final LinkedBuffer BUFFER = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
@SuppressWarnings("unchecked")
@Override
public byte[] serialize(Object obj) {
Class<?> clazz = obj.getClass();
Schema schema = RuntimeSchema.getSchema(clazz);
byte[] bytes;
try {
bytes = ProtostuffIOUtil.toByteArray(obj, schema, buffer);
bytes = ProtostuffIOUtil.toByteArray(obj, schema, BUFFER);
} finally {
buffer.clear();
BUFFER.clear();
}
return bytes;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册