提交 decac3e2 编写于 作者: S shuang.kou

[v3.0]refractor exact a interface : ServiceDiscovery

上级 d58acd0e
......@@ -17,5 +17,6 @@ public class NettyClientMain {
//如需使用 assert 断言,需要在 VM options 添加参数:-ea
assert "Hello description is 222".equals(hello);
String hello2 = helloService.hello(new Hello("111", "222"));
System.out.println(hello2);
}
}
package github.javaguide.registry;
import java.net.InetSocketAddress;
/**
* 服务发现接口
*
* @author shuang.kou
* @createTime 2020年06月01日 15:16:00
*/
public interface ServiceDiscovery {
/**
* 查找服务
*
* @param serviceName 服务名称
* @return 提供服务的地址
*/
InetSocketAddress lookupService(String serviceName);
}
......@@ -3,7 +3,7 @@ package github.javaguide.registry;
import java.net.InetSocketAddress;
/**
* 服务注册中心接口
* 服务注册接口
*
* @author shuang.kou
* @createTime 2020年05月13日 08:39:00
......@@ -17,11 +17,4 @@ public interface ServiceRegistry {
*/
void registerService(String serviceName, InetSocketAddress inetSocketAddress);
/**
* 查找服务
*
* @param serviceName 服务名称
* @return 提供服务的地址
*/
InetSocketAddress lookupService(String serviceName);
}
package github.javaguide.registry;
import github.javaguide.utils.zk.CuratorHelper;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/**
* 基于 zookeeper 实现服务发现
*
* @author shuang.kou
* @createTime 2020年06月01日 15:16:00
*/
public class ZkServiceDiscovery implements ServiceDiscovery {
private static final Logger logger = LoggerFactory.getLogger(ZkServiceDiscovery.class);
private final CuratorFramework zkClient;
public ZkServiceDiscovery() {
this.zkClient = CuratorHelper.getZkClient();
zkClient.start();
}
@Override
public InetSocketAddress lookupService(String serviceName) {
// TODO 负载均衡
// 这里直接去了第一个找到的服务地址
String serviceAddress = CuratorHelper.getChildrenNodes(zkClient, serviceName).get(0);
logger.info("成功找到服务地址:{}", serviceAddress);
return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1]));
}
}
......@@ -8,13 +8,13 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
/**
* 基于 zookeeper 实现服务注册中心
* 基于 zookeeper 实现服务注册
*
* @author shuang.kou
* @createTime 2020年05月31日 10:56:00
*/
public class ZkServiceRegistry implements ServiceRegistry {
private static final Logger logger = LoggerFactory.getLogger(CuratorHelper.class);
private static final Logger logger = LoggerFactory.getLogger(ZkServiceRegistry.class);
private final CuratorFramework zkClient;
public ZkServiceRegistry() {
......@@ -31,13 +31,4 @@ public class ZkServiceRegistry implements ServiceRegistry {
CuratorHelper.createEphemeralNode(zkClient, servicePath.toString());
logger.info("节点创建成功,节点为:{}", servicePath);
}
@Override
public InetSocketAddress lookupService(String serviceName) {
// TODO 负载均衡
// 这里直接去了第一个找到的服务地址
String serviceAddress = CuratorHelper.getChildrenNodes(zkClient, serviceName).get(0);
logger.info("成功找到服务地址:{}", serviceAddress);
return new InetSocketAddress(serviceAddress.split(":")[0], Integer.parseInt(serviceAddress.split(":")[1]));
}
}
......@@ -2,8 +2,8 @@ package github.javaguide.transport.netty.client;
import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.registry.ZkServiceDiscovery;
import github.javaguide.transport.ClientTransport;
import github.javaguide.utils.checker.RpcMessageChecker;
import io.netty.channel.Channel;
......@@ -16,24 +16,24 @@ import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;
/**
* 基于 Netty 传输 RpcRequest
* 基于 Netty 传输 RpcRequest
*
* @author shuang.kou
* @createTime 2020年05月29日 11:34:00
*/
public class NettyClientClientTransport implements ClientTransport {
private static final Logger logger = LoggerFactory.getLogger(NettyClientClientTransport.class);
private ServiceRegistry serviceRegistry;
private final ServiceDiscovery serviceDiscovery;
public NettyClientClientTransport() {
this.serviceRegistry = new ZkServiceRegistry();
this.serviceDiscovery = new ZkServiceDiscovery();
}
@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
AtomicReference<Object> result = new AtomicReference<>(null);
try {
InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName());
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName());
Channel channel = ChannelProvider.get(inetSocketAddress);
if (channel.isActive()) {
channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> {
......
......@@ -3,8 +3,8 @@ package github.javaguide.transport.socket;
import github.javaguide.dto.RpcRequest;
import github.javaguide.dto.RpcResponse;
import github.javaguide.exception.RpcException;
import github.javaguide.registry.ServiceRegistry;
import github.javaguide.registry.ZkServiceRegistry;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.registry.ZkServiceDiscovery;
import github.javaguide.transport.ClientTransport;
import github.javaguide.utils.checker.RpcMessageChecker;
import lombok.AllArgsConstructor;
......@@ -26,15 +26,15 @@ import java.net.Socket;
@AllArgsConstructor
public class SocketRpcClient implements ClientTransport {
private static final Logger logger = LoggerFactory.getLogger(SocketRpcClient.class);
private final ServiceRegistry serviceRegistry;
private final ServiceDiscovery serviceDiscovery;
public SocketRpcClient() {
this.serviceRegistry = new ZkServiceRegistry();
this.serviceDiscovery = new ZkServiceDiscovery();
}
@Override
public Object sendRpcRequest(RpcRequest rpcRequest) {
InetSocketAddress inetSocketAddress = serviceRegistry.lookupService(rpcRequest.getInterfaceName());
InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcRequest.getInterfaceName());
try (Socket socket = new Socket()) {
socket.connect(inetSocketAddress);
ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
......
......@@ -15,10 +15,11 @@ class ZkServiceRegistryTest {
@Test
void should_register_service_successful_and_lookup_service_by_service_name() {
ZkServiceRegistry zkServiceRegistry = new ZkServiceRegistry();
ServiceRegistry zkServiceRegistry = new ZkServiceRegistry();
InetSocketAddress givenInetSocketAddress = new InetSocketAddress("127.0.0.1", 9333);
zkServiceRegistry.registerService("github.javaguide.registry.ZkServiceRegistry", givenInetSocketAddress);
InetSocketAddress acquiredInetSocketAddress = zkServiceRegistry.lookupService("github.javaguide.registry.ZkServiceRegistry");
ServiceDiscovery zkServiceDiscovery = new ZkServiceDiscovery();
InetSocketAddress acquiredInetSocketAddress = zkServiceDiscovery.lookupService("github.javaguide.registry.ZkServiceRegistry");
assertEquals(givenInetSocketAddress.toString(), acquiredInetSocketAddress.toString());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册