提交 f256704b 编写于 作者: S shuang.kou

[v2.0]use netty transport data

上级 0ccffe60
...@@ -8,10 +8,10 @@ ...@@ -8,10 +8,10 @@
<sourceTestOutputDir name="target/generated-test-sources/test-annotations" /> <sourceTestOutputDir name="target/generated-test-sources/test-annotations" />
<outputRelativeToContentRoot value="true" /> <outputRelativeToContentRoot value="true" />
<module name="rpc-framework-core" /> <module name="rpc-framework-core" />
<module name="hello-service-api" />
<module name="example-server" /> <module name="example-server" />
<module name="rpc-framework-simple" /> <module name="rpc-framework-simple" />
<module name="example-client" /> <module name="example-client" />
<module name="hello-service-api" />
<module name="rpc-framework-common" /> <module name="rpc-framework-common" />
</profile> </profile>
</annotationProcessing> </annotationProcessing>
......
a
java:S1128"DRemove this unused import 'github.javaguide.exception.RpcException'.(8ܠ.
o
java:S1128"MRemove this unused import 'github.javaguide.enumeration.RpcErrorMessageEnum'.(Һ8ܠ.
\ No newline at end of file
U java:S106 "9Replace this use of System.out or System.err by a logger.(߫ź8. U java:S106"9Replace this use of System.out or System.err by a logger.(߫ź8.
\ No newline at end of file \ No newline at end of file
j
java:S1128 "HRemove this unused import 'java.lang.reflect.InvocationTargetException'.(辆8ܠ.
f
java:S1128"IRemove this unused import 'github.javaguide.enumeration.RpcResponseCode'.(ޕ8ܠ.
W
java:S1128 "5Remove this unused import 'java.lang.reflect.Method'.(͓8ܠ.
\ No newline at end of file
T
java:S1135"2Complete the task associated to this TODO comment.(ö¤¦Âúÿÿÿÿ8ªöîÝ .
O
java:S11352"2Complete the task associated to this TODO comment.(É÷§‰8ªöîÝ .
›
java:S1130F"yRemove the declaration of thrown exception 'java.lang.ClassNotFoundException', as it cannot be thrown from method's body.(Íÿæ·üÿÿÿÿ8ÆúÝÞ .
\ No newline at end of file
...@@ -25,12 +25,6 @@ w ...@@ -25,12 +25,6 @@ w
Grpc-framework-common/src/main/java/github/javaguide/dto/RpcRequest.java,7/0/705e8b4dff28a291cae5396865e0f0fa96cda002 Grpc-framework-common/src/main/java/github/javaguide/dto/RpcRequest.java,7/0/705e8b4dff28a291cae5396865e0f0fa96cda002
L L
rpc-framework-common/pom.xml,f/f/ffa4211dc42baf16a74c0b380b40ed4cbd9f6e02 rpc-framework-common/pom.xml,f/f/ffa4211dc42baf16a74c0b380b40ed4cbd9f6e02
r
Brpc-framework-simple/src/main/java/github/javaguide/RpcClient.java,0/7/07643339d701578f80c084a95e22ac5651f4c58b
w
Grpc-framework-simple/src/main/java/github/javaguide/RpcClientProxy.java,6/4/64dea2f6fe697035b024637b900872293e816f7d
r
Brpc-framework-simple/src/main/java/github/javaguide/RpcServer.java,0/8/086d7cc1989088b68798f225e07ca046fcfed5b2
 
Oexample-server/src/main/java/github/javaguide/RpcFrameworkSimpleServerMain.java,f/2/f27c58e20df8a3c709a42909516fe0201b357eaa Oexample-server/src/main/java/github/javaguide/RpcFrameworkSimpleServerMain.java,f/2/f27c58e20df8a3c709a42909516fe0201b357eaa
 
...@@ -45,9 +39,22 @@ Dexample-server/src/main/java/github/javaguide/HelloServiceImpl2.java,c/d/cda3f ...@@ -45,9 +39,22 @@ Dexample-server/src/main/java/github/javaguide/HelloServiceImpl2.java,c/d/cda3f
Trpc-framework-common/src/main/java/github/javaguide/enumeration/RpcResponseCode.java,5/f/5f3d4a2735af7c561cf521c9e40f174434677620 Trpc-framework-common/src/main/java/github/javaguide/enumeration/RpcResponseCode.java,5/f/5f3d4a2735af7c561cf521c9e40f174434677620
 
Xrpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessageEnum.java,8/3/83d611de4efdb638903513fb6af99646e707a994 Xrpc-framework-common/src/main/java/github/javaguide/enumeration/RpcErrorMessageEnum.java,8/3/83d611de4efdb638903513fb6af99646e707a994
9
README.md,8/e/8ec9a00bfd09b3190ac6b22251dbb1aa95a0579d

Qrpc-framework-simple/src/main/java/github/javaguide/registry/ServiceRegistry.java,4/e/4eee7bde8d258a3ad13d5aad4b1fd0f47f159c22

brpc-framework-simple/src/main/java/github/javaguide/remoting/socket/RpcRequestHandlerRunnable.java,3/8/38c46e918cbf2fd183a4d20b5269ff145a5da008

Xrpc-framework-simple/src/main/java/github/javaguide/registry/DefaultServiceRegistry.java,8/a/8a8f0ad1908e89647431f04ae22e059fd159d0c1
 
Rrpc-framework-simple/src/main/java/github/javaguide/RpcRequestHandlerRunnable.java,a/5/a5cef486f97a5ac690fd68253ccb05a272eed822 Rrpc-framework-simple/src/main/java/github/javaguide/remoting/socket/RpcClient.java,8/c/8c93c859db3a1cebbda725b6225a89337732c108
x 
Hrpc-framework-simple/src/main/java/github/javaguide/ServiceRegistry.java,d/0/d02ab7f8362272316c048349fbcee5439d04001e Wrpc-framework-simple/src/main/java/github/javaguide/remoting/socket/RpcClientProxy.java,3/5/350655342260f50b6ee7ce69dcb361bad3956984
z 
Jrpc-framework-simple/src/main/java/github/javaguide/RpcRequestHandler.java,e/1/e1bb46d5a6fb0c76bfed0787c1fc324c20999044 Rrpc-framework-simple/src/main/java/github/javaguide/remoting/socket/RpcServer.java,c/f/cf4f29284214a49bc9f1eeb91eff2d0fbb46e184
\ No newline at end of file 
Srpc-framework-simple/src/main/java/github/javaguide/remoting/RpcRequestHandler.java,e/b/eb77dbc7968406e86bad5a749c9941052f4f3ccc
:
.gitignore,a/5/a5cc2925ca8258af241be7e5b0381edf30266302
\ No newline at end of file
package github.javaguide;
import github.javaguide.transport.RpcClient;
import github.javaguide.transport.RpcClientProxy;
import github.javaguide.transport.netty.NettyRpcClient;
import github.javaguide.transport.socket.SocketRpcClient;
/**
* @author shuang.kou
* @createTime 2020年05月10日 07:25:00
*/
public class NettyClientMain {
public static void main(String[] args) {
RpcClient rpcClient=new NettyRpcClient("127.0.0.1", 9999);
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello);
}
}
package github.javaguide; package github.javaguide;
import github.javaguide.remoting.socket.RpcClientProxy; import github.javaguide.transport.RpcClient;
import github.javaguide.transport.RpcClientProxy;
import github.javaguide.transport.netty.NettyRpcClient;
import github.javaguide.transport.socket.SocketRpcClient;
/** /**
* @author shuang.kou * @author shuang.kou
...@@ -8,9 +11,13 @@ import github.javaguide.remoting.socket.RpcClientProxy; ...@@ -8,9 +11,13 @@ import github.javaguide.remoting.socket.RpcClientProxy;
*/ */
public class RpcFrameworkSimpleClientMain { public class RpcFrameworkSimpleClientMain {
public static void main(String[] args) { public static void main(String[] args) {
RpcClientProxy rpcClientProxy = new RpcClientProxy("127.0.0.1", 9999); RpcClient rpcClient=new SocketRpcClient("127.0.0.1", 9999);
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class); HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222")); String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello); System.out.println(hello);
RpcClient rpcClient2=new NettyRpcClient("127.0.0.1", 9999);
} }
} }
package github.javaguide;
import github.javaguide.registry.DefaultServiceRegistry;
import github.javaguide.transport.netty.NettyRpcServer;
import github.javaguide.transport.socket.SocketRpcServer;
/**
* @author shuang.kou
* @createTime 2020年05月10日 07:25:00
*/
public class NettyServerMain {
public static void main(String[] args) {
HelloServiceImpl helloService = new HelloServiceImpl();
DefaultServiceRegistry defaultServiceRegistry = new DefaultServiceRegistry();
// 手动注册
defaultServiceRegistry.register(helloService);
NettyRpcServer socketRpcServer = new NettyRpcServer(9999);
socketRpcServer.run();
}
}
package github.javaguide; package github.javaguide;
import github.javaguide.registry.DefaultServiceRegistry; import github.javaguide.registry.DefaultServiceRegistry;
import github.javaguide.remoting.socket.RpcServer; import github.javaguide.transport.socket.SocketRpcServer;
/** /**
* @author shuang.kou * @author shuang.kou
...@@ -13,7 +13,7 @@ public class RpcFrameworkSimpleServerMain { ...@@ -13,7 +13,7 @@ public class RpcFrameworkSimpleServerMain {
DefaultServiceRegistry defaultServiceRegistry = new DefaultServiceRegistry(); DefaultServiceRegistry defaultServiceRegistry = new DefaultServiceRegistry();
// 手动注册 // 手动注册
defaultServiceRegistry.register(helloService); defaultServiceRegistry.register(helloService);
RpcServer rpcServer = new RpcServer(defaultServiceRegistry); SocketRpcServer socketRpcServer = new SocketRpcServer();
rpcServer.start(9999); socketRpcServer.start(9999);
} }
} }
package github.javaguide; package github.javaguide;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import java.io.Serializable; import java.io.Serializable;
/** /**
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月10日 07:04:00 * @createTime 2020年05月10日 07:04:00
*/ */
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class Hello implements Serializable { public class Hello implements Serializable {
private String message; private String message;
private String description; private String description;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public Hello(String message, String description) {
this.message = message;
this.description = description;
}
} }
...@@ -15,6 +15,8 @@ ...@@ -15,6 +15,8 @@
<!--logging--> <!--logging-->
<log4j.version>2.9.0</log4j.version> <log4j.version>2.9.0</log4j.version>
<slf4j.version>1.7.25</slf4j.version> <slf4j.version>1.7.25</slf4j.version>
<netty.version>4.1.42.Final</netty.version>
<kryo.version>4.0.2</kryo.version>
</properties> </properties>
<modules> <modules>
<module>rpc-framework-core</module> <module>rpc-framework-core</module>
......
package github.javaguide.dto; package github.javaguide.dto;
import lombok.AllArgsConstructor;
import lombok.Builder; import lombok.Builder;
import lombok.Data; import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable; import java.io.Serializable;
...@@ -9,8 +13,11 @@ import java.io.Serializable; ...@@ -9,8 +13,11 @@ import java.io.Serializable;
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月10日 08:24:00 * @createTime 2020年05月10日 08:24:00
*/ */
@Data @AllArgsConstructor
@NoArgsConstructor
@Getter
@Builder @Builder
@ToString
public class RpcRequest implements Serializable { public class RpcRequest implements Serializable {
private static final long serialVersionUID = 1905122041950251207L; private static final long serialVersionUID = 1905122041950251207L;
......
package github.javaguide.dto; package github.javaguide.dto;
import github.javaguide.enumeration.RpcResponseCode; import github.javaguide.enumeration.RpcResponseCode;
import lombok.Data; import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter; import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter; import lombok.Setter;
import lombok.ToString;
import java.io.Serializable; import java.io.Serializable;
...@@ -11,7 +14,12 @@ import java.io.Serializable; ...@@ -11,7 +14,12 @@ import java.io.Serializable;
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月12日 16:15:00 * @createTime 2020年05月12日 16:15:00
*/ */
@Data @AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
@Builder
@ToString
public class RpcResponse<T> implements Serializable { public class RpcResponse<T> implements Serializable {
private static final long serialVersionUID = 715745410605631233L; private static final long serialVersionUID = 715745410605631233L;
......
package github.javaguide.exception;
/**
* @author shuang.kou
* @createTime 2020年05月13日 19:54:00
*/
public class SerializeException extends RuntimeException {
public SerializeException(String message) {
super(message);
}
}
...@@ -16,5 +16,15 @@ ...@@ -16,5 +16,15 @@
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>${kryo.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -5,7 +5,6 @@ import github.javaguide.exception.RpcException; ...@@ -5,7 +5,6 @@ import github.javaguide.exception.RpcException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
...@@ -22,8 +21,8 @@ public class DefaultServiceRegistry implements ServiceRegistry { ...@@ -22,8 +21,8 @@ public class DefaultServiceRegistry implements ServiceRegistry {
* key:service/interface name * key:service/interface name
* value:service * value:service
*/ */
private final Map<String, Object> serviceMap = new ConcurrentHashMap<>(); private static final Map<String, Object> serviceMap = new ConcurrentHashMap<>();
private final Set<String> registeredService = ConcurrentHashMap.newKeySet(); private static final Set<String> registeredService = ConcurrentHashMap.newKeySet();
/** /**
* TODO 修改为扫描注解注册 * TODO 修改为扫描注解注册
......
package github.javaguide.serialize;
/**
* @author shuang.kou
* @createTime 2020年05月13日 19:29:00
*/
public interface Serializer {
/**
* 序列化
*
* @param obj 要序列化的对象
* @return 字节数组
*/
byte[] serialize(Object obj);
/**
* 反序列化
*
* @param bytes 序列化后的字节数组
* @param clazz 类
* @param <T>
* @return 反序列化的对象
*/
<T> T deserialize(byte[] bytes, Class<T> clazz);
}
package github.javaguide.serialize.kyro;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse;
import github.javaguide.exception.SerializeException;
import github.javaguide.serialize.Serializer;
import github.javaguide.transport.netty.NettyClientHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
/**
* @author shuang.kou
* @createTime 2020年05月13日 19:29:00
*/
public class KryoSerializer implements Serializer {
private static final Logger logger = LoggerFactory.getLogger(KryoSerializer.class);
/**
* 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。
* 所以,使用 ThreadLocal 存放 Kryo 对象
*/
private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
kryo.setReferences(true);//默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true
kryo.setRegistrationRequired(false);//默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true
return kryo;
});
@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
// Object->byte:将对象序列化为byte数组
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
logger.error("occur exception when serialize:", e);
throw new SerializeException("序列化失败");
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
// byte->Object:从byte数组中反序列化出对对象
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
} catch (Exception e) {
logger.error("occur exception when deserialize:", e);
throw new SerializeException("反序列化失败");
}
}
}
package github.javaguide.transport;
import github.javaguide.dto.RpcRequest;
/**
* @author shuang.kou
* @createTime 2020年05月25日 17:02:00
*/
public interface RpcClient {
Object sendRpcRequest(RpcRequest rpcRequest);
}
package github.javaguide.remoting.socket; package github.javaguide.transport;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.transport.socket.SocketRpcClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -14,12 +15,10 @@ import java.lang.reflect.Proxy; ...@@ -14,12 +15,10 @@ import java.lang.reflect.Proxy;
*/ */
public class RpcClientProxy implements InvocationHandler { public class RpcClientProxy implements InvocationHandler {
private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class); private static final Logger logger = LoggerFactory.getLogger(RpcClientProxy.class);
private String host; private RpcClient rpcClient;
private int port;
public RpcClientProxy(String host, int port) { public RpcClientProxy(RpcClient rpcClient) {
this.host = host; this.rpcClient = rpcClient;
this.port = port;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
...@@ -35,7 +34,6 @@ public class RpcClientProxy implements InvocationHandler { ...@@ -35,7 +34,6 @@ public class RpcClientProxy implements InvocationHandler {
.interfaceName(method.getDeclaringClass().getName()) .interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes()) .paramTypes(method.getParameterTypes())
.build(); .build();
RpcClient rpcClient = new RpcClient(); return rpcClient.sendRpcRequest(rpcRequest);
return rpcClient.sendRpcRequest(rpcRequest, host, port);
} }
} }
package github.javaguide.remoting; package github.javaguide.transport;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
......
package github.javaguide.transport.netty;
import github.javaguide.dto.RpcResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author shuang.kou
* @createTime 2020年05月13日 20:50:00
*/
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
RpcResponse rpcResponse = (RpcResponse) msg;
logger.info(String.format("client receive msg: %s", rpcResponse));
// 声明一个 AttributeKey 对象
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
// 将服务端的返回结果保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源
// AttributeMap的key是AttributeKey,value是Attribute
ctx.channel().attr(key).set(rpcResponse);
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("client catch exception");
cause.printStackTrace();
ctx.close();
}
}
package github.javaguide.transport.netty;
import github.javaguide.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.AllArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* @author shuang.kou
* @createTime 2020年05月13日 19:42:00
*/
@AllArgsConstructor
public class NettyKryoDecoder extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(NettyKryoDecoder.class);
private Serializer serializer;
private Class<?> genericClass;
/**
* Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部
*/
private static final int BODY_LENGTH = 4;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
//1.byteBuf中写入的消息长度所占的字节数已经是4了,所以 byteBuf 的可读字节必须大于 4,
if (byteBuf.readableBytes() >= BODY_LENGTH) {
//2.标记当前readIndex的位置,以便后面重置readIndex 的时候使用
byteBuf.markReaderIndex();
//3.读取消息的长度
//注意: 消息长度是encode的时候我们自己写入的,参见 NettyKryoEncoder 的encode方法
int dataLength = byteBuf.readInt();
//4.遇到不合理的情况直接 return
if (dataLength < 0 || byteBuf.readableBytes() < 0) {
return;
}
//5.如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex
if (byteBuf.readableBytes() < dataLength) {
byteBuf.resetReaderIndex();
return;
}
// 6.走到这里说明没什么问题了,可以序列化了
byte[] body = new byte[dataLength];
byteBuf.readBytes(body);
// 将bytes数组转换为我们需要的对象
Object obj = serializer.deserialize(body, genericClass);
list.add(obj);
}
}
}
package github.javaguide.transport.netty;
import github.javaguide.serialize.Serializer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import lombok.AllArgsConstructor;
/**
* @author shuang.kou
* @createTime 2020年05月13日 19:43:00
*/
@AllArgsConstructor
public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
private Serializer serializer;
private Class<?> genericClass;
/**
* 将对象转换为字节码然后写入到 ByteBuf 对象中
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) {
if (genericClass.isInstance(o)) {
// 1. 将对象转换为byte
byte[] body = serializer.serialize(o);
// 2. 读取消息的长度
int dataLength = body.length;
// 3.写入消息对应的字节数组长度,writerIndex 加 4
byteBuf.writeInt(dataLength);
//4.将字节数组写入 ByteBuf 对象中
byteBuf.writeBytes(body);
}
}
}
package github.javaguide.transport.netty;
import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse;
import github.javaguide.serialize.kyro.KryoSerializer;
import github.javaguide.transport.RpcClient;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author shuang.kou
* @createTime 2020年05月25日 16:43:00
*/
public class NettyRpcClient implements RpcClient {
private static final Logger logger = LoggerFactory.getLogger(NettyRpcClient.class);
private String host;
private int port;
private static final Bootstrap b;
public NettyRpcClient(String host, int port) {
this.host = host;
this.port = port;
}
// 初始化相关资源比如 EventLoopGroup、Bootstrap
static {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
b = new Bootstrap();
KryoSerializer kryoSerializer = new KryoSerializer();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
/*自定义序列化编解码器*/
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
// ByteBuf -> RpcRequest
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyClientHandler());
}
});
}
/**
* 发送消息到服务端
*
* @param rpcRequest 消息体
* @return 服务端返回的数据
*/
@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
try {
ChannelFuture f = b.connect(host, port).sync();
logger.info("client connect {}", host + ":" + port);
Channel futureChannel = f.channel();
if (futureChannel != null) {
futureChannel.writeAndFlush(rpcRequest).addListener(future -> {
if (future.isSuccess()) {
logger.info(String.format("client send message: %s", rpcRequest.toString()));
} else {
logger.error("Send failed:", future.cause());
}
});
futureChannel.closeFuture().sync();
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
RpcResponse rpcResponse = futureChannel.attr(key).get();
return rpcResponse.getData();
}
} catch (InterruptedException e) {
logger.error("occur exception when connect server:", e);
}
return null;
}
}
package github.javaguide.transport.netty;
import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse;
import github.javaguide.serialize.kyro.KryoSerializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author shuang.kou
* @createTime 2020年05月25日 16:42:00
*/
public class NettyRpcServer {
private static final Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);
private final int port;
private KryoSerializer kryoSerializer;
public NettyRpcServer(int port) {
this.port = port;
kryoSerializer = new KryoSerializer();
}
public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
ch.pipeline().addLast(new NettyServerHandler());
}
})
// 设置tcp缓冲区
.childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("occur exception when start server:", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
package github.javaguide.transport.netty;
import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse;
import github.javaguide.registry.DefaultServiceRegistry;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.transport.RpcRequestHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author shuang.kou
* @createTime 2020年05月13日 20:44:00
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static RpcRequestHandler rpcRequestHandler;
private static ServiceRegistry serviceRegistry;
static {
rpcRequestHandler=new RpcRequestHandler();
serviceRegistry = new DefaultServiceRegistry();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
RpcRequest rpcRequest = (RpcRequest) msg;
logger.info(String.format("server receive msg: %s", rpcRequest));
String interfaceName = rpcRequest.getInterfaceName();
Object service = serviceRegistry.getService(interfaceName);
Object result = rpcRequestHandler.handle(rpcRequest, service);
logger.info(String.format("server get result: %s", result.toString()));
ChannelFuture f = ctx.writeAndFlush(RpcResponse.success(result));
f.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("server catch exception");
cause.printStackTrace();
ctx.close();
}
}
package github.javaguide.remoting.socket; package github.javaguide.transport.socket;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.enumeration.RpcErrorMessageEnum; import github.javaguide.enumeration.RpcErrorMessageEnum;
import github.javaguide.enumeration.RpcResponseCode; import github.javaguide.enumeration.RpcResponseCode;
import github.javaguide.exception.RpcException; import github.javaguide.exception.RpcException;
import github.javaguide.transport.RpcClient;
import lombok.AllArgsConstructor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -17,10 +19,14 @@ import java.net.Socket; ...@@ -17,10 +19,14 @@ import java.net.Socket;
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月10日 18:40:00 * @createTime 2020年05月10日 18:40:00
*/ */
public class RpcClient { @AllArgsConstructor
private static final Logger logger = LoggerFactory.getLogger(RpcClient.class); public class SocketRpcClient implements RpcClient {
private static final Logger logger = LoggerFactory.getLogger(SocketRpcClient.class);
private String host;
private int port;
public Object sendRpcRequest(RpcRequest rpcRequest, String host, int port) { @Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
try (Socket socket = new Socket(host, port)) { try (Socket socket = new Socket(host, port)) {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream()); ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
objectOutputStream.writeObject(rpcRequest); objectOutputStream.writeObject(rpcRequest);
......
package github.javaguide.remoting.socket; package github.javaguide.transport.socket;
import github.javaguide.dto.RpcRequest; import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse; import github.javaguide.dto.RpcResponse;
import github.javaguide.registry.DefaultServiceRegistry;
import github.javaguide.registry.ServiceRegistry; import github.javaguide.registry.ServiceRegistry;
import github.javaguide.remoting.RpcRequestHandler; import github.javaguide.transport.RpcRequestHandler;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -16,16 +17,18 @@ import java.net.Socket; ...@@ -16,16 +17,18 @@ import java.net.Socket;
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月10日 09:18:00 * @createTime 2020年05月10日 09:18:00
*/ */
public class RpcRequestHandlerRunnable implements Runnable { public class SocketRpcRequestHandlerRunnable implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(RpcRequestHandlerRunnable.class); private static final Logger logger = LoggerFactory.getLogger(SocketRpcRequestHandlerRunnable.class);
private Socket socket; private Socket socket;
private RpcRequestHandler rpcRequestHandler; private static RpcRequestHandler rpcRequestHandler;
private ServiceRegistry serviceRegistry; private static ServiceRegistry serviceRegistry;
static {
rpcRequestHandler=new RpcRequestHandler();
serviceRegistry = new DefaultServiceRegistry();
}
public RpcRequestHandlerRunnable(Socket socket, RpcRequestHandler rpcRequestHandler, ServiceRegistry serviceRegistry) { public SocketRpcRequestHandlerRunnable(Socket socket) {
this.socket = socket; this.socket = socket;
this.rpcRequestHandler = rpcRequestHandler;
this.serviceRegistry = serviceRegistry;
} }
@Override @Override
......
package github.javaguide.remoting.socket; package github.javaguide.transport.socket;
import github.javaguide.remoting.RpcRequestHandler;
import github.javaguide.registry.ServiceRegistry;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -20,7 +18,7 @@ import java.util.concurrent.TimeUnit; ...@@ -20,7 +18,7 @@ import java.util.concurrent.TimeUnit;
* @author shuang.kou * @author shuang.kou
* @createTime 2020年05月10日 08:01:00 * @createTime 2020年05月10日 08:01:00
*/ */
public class RpcServer { public class SocketRpcServer {
/** /**
* 线程池参数 * 线程池参数
*/ */
...@@ -29,12 +27,9 @@ public class RpcServer { ...@@ -29,12 +27,9 @@ public class RpcServer {
private static final int KEEP_ALIVE_TIME = 1; private static final int KEEP_ALIVE_TIME = 1;
private static final int BLOCKING_QUEUE_CAPACITY = 100; private static final int BLOCKING_QUEUE_CAPACITY = 100;
private ExecutorService threadPool; private ExecutorService threadPool;
private RpcRequestHandler rpcRequestHandler = new RpcRequestHandler(); private static final Logger logger = LoggerFactory.getLogger(SocketRpcServer.class);
private static final Logger logger = LoggerFactory.getLogger(RpcServer.class);
private final ServiceRegistry serviceRegistry;
public RpcServer(ServiceRegistry serviceRegistry) { public SocketRpcServer() {
this.serviceRegistry = serviceRegistry;
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY); BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(BLOCKING_QUEUE_CAPACITY);
ThreadFactory threadFactory = Executors.defaultThreadFactory(); ThreadFactory threadFactory = Executors.defaultThreadFactory();
this.threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, workQueue, threadFactory); this.threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOL_SIZE_SIZE, KEEP_ALIVE_TIME, TimeUnit.MINUTES, workQueue, threadFactory);
...@@ -47,7 +42,7 @@ public class RpcServer { ...@@ -47,7 +42,7 @@ public class RpcServer {
Socket socket; Socket socket;
while ((socket = server.accept()) != null) { while ((socket = server.accept()) != null) {
logger.info("client connected"); logger.info("client connected");
threadPool.execute(new RpcRequestHandlerRunnable(socket, rpcRequestHandler, serviceRegistry)); threadPool.execute(new SocketRpcRequestHandlerRunnable(socket));
} }
threadPool.shutdown(); threadPool.shutdown();
} catch (IOException e) { } catch (IOException e) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册