/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.dolphinscheduler.remote;

import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;

/**
49
 * remoting netty server
50 51 52 53 54 55
 */
public class NettyRemotingServer {

    private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);

    /**
56
     * server bootstrap
57 58 59 60
     */
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();

    /**
61
     * encoder
62 63 64 65
     */
    private final NettyEncoder encoder = new NettyEncoder();

    /**
66
     * default executor
67 68 69 70 71 72
     */
    private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);

    /**
     * boss group
     */
73
    private final EventLoopGroup bossGroup;
74 75

    /**
76
     * worker group
77
     */
78
    private final EventLoopGroup workGroup;
79 80

    /**
81
     * server config
82 83 84 85
     */
    private final NettyServerConfig serverConfig;

    /**
86
     * server handler
87 88 89 90 91 92 93 94 95
     */
    private final NettyServerHandler serverHandler = new NettyServerHandler(this);

    /**
     * started flag
     */
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    /**
96
     * server init
97 98 99
     *
     * @param serverConfig server config
     */
100
    public NettyRemotingServer(final NettyServerConfig serverConfig) {
101
        this.serverConfig = serverConfig;
102 103 104
        if (NettyUtils.useEpoll()) {
            this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
105

106 107 108 109 110
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
                }
            });
111

112 113
            this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
114

115 116 117 118 119 120 121 122 123 124 125 126 127 128
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
                }
            });
        } else {
            this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
                }
            });
129

130 131 132 133 134 135 136 137 138
            this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
                }
            });
        }
139 140 141
    }

    /**
142
     * server start
143
     */
144
    public void start() {
145 146
        if (isStarted.compareAndSet(false, true)) {
            this.serverBootstrap
147
                .group(this.bossGroup, this.workGroup)
148
                .channel(NettyUtils.getServerSocketChannelClass())
149 150 151 152 153 154
                .option(ChannelOption.SO_REUSEADDR, true)
                .option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
                .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
                .childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
                .childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
                .childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
155
                .childHandler(new ChannelInitializer<SocketChannel>() {
156 157

                    @Override
158
                    protected void initChannel(SocketChannel ch) throws Exception {
159 160 161
                        initNettyChannel(ch);
                    }
                });
162 163 164 165 166

            ChannelFuture future;
            try {
                future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
            } catch (Exception e) {
167
                logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
168 169 170 171 172 173 174 175 176
                throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
            }
            if (future.isSuccess()) {
                logger.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
            } else if (future.cause() != null) {
                throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()), future.cause());
            } else {
                throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
            }
177 178 179 180
        }
    }

    /**
181 182
     * init netty channel
     *
183 184
     * @param ch socket channel
     */
185
    private void initNettyChannel(SocketChannel ch) {
186 187 188 189 190 191 192
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("encoder", encoder);
        pipeline.addLast("decoder", new NettyDecoder());
        pipeline.addLast("handler", serverHandler);
    }

    /**
193 194
     * register processor
     *
195
     * @param commandType command type
196
     * @param processor   processor
197 198 199 200 201 202
     */
    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
        this.registerProcessor(commandType, processor, null);
    }

    /**
203
     * register processor
204 205
     *
     * @param commandType command type
206 207
     * @param processor   processor
     * @param executor    thread executor
208 209 210 211 212 213
     */
    public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
        this.serverHandler.registerProcessor(commandType, processor, executor);
    }

    /**
214 215
     * get default thread executor
     *
216 217 218 219 220 221 222
     * @return thread executor
     */
    public ExecutorService getDefaultExecutor() {
        return defaultExecutor;
    }

    public void close() {
223
        if (isStarted.compareAndSet(true, false)) {
224
            try {
225
                if (bossGroup != null) {
226 227
                    this.bossGroup.shutdownGracefully();
                }
228
                if (workGroup != null) {
229 230
                    this.workGroup.shutdownGracefully();
                }
231
                defaultExecutor.shutdown();
232 233 234 235 236 237 238
            } catch (Exception ex) {
                logger.error("netty server close exception", ex);
            }
            logger.info("netty server closed");
        }
    }
}