NettyRemotingClient.java 14.9 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.remote;

import io.netty.bootstrap.Bootstrap;
21 22 23 24 25 26 27
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
28 29
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
30

31 32 33
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
T
Tboy 已提交
34
import org.apache.dolphinscheduler.remote.command.CommandType;
35 36
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
37 38 39 40 41
import org.apache.dolphinscheduler.remote.exceptions.RemotingTimeoutException;
import org.apache.dolphinscheduler.remote.exceptions.RemotingTooMuchRequestException;
import org.apache.dolphinscheduler.remote.future.InvokeCallback;
import org.apache.dolphinscheduler.remote.future.ReleaseSemaphore;
import org.apache.dolphinscheduler.remote.future.ResponseFuture;
42
import org.apache.dolphinscheduler.remote.handler.NettyClientHandler;
T
Tboy 已提交
43 44
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
45 46
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
47 48
import org.apache.dolphinscheduler.remote.utils.NettyUtils;

49 50 51 52
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
53
import java.util.concurrent.*;
54 55 56 57
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
58
 * remoting netty client
59 60 61 62 63 64
 */
public class NettyRemotingClient {

    private final Logger logger = LoggerFactory.getLogger(NettyRemotingClient.class);

    /**
65
     * client bootstrap
66 67 68 69
     */
    private final Bootstrap bootstrap = new Bootstrap();

    /**
70
     * encoder
71 72 73 74
     */
    private final NettyEncoder encoder = new NettyEncoder();

    /**
75
     * channels
76
     */
T
Tboy 已提交
77
    private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);
78 79

    /**
80
     * started flag
81 82 83 84
     */
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    /**
85
     * worker group
86
     */
87
    private final EventLoopGroup workerGroup;
88 89

    /**
90
     * client config
91
     */
92
    private final NettyClientConfig clientConfig;
93 94

    /**
95
     * saync semaphore
96
     */
97
    private final Semaphore asyncSemaphore = new Semaphore(200, true);
98 99

    /**
100
     * callback thread executor
101 102 103 104
     */
    private final ExecutorService callbackExecutor;

    /**
105
     * client handler
106 107 108
     */
    private final NettyClientHandler clientHandler;

T
Tboy 已提交
109
    /**
110
     * response future executor
T
Tboy 已提交
111 112 113
     */
    private final ScheduledExecutorService responseFutureExecutor;

114
    /**
115 116
     * client init
     *
117 118
     * @param clientConfig client config
     */
119
    public NettyRemotingClient(final NettyClientConfig clientConfig) {
120
        this.clientConfig = clientConfig;
121 122 123
        if (NettyUtils.useEpoll()) {
            this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
124

125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
                }
            });
        }
140
        this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
141 142
            new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
            new CallerThreadExecutePolicy());
143 144
        this.clientHandler = new NettyClientHandler(this, callbackExecutor);

T
Tboy 已提交
145 146
        this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));

147 148 149 150
        this.start();
    }

    /**
151
     * start
152
     */
153
    private void start() {
154 155

        this.bootstrap
156
            .group(this.workerGroup)
157
            .channel(NettyUtils.getSocketChannelClass())
158 159 160 161
            .option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
            .option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
            .option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
            .option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
162
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
163 164 165 166 167 168 169 170 171
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(
                        new NettyDecoder(),
                        clientHandler,
                        encoder);
                }
            });
T
Tboy 已提交
172 173 174 175 176 177
        this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                ResponseFuture.scanFutureTable();
            }
        }, 5000, 1000, TimeUnit.MILLISECONDS);
178
        //
179 180 181 182
        isStarted.compareAndSet(false, true);
    }

    /**
183 184 185 186 187
     * async send
     *
     * @param host           host
     * @param command        command
     * @param timeoutMillis  timeoutMillis
188 189 190
     * @param invokeCallback callback function
     * @throws InterruptedException
     * @throws RemotingException
191
     */
T
Tboy 已提交
192
    public void sendAsync(final Host host, final Command command,
193 194
                          final long timeoutMillis,
                          final InvokeCallback invokeCallback) throws InterruptedException, RemotingException {
T
Tboy 已提交
195
        final Channel channel = getChannel(host);
196 197 198 199 200 201 202 203 204 205 206
        if (channel == null) {
            throw new RemotingException("network error");
        }
        /**
         * request unique identification
         */
        final long opaque = command.getOpaque();
        /**
         *  control concurrency number
         */
        boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
207
        if (acquired) {
208
            final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
209

210 211 212 213
            /**
             *  response future
             */
            final ResponseFuture responseFuture = new ResponseFuture(opaque,
214 215 216
                timeoutMillis,
                invokeCallback,
                releaseSemaphore);
217
            try {
218
                channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
219 220 221

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
222
                        if (future.isSuccess()) {
223 224 225 226 227 228 229 230 231
                            responseFuture.setSendOk(true);
                            return;
                        } else {
                            responseFuture.setSendOk(false);
                        }
                        responseFuture.setCause(future.cause());
                        responseFuture.putResponse(null);
                        try {
                            responseFuture.executeInvokeCallback();
232
                        } catch (Throwable ex) {
233
                            logger.error("execute callback error", ex);
234
                        } finally {
235 236 237 238
                            responseFuture.release();
                        }
                    }
                });
239
            } catch (Throwable ex) {
240
                responseFuture.release();
T
Tboy 已提交
241
                throw new RemotingException(String.format("send command to host: %s failed", host), ex);
242
            }
243
        } else {
244
            String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
245
                timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
246 247
            throw new RemotingTooMuchRequestException(message);
        }
248 249 250
    }

    /**
251
     * sync send
252 253 254
     *
     * @param host          host
     * @param command       command
255 256 257
     * @param timeoutMillis timeoutMillis
     * @return command
     * @throws InterruptedException
258 259
     * @throws RemotingException
     */
T
Tboy 已提交
260 261
    public Command sendSync(final Host host, final Command command, final long timeoutMillis) throws InterruptedException, RemotingException {
        final Channel channel = getChannel(host);
262
        if (channel == null) {
T
Tboy 已提交
263
            throw new RemotingException(String.format("connect to : %s fail", host));
264
        }
265 266 267 268 269
        final long opaque = command.getOpaque();
        final ResponseFuture responseFuture = new ResponseFuture(opaque, timeoutMillis, null, null);
        channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
270
                if (future.isSuccess()) {
271 272 273 274
                    responseFuture.setSendOk(true);
                    return;
                } else {
                    responseFuture.setSendOk(false);
275
                }
276 277
                responseFuture.setCause(future.cause());
                responseFuture.putResponse(null);
T
Tboy 已提交
278
                logger.error("send command {} to host {} failed", command, host);
279 280 281 282 283 284
            }
        });
        /**
         * sync wait for result
         */
        Command result = responseFuture.waitResponse();
285 286
        if (result == null) {
            if (responseFuture.isSendOK()) {
T
Tboy 已提交
287
                throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
288
            } else {
T
Tboy 已提交
289
                throw new RemotingException(host.toString(), responseFuture.getCause());
290
            }
291
        }
292
        return result;
293 294
    }

295
    /**
296 297 298
     * send task
     *
     * @param host    host
299 300 301
     * @param command command
     * @throws RemotingException
     */
T
Tboy 已提交
302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322
    public void send(final Host host, final Command command) throws RemotingException {
        Channel channel = getChannel(host);
        if (channel == null) {
            throw new RemotingException(String.format("connect to : %s fail", host));
        }
        try {
            ChannelFuture future = channel.writeAndFlush(command).await();
            if (future.isSuccess()) {
                logger.debug("send command : {} , to : {} successfully.", command, host.getAddress());
            } else {
                String msg = String.format("send command : %s , to :%s failed", command, host.getAddress());
                logger.error(msg, future.cause());
                throw new RemotingException(msg);
            }
        } catch (Exception e) {
            logger.error("Send command {} to address {} encounter error.", command, host.getAddress());
            throw new RemotingException(String.format("Send command : %s , to :%s encounter error", command, host.getAddress()), e);
        }
    }

    /**
323 324
     * register processor
     *
T
Tboy 已提交
325
     * @param commandType command type
326
     * @param processor   processor
T
Tboy 已提交
327 328 329 330 331 332
     */
    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
        this.registerProcessor(commandType, processor, null);
    }

    /**
333
     * register processor
T
Tboy 已提交
334 335
     *
     * @param commandType command type
336 337
     * @param processor   processor
     * @param executor    thread executor
T
Tboy 已提交
338 339 340 341 342
     */
    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
        this.clientHandler.registerProcessor(commandType, processor, executor);
    }

343
    /**
344 345
     * get channel
     *
T
Tboy 已提交
346
     * @param host
347
     * @return
348
     */
T
Tboy 已提交
349 350
    public Channel getChannel(Host host) {
        Channel channel = channels.get(host);
351
        if (channel != null && channel.isActive()) {
352 353
            return channel;
        }
T
Tboy 已提交
354
        return createChannel(host, true);
355 356 357
    }

    /**
358
     * create channel
359 360
     *
     * @param host   host
361
     * @param isSync sync flag
362 363
     * @return channel
     */
T
Tboy 已提交
364
    public Channel createChannel(Host host, boolean isSync) {
365 366
        ChannelFuture future;
        try {
367
            synchronized (bootstrap) {
T
Tboy 已提交
368
                future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
369
            }
370
            if (isSync) {
371 372 373 374
                future.sync();
            }
            if (future.isSuccess()) {
                Channel channel = future.channel();
T
Tboy 已提交
375
                channels.put(host, channel);
376 377 378
                return channel;
            }
        } catch (Exception ex) {
379
            logger.warn(String.format("connect to %s error", host), ex);
380 381 382 383 384
        }
        return null;
    }

    /**
385
     * close
386 387
     */
    public void close() {
388
        if (isStarted.compareAndSet(true, false)) {
389 390
            try {
                closeChannels();
391
                if (workerGroup != null) {
392 393
                    this.workerGroup.shutdownGracefully();
                }
394
                if (callbackExecutor != null) {
395
                    this.callbackExecutor.shutdownNow();
396
                }
397
                if (this.responseFutureExecutor != null) {
T
Tboy 已提交
398 399
                    this.responseFutureExecutor.shutdownNow();
                }
400 401 402 403 404 405 406 407
            } catch (Exception ex) {
                logger.error("netty client close exception", ex);
            }
            logger.info("netty client closed");
        }
    }

    /**
408
     * close channels
409
     */
410
    private void closeChannels() {
411 412 413 414 415 416 417
        for (Channel channel : this.channels.values()) {
            channel.close();
        }
        this.channels.clear();
    }

    /**
418
     * close channel
419
     *
T
Tboy 已提交
420
     * @param host host
421
     */
422
    public void closeChannel(Host host) {
T
Tboy 已提交
423
        Channel channel = this.channels.remove(host);
424
        if (channel != null) {
425 426 427 428
            channel.close();
        }
    }
}