NettyClientTransport.java 2.5 KB
Newer Older
1 2
package github.javaguide.remoting.transport.netty.client;

S
sakuragi1111 已提交
3
import github.javaguide.extension.ExtensionLoader;
4 5 6 7 8 9 10 11 12 13 14 15 16
import github.javaguide.factory.SingletonFactory;
import github.javaguide.registry.ServiceDiscovery;
import github.javaguide.remoting.dto.RpcRequest;
import github.javaguide.remoting.dto.RpcResponse;
import github.javaguide.remoting.transport.ClientTransport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;

/**
17
 * transport rpcRequest based on netty.
18 19 20 21 22 23 24 25
 *
 * @author shuang.kou
 * @createTime 2020年05月29日 11:34:00
 */
@Slf4j
public class NettyClientTransport implements ClientTransport {
    private final ServiceDiscovery serviceDiscovery;
    private final UnprocessedRequests unprocessedRequests;
26
    private final ChannelProvider channelProvider;
27 28

    public NettyClientTransport() {
S
sakuragi1111 已提交
29
        this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension("zk");
30
        this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
31
        this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
32 33 34
    }

    @Override
35 36 37
    public CompletableFuture<RpcResponse<Object>> sendRpcRequest(RpcRequest rpcRequest) {
        // build return value
        CompletableFuture<RpcResponse<Object>> resultFuture = new CompletableFuture<>();
F
fupeng 已提交
38
        // build rpc service name by rpcRequest
39
        String rpcServiceName = rpcRequest.toRpcProperties().toRpcServiceName();
40 41 42
        // get server address
        InetSocketAddress inetSocketAddress = serviceDiscovery.lookupService(rpcServiceName);
        // get  server address related channel
43
        Channel channel = channelProvider.get(inetSocketAddress);
44
        if (channel != null && channel.isActive()) {
45
            // put unprocessed request
46 47 48
            unprocessedRequests.put(rpcRequest.getRequestId(), resultFuture);
            channel.writeAndFlush(rpcRequest).addListener((ChannelFutureListener) future -> {
                if (future.isSuccess()) {
S
shuang.kou 已提交
49
                    log.info("client send message: [{}]", rpcRequest);
50 51 52 53 54 55 56 57
                } else {
                    future.channel().close();
                    resultFuture.completeExceptionally(future.cause());
                    log.error("Send failed:", future.cause());
                }
            });
        } else {
            throw new IllegalStateException();
58 59 60 61 62 63 64 65
        }

        return resultFuture;
    }

}