提交 0db19065 编写于 作者: C CalvinKirs

rpc test

上级 985a4fc9
...@@ -17,19 +17,19 @@ import org.apache.dolphinscheduler.remote.utils.Host; ...@@ -17,19 +17,19 @@ import org.apache.dolphinscheduler.remote.utils.Host;
public class MainTest { public class MainTest {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
NettyServer nettyServer=new NettyServer(new NettyServerConfig()); NettyServer nettyServer = new NettyServer(new NettyServerConfig());
// NettyClient nettyClient=new NettyClient(new NettyClientConfig()); // NettyClient nettyClient=new NettyClient(new NettyClientConfig());
Host host=new Host("127.0.0.1",12366); Host host = new Host("127.0.0.1", 12366);
IRpcClient rpcClient=new RpcClient(); IRpcClient rpcClient = new RpcClient();
IUserService userService= rpcClient.create(UserService.class); IUserService userService = rpcClient.create(UserService.class);
userService.say("calvin"); String result = userService.say("calvin");
System.out.println("我是你爸爸吧"+result);
// nettyClient.sendMsg(host,rpcRequest);
// nettyClient.sendMsg(host,rpcRequest);
} }
} }
...@@ -37,8 +37,8 @@ public class ConsumerInterceptor { ...@@ -37,8 +37,8 @@ public class ConsumerInterceptor {
System.out.println(invoker.invoke(request)); System.out.println(invoker.invoke(request));
NettyClient nettyClient = new NettyClient(new NettyClientConfig()); NettyClient nettyClient = new NettyClient(new NettyClientConfig());
Host host = new Host("127.0.0.1", 12336); Host host = new Host("127.0.0.1", 12336);
nettyClient.sendMsg(host, request); return nettyClient.sendMsg(host, request);
return null;
} }
......
...@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.utils.NettyUtils; ...@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
...@@ -171,16 +172,20 @@ public class NettyClient { ...@@ -171,16 +172,20 @@ public class NettyClient {
System.out.println("netty client start"); System.out.println("netty client start");
} }
public void sendMsg(Host host, RpcRequest request) { public Object sendMsg(Host host, RpcRequest request) {
Channel channel = getChannel(host); Channel channel = getChannel(host);
assert channel != null; assert channel != null;
// ctx.writeAndFlush(Unpooled.copiedBuffer
RpcFuture future = new RpcFuture(); RpcFuture future = new RpcFuture();
RpcRequestTable.put(request.getRequestId(), future); RpcRequestTable.put(request.getRequestId(), future);
channel.writeAndFlush(request); channel.writeAndFlush(request);
// System.out.println(); Object result = null;
// channel.writeAndFlush( ProtoStuffUtils.serialize(request)); try {
result=future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return result;
} }
......
...@@ -11,8 +11,10 @@ import io.netty.handler.timeout.IdleStateEvent; ...@@ -11,8 +11,10 @@ import io.netty.handler.timeout.IdleStateEvent;
import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable;
import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest;
import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse;
import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
...@@ -47,6 +49,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { ...@@ -47,6 +49,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("收到消息"); System.out.println("收到消息");
RpcResponse rsp = (RpcResponse) msg; RpcResponse rsp = (RpcResponse) msg;
RpcFuture rpcFuture= RpcRequestTable.get(rsp.getRequestId());
if(null!=rpcFuture){
RpcRequestTable.remove(rsp.getRequestId());
rpcFuture.done(rsp);
}
System.out.println(rsp.getResult().toString()); System.out.println(rsp.getResult().toString());
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册