提交 26d68c7c 编写于 作者: G guide

[refractor&fix]完善rpc传参&修复失败测试

上级 336316ce
......@@ -150,9 +150,9 @@ public class NettyServerMain {
NettyServer nettyServer = new NettyServer();
// Register service manually
HelloService helloService2 = new HelloServiceImpl2();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
RpcServiceProperties rpcServiceConfig = RpcServiceProperties.builder()
.group("test2").version("version2").build();
nettyServer.registerService(helloService2, rpcServiceProperties);
nettyServer.registerService(helloService2, rpcServiceConfig);
nettyServer.start();
}
}
......@@ -182,9 +182,9 @@ public class HelloController {
```java
ClientTransport rpcRequestTransport = new SocketRpcClient();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
RpcServiceProperties rpcServiceConfig = RpcServiceProperties.builder()
.group("test2").version("version2").build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceProperties);
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceConfig);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello);
......
......@@ -196,9 +196,9 @@ public class NettyServerMain {
NettyServer nettyServer = new NettyServer();
// Register service manually
HelloService helloService2 = new HelloServiceImpl2();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
RpcServiceProperties rpcServiceConfig = RpcServiceProperties.builder()
.group("test2").version("version2").build();
nettyServer.registerService(helloService2, rpcServiceProperties);
nettyServer.registerService(helloService2, rpcServiceConfig);
nettyServer.start();
}
}
......@@ -227,9 +227,9 @@ public class HelloController {
```java
ClientTransport rpcRequestTransport = new SocketRpcClient();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
RpcServiceProperties rpcServiceConfig = RpcServiceProperties.builder()
.group("test2").version("version2").build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceProperties);
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceConfig);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello);
......
......@@ -178,8 +178,6 @@
<property name="basicOffset" value="4"/>
<property name="braceAdjustment" value="0"/>
<property name="caseIndent" value="4"/>
<property name="throwsIndent" value="8"/>
<property name="lineWrappingIndentation" value="8"/>
<property name="arrayInitIndent" value="4"/>
</module>
......
......@@ -111,10 +111,10 @@ public @interface RpcService {
```java
import github.javaguide.annotation.RpcService;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.provider.impl.ZkServiceProviderImpl;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
......@@ -145,10 +145,10 @@ public class SpringBeanPostProcessor implements BeanPostProcessor {
log.info("[{}] is annotated with [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());
// 获取注解
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
RpcServiceProperties rpcServiceConfig = RpcServiceProperties.builder()
.group(rpcService.group()).version(rpcService.version()).build();
// 发布服务
serviceProvider.publishService(bean, rpcServiceProperties);
serviceProvider.publishService(bean, rpcServiceConfig);
}
return bean;
}
......
package github.javaguide;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.remoting.transport.RpcRequestTransport;
import github.javaguide.remoting.transport.socket.SocketRpcClient;
......@@ -12,9 +12,8 @@ import github.javaguide.remoting.transport.socket.SocketRpcClient;
public class SocketClientMain {
public static void main(String[] args) {
RpcRequestTransport rpcRequestTransport = new SocketRpcClient();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group("test2").version("version2").build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceProperties);
RpcServiceConfig rpcServiceConfig = new RpcServiceConfig();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcRequestTransport, rpcServiceConfig);
HelloService helloService = rpcClientProxy.getProxy(HelloService.class);
String hello = helloService.hello(new Hello("111", "222"));
System.out.println(hello);
......
import github.javaguide.HelloService;
import github.javaguide.annotation.RpcScan;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.remoting.transport.netty.server.NettyRpcServer;
import github.javaguide.serviceimpl.HelloServiceImpl2;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
......@@ -19,9 +19,9 @@ public class NettyServerMain {
NettyRpcServer nettyRpcServer = (NettyRpcServer) applicationContext.getBean("nettyRpcServer");
// Register service manually
HelloService helloService2 = new HelloServiceImpl2();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group("test2").version("version2").build();
nettyRpcServer.registerService(helloService2, rpcServiceProperties);
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group("test2").version("version2").service(helloService2).build();
nettyRpcServer.registerService(rpcServiceConfig);
nettyRpcServer.start();
}
}
import github.javaguide.HelloService;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.remoting.transport.socket.SocketRpcServer;
import github.javaguide.serviceimpl.HelloServiceImpl;
......@@ -11,9 +11,9 @@ public class SocketServerMain {
public static void main(String[] args) {
HelloService helloService = new HelloServiceImpl();
SocketRpcServer socketRpcServer = new SocketRpcServer();
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group("test2").version("version2").build();
socketRpcServer.registerService(helloService, rpcServiceProperties);
RpcServiceConfig rpcServiceConfig = new RpcServiceConfig();
rpcServiceConfig.setService(helloService);
socketRpcServer.registerService(rpcServiceConfig);
socketRpcServer.start();
}
}
package github.javaguide.factory;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......
......@@ -7,6 +7,7 @@ package github.javaguide.utils;
public class RuntimeUtil {
/**
* 获取CPU的核心数
*
* @return cpu的核心数
*/
public static int cpus() {
......
......@@ -24,7 +24,7 @@ public class GzipCompress implements Compress {
throw new NullPointerException("bytes is null");
}
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = new GZIPOutputStream(out)) {
GZIPOutputStream gzip = new GZIPOutputStream(out)) {
gzip.write(bytes);
gzip.flush();
gzip.finish();
......@@ -40,7 +40,7 @@ public class GzipCompress implements Compress {
throw new NullPointerException("bytes is null");
}
try (ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPInputStream gunzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
GZIPInputStream gunzip = new GZIPInputStream(new ByteArrayInputStream(bytes))) {
byte[] buffer = new byte[BUFFER_SIZE];
int n;
while ((n = gunzip.read(buffer)) > -1) {
......
package github.javaguide.entity;
package github.javaguide.config;
import lombok.AllArgsConstructor;
import lombok.Builder;
......@@ -17,18 +17,26 @@ import lombok.ToString;
@Setter
@Builder
@ToString
public class RpcServiceProperties {
public class RpcServiceConfig {
/**
* service version
*/
private String version;
private String version = "";
/**
* when the interface has multiple implementation classes, distinguish by group
*/
private String group;
private String serviceName;
private String group = "";
public String toRpcServiceName() {
/**
* target service
*/
private Object service;
public String getRpcServiceName() {
return this.getServiceName() + this.getGroup() + this.getVersion();
}
public String getServiceName() {
return this.service.getClass().getInterfaces()[0].getCanonicalName();
}
}
......@@ -27,17 +27,14 @@ public class ConsistentHashLoadBalance extends AbstractLoadBalance {
protected String doSelect(List<String> serviceAddresses, RpcRequest rpcRequest) {
int identityHashCode = System.identityHashCode(serviceAddresses);
// build rpc service name by rpcRequest
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
String rpcServiceName = rpcRequest.getRpcServiceName();
ConsistentHashSelector selector = selectors.get(rpcServiceName);
// check for updates
if (selector == null || selector.identityHashCode != identityHashCode) {
selectors.put(rpcServiceName, new ConsistentHashSelector(serviceAddresses, 160, identityHashCode));
selector = selectors.get(rpcServiceName);
}
return selector.select(rpcServiceName+ Arrays.stream(rpcRequest.getParameters()));
return selector.select(rpcServiceName + Arrays.stream(rpcRequest.getParameters()));
}
static class ConsistentHashSelector {
......
package github.javaguide.provider;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
/**
* store and provide service object.
......@@ -11,26 +11,19 @@ import github.javaguide.entity.RpcServiceProperties;
public interface ServiceProvider {
/**
* @param service service object
* @param serviceClass the interface class implemented by the service instance object
* @param rpcServiceProperties service related attributes
* @param rpcServiceConfig rpc service related attributes
*/
void addService(Object service, Class<?> serviceClass, RpcServiceProperties rpcServiceProperties);
void addService(RpcServiceConfig rpcServiceConfig);
/**
* @param rpcServiceProperties service related attributes
* @param rpcServiceName rpc service name
* @return service object
*/
Object getService(RpcServiceProperties rpcServiceProperties);
Object getService(String rpcServiceName);
/**
* @param service service object
* @param rpcServiceProperties service related attributes
* @param rpcServiceConfig rpc service related attributes
*/
void publishService(Object service, RpcServiceProperties rpcServiceProperties);
void publishService(RpcServiceConfig rpcServiceConfig);
/**
* @param service service object
*/
void publishService(Object service);
}
package github.javaguide.provider;
package github.javaguide.provider.impl;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.remoting.transport.netty.server.NettyRpcServer;
import lombok.extern.slf4j.Slf4j;
......@@ -20,7 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
* @createTime 2020年05月13日 11:23:00
*/
@Slf4j
public class ServiceProviderImpl implements ServiceProvider {
public class ZkServiceProviderImpl implements ServiceProvider {
/**
* key: rpc service name(interface name + version + group)
......@@ -30,27 +31,26 @@ public class ServiceProviderImpl implements ServiceProvider {
private final Set<String> registeredService;
private final ServiceRegistry serviceRegistry;
public ServiceProviderImpl() {
public ZkServiceProviderImpl() {
serviceMap = new ConcurrentHashMap<>();
registeredService = ConcurrentHashMap.newKeySet();
serviceRegistry = ExtensionLoader.getExtensionLoader(ServiceRegistry.class).getExtension("zk");
}
@Override
public void addService(Object service, Class<?> serviceClass, RpcServiceProperties rpcServiceProperties) {
String rpcServiceName = rpcServiceProperties.toRpcServiceName();
public void addService(RpcServiceConfig rpcServiceConfig) {
String rpcServiceName = rpcServiceConfig.getRpcServiceName();
if (registeredService.contains(rpcServiceName)) {
return;
}
registeredService.add(rpcServiceName);
serviceMap.put(rpcServiceName, service);
log.info("Add service: {} and interfaces:{}", rpcServiceName, service.getClass().getInterfaces());
serviceMap.put(rpcServiceName, rpcServiceConfig.getService());
log.info("Add service: {} and interfaces:{}", rpcServiceName, rpcServiceConfig.getService().getClass().getInterfaces());
}
@Override
public Object getService(RpcServiceProperties rpcServiceProperties) {
Object service = serviceMap.get(rpcServiceProperties.toRpcServiceName());
public Object getService(String rpcServiceName) {
Object service = serviceMap.get(rpcServiceName);
if (null == service) {
throw new RpcException(RpcErrorMessageEnum.SERVICE_CAN_NOT_BE_FOUND);
}
......@@ -58,19 +58,11 @@ public class ServiceProviderImpl implements ServiceProvider {
}
@Override
public void publishService(Object service) {
this.publishService(service, RpcServiceProperties.builder().group("").version("").build());
}
@Override
public void publishService(Object service, RpcServiceProperties rpcServiceProperties) {
public void publishService(RpcServiceConfig rpcServiceConfig) {
try {
String host = InetAddress.getLocalHost().getHostAddress();
Class<?> serviceRelatedInterface = service.getClass().getInterfaces()[0];
String serviceName = serviceRelatedInterface.getCanonicalName();
rpcServiceProperties.setServiceName(serviceName);
this.addService(service, serviceRelatedInterface, rpcServiceProperties);
serviceRegistry.registerService(rpcServiceProperties.toRpcServiceName(), new InetSocketAddress(host, NettyRpcServer.PORT));
this.addService(rpcServiceConfig);
serviceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), new InetSocketAddress(host, NettyRpcServer.PORT));
} catch (UnknownHostException e) {
log.error("occur exception when getHostAddress", e);
}
......
package github.javaguide.proxy;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.enums.RpcErrorMessageEnum;
import github.javaguide.enums.RpcResponseCodeEnum;
import github.javaguide.exception.RpcException;
......@@ -35,23 +35,17 @@ public class RpcClientProxy implements InvocationHandler {
* Used to send requests to the server.And there are two implementations: socket and netty
*/
private final RpcRequestTransport rpcRequestTransport;
private final RpcServiceProperties rpcServiceProperties;
private final RpcServiceConfig rpcServiceConfig;
public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceProperties rpcServiceProperties) {
public RpcClientProxy(RpcRequestTransport rpcRequestTransport, RpcServiceConfig rpcServiceConfig) {
this.rpcRequestTransport = rpcRequestTransport;
if (rpcServiceProperties.getGroup() == null) {
rpcServiceProperties.setGroup("");
}
if (rpcServiceProperties.getVersion() == null) {
rpcServiceProperties.setVersion("");
}
this.rpcServiceProperties = rpcServiceProperties;
this.rpcServiceConfig = rpcServiceConfig;
}
public RpcClientProxy(RpcRequestTransport rpcRequestTransport) {
this.rpcRequestTransport = rpcRequestTransport;
this.rpcServiceProperties = RpcServiceProperties.builder().group("").version("").build();
this.rpcServiceConfig = new RpcServiceConfig();
}
/**
......@@ -76,8 +70,8 @@ public class RpcClientProxy implements InvocationHandler {
.interfaceName(method.getDeclaringClass().getName())
.paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group(rpcServiceProperties.getGroup())
.version(rpcServiceProperties.getVersion())
.group(rpcServiceConfig.getGroup())
.version(rpcServiceConfig.getVersion())
.build();
RpcResponse<Object> rpcResponse = null;
if (rpcRequestTransport instanceof NettyRpcClient) {
......
......@@ -20,16 +20,16 @@ import java.util.List;
* @createTime 2020年06月01日 15:16:00
*/
@Slf4j
public class ZkServiceDiscovery implements ServiceDiscovery {
public class ZkServiceDiscoveryImpl implements ServiceDiscovery {
private final LoadBalance loadBalance;
public ZkServiceDiscovery() {
public ZkServiceDiscoveryImpl() {
this.loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance");
}
@Override
public InetSocketAddress lookupService(RpcRequest rpcRequest) {
String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
String rpcServiceName = rpcRequest.getRpcServiceName();
CuratorFramework zkClient = CuratorUtils.getZkClient();
List<String> serviceUrlList = CuratorUtils.getChildrenNodes(zkClient, rpcServiceName);
if (serviceUrlList == null || serviceUrlList.size() == 0) {
......
......@@ -14,7 +14,7 @@ import java.net.InetSocketAddress;
* @createTime 2020年05月31日 10:56:00
*/
@Slf4j
public class ZkServiceRegistry implements ServiceRegistry {
public class ZkServiceRegistryImpl implements ServiceRegistry {
@Override
public void registerService(String rpcServiceName, InetSocketAddress inetSocketAddress) {
......
package github.javaguide.remoting.dto;
import github.javaguide.entity.RpcServiceProperties;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
......@@ -28,9 +27,7 @@ public class RpcRequest implements Serializable {
private String version;
private String group;
public RpcServiceProperties toRpcProperties() {
return RpcServiceProperties.builder().serviceName(this.getInterfaceName())
.version(this.getVersion())
.group(this.getGroup()).build();
public String getRpcServiceName() {
return this.getInterfaceName() + this.getGroup() + this.getVersion();
}
}
......@@ -3,7 +3,7 @@ package github.javaguide.remoting.handler;
import github.javaguide.exception.RpcException;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.provider.impl.ZkServiceProviderImpl;
import github.javaguide.remoting.dto.RpcRequest;
import lombok.extern.slf4j.Slf4j;
......@@ -21,14 +21,14 @@ public class RpcRequestHandler {
private final ServiceProvider serviceProvider;
public RpcRequestHandler() {
serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
}
/**
* Processing rpcRequest: call the corresponding method, and then return the method
*/
public Object handle(RpcRequest rpcRequest) {
Object service = serviceProvider.getService(rpcRequest.toRpcProperties());
Object service = serviceProvider.getService(rpcRequest.getRpcServiceName());
return invokeTargetMethod(rpcRequest, service);
}
......
......@@ -97,8 +97,6 @@ public final class NettyRpcClient implements RpcRequestTransport {
public Object sendRpcRequest(RpcRequest rpcRequest) {
// build return value
CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
// build rpc service name by rpcRequest
// String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
// get server address
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
// get server address related channel
......
package github.javaguide.remoting.transport.netty.client;
import github.javaguide.compress.Compress;
import github.javaguide.enums.CompressTypeEnum;
import github.javaguide.enums.SerializationTypeEnum;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.remoting.constants.RpcConstants;
import github.javaguide.remoting.dto.RpcMessage;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.enums.SerializationTypeEnum;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
......
package github.javaguide.remoting.transport.netty.server;
import github.javaguide.config.CustomShutdownHook;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.provider.impl.ZkServiceProviderImpl;
import github.javaguide.remoting.transport.netty.codec.RpcMessageDecoder;
import github.javaguide.remoting.transport.netty.codec.RpcMessageEncoder;
import github.javaguide.utils.RuntimeUtil;
......@@ -42,10 +42,10 @@ public class NettyRpcServer {
public static final int PORT = 9998;
private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
private final ServiceProvider serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
public void registerService(Object service, RpcServiceProperties rpcServiceProperties) {
serviceProvider.publishService(service, rpcServiceProperties);
public void registerService(RpcServiceConfig rpcServiceConfig) {
serviceProvider.publishService(rpcServiceConfig);
}
@SneakyThrows
......
package github.javaguide.remoting.transport.socket;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.exception.RpcException;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.registry.ServiceDiscovery;
......@@ -32,9 +31,6 @@ public class SocketRpcClient implements RpcRequestTransport {
@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
// build rpc service name by rpcRequest
String rpcServiceName = RpcServiceProperties.builder().serviceName(rpcRequest.getInterfaceName())
.group(rpcRequest.getGroup()).version(rpcRequest.getVersion()).build().toRpcServiceName();
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest);
try (Socket socket = new Socket()) {
socket.connect(inetSocketAddress);
......
......@@ -30,7 +30,7 @@ public class SocketRpcRequestHandlerRunnable implements Runnable {
public void run() {
log.info("server handle message from client by thread: [{}]", Thread.currentThread().getName());
try (ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream())) {
RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
Object result = rpcRequestHandler.handle(rpcRequest);
objectOutputStream.writeObject(RpcResponse.success(result, rpcRequest.getRequestId()));
......
package github.javaguide.remoting.transport.socket;
import github.javaguide.config.CustomShutdownHook;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.provider.impl.ZkServiceProviderImpl;
import github.javaguide.utils.concurrent.threadpool.ThreadPoolFactoryUtils;
import lombok.extern.slf4j.Slf4j;
......@@ -30,15 +30,11 @@ public class SocketRpcServer {
public SocketRpcServer() {
threadPool = ThreadPoolFactoryUtils.createCustomThreadPoolIfAbsent("socket-server-rpc-pool");
serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
}
public void registerService(Object service) {
serviceProvider.publishService(service);
}
public void registerService(Object service, RpcServiceProperties rpcServiceProperties) {
serviceProvider.publishService(service, rpcServiceProperties);
public void registerService(RpcServiceConfig rpcServiceConfig) {
serviceProvider.publishService(rpcServiceConfig);
}
public void start() {
......
......@@ -34,7 +34,7 @@ public class KryoSerializer implements Serializer {
@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
// Object->byte:将对象序列化为byte数组
kryo.writeObject(output, obj);
......@@ -48,7 +48,7 @@ public class KryoSerializer implements Serializer {
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
// byte->Object:从byte数组中反序列化出对对象
Object o = kryo.readObject(input, clazz);
......
......@@ -2,11 +2,11 @@ package github.javaguide.spring;
import github.javaguide.annotation.RpcReference;
import github.javaguide.annotation.RpcService;
import github.javaguide.entity.RpcServiceProperties;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.factory.SingletonFactory;
import github.javaguide.provider.ServiceProvider;
import github.javaguide.provider.ServiceProviderImpl;
import github.javaguide.provider.impl.ZkServiceProviderImpl;
import github.javaguide.proxy.RpcClientProxy;
import github.javaguide.remoting.transport.RpcRequestTransport;
import lombok.SneakyThrows;
......@@ -31,7 +31,7 @@ public class SpringBeanPostProcessor implements BeanPostProcessor {
private final RpcRequestTransport rpcClient;
public SpringBeanPostProcessor() {
this.serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
this.serviceProvider = SingletonFactory.getInstance(ZkServiceProviderImpl.class);
this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension("netty");
}
......@@ -43,9 +43,11 @@ public class SpringBeanPostProcessor implements BeanPostProcessor {
// get RpcService annotation
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
// build RpcServiceProperties
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group(rpcService.group()).version(rpcService.version()).build();
serviceProvider.publishService(bean, rpcServiceProperties);
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group(rpcService.group())
.version(rpcService.version())
.service(bean).build();
serviceProvider.publishService(rpcServiceConfig);
}
return bean;
}
......@@ -57,9 +59,10 @@ public class SpringBeanPostProcessor implements BeanPostProcessor {
for (Field declaredField : declaredFields) {
RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
if (rpcReference != null) {
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group(rpcReference.group()).version(rpcReference.version()).build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceProperties);
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group(rpcReference.group())
.version(rpcReference.version()).build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceConfig);
Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
declaredField.setAccessible(true);
try {
......
zk=github.javaguide.registry.zk.ZkServiceDiscovery
zk=github.javaguide.registry.zk.ZkServiceDiscoveryImpl
zk=github.javaguide.registry.zk.ZkServiceRegistry
zk=github.javaguide.registry.zk.ZkServiceRegistryImpl
package github.javaguide;
public interface DemoRpcService {
String hello();
}
package github.javaguide;
import github.javaguide.annotation.RpcService;
import lombok.extern.slf4j.Slf4j;
/**
* @author shuang.kou
* @createTime 2020年05月10日 07:52:00
*/
@Slf4j
@RpcService(group = "test1", version = "version1")
public class DemoRpcServiceImpl implements DemoRpcService {
@Override
public String hello() {
return "hello";
}
}
......@@ -16,7 +16,7 @@ class GzipCompressTest {
RpcRequest rpcRequest = RpcRequest.builder().methodName("hello")
.parameters(new Object[]{"sayhelooloo", "sayhelooloosayhelooloo"})
.interfaceName("github.javaguide.HelloService")
.paramTypes(new Class<?>[]{String.class,String.class})
.paramTypes(new Class<?>[]{String.class, String.class})
.requestId(UUID.randomUUID().toString())
.group("group1")
.version("version1")
......
package github.javaguide.loadbalance.loadbalancer;
import github.javaguide.DemoRpcService;
import github.javaguide.DemoRpcServiceImpl;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.extension.ExtensionLoader;
import github.javaguide.loadbalance.LoadBalance;
import github.javaguide.remoting.dto.RpcRequest;
......@@ -18,30 +21,19 @@ class ConsistentHashLoadBalanceTest {
void TestConsistentHashLoadBalance() {
LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("loadBalance");
List<String> serviceUrlList = new ArrayList<>(Arrays.asList("127.0.0.1:9997", "127.0.0.1:9998", "127.0.0.1:9999"));
String userRpcServiceName = "github.javaguide.UserServicetest1version1";
//build rpcCall
RpcRequest rpcRequest = RpcRequest.builder()
// .parameters(args)
.interfaceName(userRpcServiceName)
// .paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group("test2")
.version("version2")
.build();
String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
assertEquals("127.0.0.1:9999",userServiceAddress);
DemoRpcService demoRpcService = new DemoRpcServiceImpl();
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group("test2").version("version2").service(demoRpcService).build();
String schoolRpcServiceName = "github.javaguide.SchoolServicetest1version1";
rpcRequest = RpcRequest.builder()
// .parameters(args)
.interfaceName(userRpcServiceName)
// .paramTypes(method.getParameterTypes())
RpcRequest rpcRequest = RpcRequest.builder()
.parameters(demoRpcService.getClass().getTypeParameters())
.interfaceName(rpcServiceConfig.getServiceName())
.requestId(UUID.randomUUID().toString())
.group("test2")
.version("version2")
.group(rpcServiceConfig.getGroup())
.version(rpcServiceConfig.getVersion())
.build();
String schoolServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
assertEquals("127.0.0.1:9997",schoolServiceAddress);
String userServiceAddress = loadBalance.selectServiceAddress(serviceUrlList, rpcRequest);
assertEquals("127.0.0.1:9998", userServiceAddress);
}
}
\ No newline at end of file
package github.javaguide.registry;
import github.javaguide.registry.zk.ZkServiceDiscovery;
import github.javaguide.registry.zk.ZkServiceRegistry;
import github.javaguide.DemoRpcService;
import github.javaguide.DemoRpcServiceImpl;
import github.javaguide.config.RpcServiceConfig;
import github.javaguide.registry.zk.ZkServiceDiscoveryImpl;
import github.javaguide.registry.zk.ZkServiceRegistryImpl;
import github.javaguide.remoting.dto.RpcRequest;
import org.junit.jupiter.api.Test;
......@@ -15,21 +18,24 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
* @author shuang.kou
* @createTime 2020年05月31日 16:25:00
*/
class ZkServiceRegistryTest {
class ZkServiceRegistryImplTest {
@Test
void should_register_service_successful_and_lookup_service_by_service_name() {
ServiceRegistry zkServiceRegistry = new ZkServiceRegistry();
ServiceRegistry zkServiceRegistry = new ZkServiceRegistryImpl();
InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333);
zkServiceRegistry.registerService("github.javaguide.registry.zk.ZkServiceRegistry", givenInetSocketAddress);
ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery();
DemoRpcService demoRpcService = new DemoRpcServiceImpl();
RpcServiceConfig rpcServiceConfig = RpcServiceConfig.builder()
.group("test2").version("version2").service(demoRpcService).build();
zkServiceRegistry.registerService(rpcServiceConfig.getRpcServiceName(), givenInetSocketAddress);
ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscoveryImpl();
RpcRequest rpcRequest = RpcRequest.builder()
// .parameters(args)
.interfaceName("github.javaguide.registry.zk.ZkServiceRegistry")
.interfaceName(rpcServiceConfig.getServiceName())
// .paramTypes(method.getParameterTypes())
.requestId(UUID.randomUUID().toString())
.group("test2")
.version("version2")
.group(rpcServiceConfig.getGroup())
.version(rpcServiceConfig.getVersion())
.build();
InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService(rpcRequest);
assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString());
......
......@@ -14,7 +14,7 @@ class KryoSerializerTest {
RpcRequest target = RpcRequest.builder().methodName("hello")
.parameters(new Object[]{"sayhelooloo", "sayhelooloosayhelooloo"})
.interfaceName("github.javaguide.HelloService")
.paramTypes(new Class<?>[]{String.class,String.class})
.paramTypes(new Class<?>[]{String.class, String.class})
.requestId(UUID.randomUUID().toString())
.group("group1")
.version("version1")
......@@ -22,8 +22,8 @@ class KryoSerializerTest {
KryoSerializer kryoSerializer = new KryoSerializer();
byte[] bytes = kryoSerializer.serialize(target);
RpcRequest actual = kryoSerializer.deserialize(bytes, RpcRequest.class);
assertEquals(target.getGroup(),actual.getGroup());
assertEquals(target.getVersion(),actual.getVersion());
assertEquals(target.getRequestId(),actual.getRequestId());
assertEquals(target.getGroup(), actual.getGroup());
assertEquals(target.getVersion(), actual.getVersion());
assertEquals(target.getRequestId(), actual.getRequestId());
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册