diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java index 315f49390ce8706ef30a505b63049eac34827902..9cb9e5dc7874f7b38c94b6b8f20bec0b29585214 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/MainTest.java @@ -17,19 +17,19 @@ import org.apache.dolphinscheduler.remote.utils.Host; public class MainTest { public static void main(String[] args) throws Exception { - NettyServer nettyServer=new NettyServer(new NettyServerConfig()); + NettyServer nettyServer = new NettyServer(new NettyServerConfig()); - // NettyClient nettyClient=new NettyClient(new NettyClientConfig()); + // NettyClient nettyClient=new NettyClient(new NettyClientConfig()); - Host host=new Host("127.0.0.1",12366); + Host host = new Host("127.0.0.1", 12366); - IRpcClient rpcClient=new RpcClient(); - IUserService userService= rpcClient.create(UserService.class); - userService.say("calvin"); + IRpcClient rpcClient = new RpcClient(); + IUserService userService = rpcClient.create(UserService.class); + String result = userService.say("calvin"); + System.out.println("我是你爸爸吧"+result); - - // nettyClient.sendMsg(host,rpcRequest); + // nettyClient.sendMsg(host,rpcRequest); } } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java index a60d06208790ea53db432ffa5cf042f2610b4097..3058cf02852aaa1d539c3f36282f3e3a95fc7e22 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/client/ConsumerInterceptor.java @@ -37,8 +37,8 @@ public class ConsumerInterceptor { System.out.println(invoker.invoke(request)); NettyClient nettyClient = new NettyClient(new NettyClientConfig()); Host host = new Host("127.0.0.1", 12336); - nettyClient.sendMsg(host, request); - return null; + return nettyClient.sendMsg(host, request); + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java index a4c74fdc41bdb08e1eaa38505663f987b7b76349..9fb923d893a644f618193da8adc5ccef72d01ce9 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClient.java @@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.remote.utils.NettyUtils; import java.net.InetSocketAddress; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -171,16 +172,20 @@ public class NettyClient { System.out.println("netty client start"); } - public void sendMsg(Host host, RpcRequest request) { + public Object sendMsg(Host host, RpcRequest request) { Channel channel = getChannel(host); assert channel != null; - // ctx.writeAndFlush(Unpooled.copiedBuffer RpcFuture future = new RpcFuture(); RpcRequestTable.put(request.getRequestId(), future); - channel.writeAndFlush(request); - // System.out.println(); - // channel.writeAndFlush( ProtoStuffUtils.serialize(request)); + Object result = null; + try { + result=future.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + return result; + } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java index e02cb3c9253ef2401af345f5137daf24dd4118e5..2c6202bca78a90b71e85dd42a8484c310ecde73b 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/rpc/remote/NettyClientHandler.java @@ -11,8 +11,10 @@ import io.netty.handler.timeout.IdleStateEvent; import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; +import org.apache.dolphinscheduler.remote.rpc.client.RpcRequestTable; import org.apache.dolphinscheduler.remote.rpc.common.RpcRequest; import org.apache.dolphinscheduler.remote.rpc.common.RpcResponse; +import org.apache.dolphinscheduler.remote.rpc.future.RpcFuture; import java.net.InetSocketAddress; @@ -47,6 +49,11 @@ public class NettyClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("收到消息"); RpcResponse rsp = (RpcResponse) msg; + RpcFuture rpcFuture= RpcRequestTable.get(rsp.getRequestId()); + if(null!=rpcFuture){ + RpcRequestTable.remove(rsp.getRequestId()); + rpcFuture.done(rsp); + } System.out.println(rsp.getResult().toString()); }