未验证 提交 34a04aca 编写于 作者: C CalvinKirs 提交者: GitHub

[Improvement][remote] improvement netty eventLoopGroup (#3580)

* [Improvement][remote] improvement netty eventLoopGroup
If you are running on linux you can use EpollEventLoopGroup and so get better performance, less GC and have more advanced features that are only available on linux.

* reformat code

* utility classes should not have public constructors

* reformat code

* reformat code

* reformat code

* add test

* code style

* add test

* add test

* fix test error

* add test

* test

* test

* add test config

* add test config
上级 93660f4d
......@@ -18,10 +18,17 @@
package org.apache.dolphinscheduler.remote;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.Command;
......@@ -38,6 +45,8 @@ import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.CallerThreadExecutePolicy;
import org.apache.dolphinscheduler.remote.utils.NamedThreadFactory;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -47,7 +56,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* remoting netty client
* remoting netty client
*/
public class NettyRemotingClient {
......@@ -59,7 +68,7 @@ public class NettyRemotingClient {
private final Bootstrap bootstrap = new Bootstrap();
/**
* encoder
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder();
......@@ -69,57 +78,69 @@ public class NettyRemotingClient {
private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap(128);
/**
* started flag
* started flag
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* worker group
* worker group
*/
private final NioEventLoopGroup workerGroup;
private final EventLoopGroup workerGroup;
/**
* client config
* client config
*/
private final NettyClientConfig clientConfig;
/**
* saync semaphore
* saync semaphore
*/
private final Semaphore asyncSemaphore = new Semaphore(200, true);
/**
* callback thread executor
* callback thread executor
*/
private final ExecutorService callbackExecutor;
/**
* client handler
* client handler
*/
private final NettyClientHandler clientHandler;
/**
* response future executor
* response future executor
*/
private final ScheduledExecutorService responseFutureExecutor;
/**
* client init
* client init
*
* @param clientConfig client config
*/
public NettyRemotingClient(final NettyClientConfig clientConfig){
public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
if (NettyUtils.useEpoll()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
} else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyClient_%d", this.threadIndex.incrementAndGet()));
}
});
}
this.callbackExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy());
new LinkedBlockingQueue<>(1000), new NamedThreadFactory("CallbackExecutor", 10),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));
......@@ -128,26 +149,26 @@ public class NettyRemotingClient {
}
/**
* start
* start
*/
private void start(){
private void start() {
this.bootstrap
.group(this.workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new NettyDecoder(),
clientHandler,
encoder);
}
});
.group(this.workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(
new NettyDecoder(),
clientHandler,
encoder);
}
});
this.responseFutureExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
......@@ -159,10 +180,11 @@ public class NettyRemotingClient {
}
/**
* async send
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* async send
*
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @param invokeCallback callback function
* @throws InterruptedException
* @throws RemotingException
......@@ -182,22 +204,22 @@ public class NettyRemotingClient {
* control concurrency number
*/
boolean acquired = this.asyncSemaphore.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
if(acquired){
if (acquired) {
final ReleaseSemaphore releaseSemaphore = new ReleaseSemaphore(this.asyncSemaphore);
/**
* response future
*/
final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis,
invokeCallback,
releaseSemaphore);
timeoutMillis,
invokeCallback,
releaseSemaphore);
try {
channel.writeAndFlush(command).addListener(new ChannelFutureListener(){
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
......@@ -207,28 +229,29 @@ public class NettyRemotingClient {
responseFuture.putResponse(null);
try {
responseFuture.executeInvokeCallback();
} catch (Throwable ex){
} catch (Throwable ex) {
logger.error("execute callback error", ex);
} finally{
} finally {
responseFuture.release();
}
}
});
} catch (Throwable ex){
} catch (Throwable ex) {
responseFuture.release();
throw new RemotingException(String.format("send command to host: %s failed", host), ex);
}
} else{
} else {
String message = String.format("try to acquire async semaphore timeout: %d, waiting thread num: %d, total permits: %d",
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
timeoutMillis, asyncSemaphore.getQueueLength(), asyncSemaphore.availablePermits());
throw new RemotingTooMuchRequestException(message);
}
}
/**
* sync send
* @param host host
* @param command command
*
* @param host host
* @param command command
* @param timeoutMillis timeoutMillis
* @return command
* @throws InterruptedException
......@@ -244,7 +267,7 @@ public class NettyRemotingClient {
channel.writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
if (future.isSuccess()) {
responseFuture.setSendOk(true);
return;
} else {
......@@ -259,10 +282,10 @@ public class NettyRemotingClient {
* sync wait for result
*/
Command result = responseFuture.waitResponse();
if(result == null){
if(responseFuture.isSendOK()){
if (result == null) {
if (responseFuture.isSendOK()) {
throw new RemotingTimeoutException(host.toString(), timeoutMillis, responseFuture.getCause());
} else{
} else {
throw new RemotingException(host.toString(), responseFuture.getCause());
}
}
......@@ -270,8 +293,9 @@ public class NettyRemotingClient {
}
/**
* send task
* @param host host
* send task
*
* @param host host
* @param command command
* @throws RemotingException
*/
......@@ -296,33 +320,35 @@ public class NettyRemotingClient {
}
/**
* register processor
* register processor
*
* @param commandType command type
* @param processor processor
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
/**
* register processor
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor);
}
/**
* get channel
* get channel
*
* @param host
* @return
*/
public Channel getChannel(Host host) {
Channel channel = channels.get(host);
if(channel != null && channel.isActive()){
if (channel != null && channel.isActive()) {
return channel;
}
return createChannel(host, true);
......@@ -330,17 +356,18 @@ public class NettyRemotingClient {
/**
* create channel
* @param host host
*
* @param host host
* @param isSync sync flag
* @return channel
*/
public Channel createChannel(Host host, boolean isSync) {
ChannelFuture future;
try {
synchronized (bootstrap){
synchronized (bootstrap) {
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
}
if(isSync){
if (isSync) {
future.sync();
}
if (future.isSuccess()) {
......@@ -358,16 +385,16 @@ public class NettyRemotingClient {
* close
*/
public void close() {
if(isStarted.compareAndSet(true, false)){
if (isStarted.compareAndSet(true, false)) {
try {
closeChannels();
if(workerGroup != null){
if (workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
if(callbackExecutor != null){
if (callbackExecutor != null) {
this.callbackExecutor.shutdownNow();
}
if(this.responseFutureExecutor != null){
if (this.responseFutureExecutor != null) {
this.responseFutureExecutor.shutdownNow();
}
} catch (Exception ex) {
......@@ -378,9 +405,9 @@ public class NettyRemotingClient {
}
/**
* close channels
* close channels
*/
private void closeChannels(){
private void closeChannels() {
for (Channel channel : this.channels.values()) {
channel.close();
}
......@@ -389,11 +416,12 @@ public class NettyRemotingClient {
/**
* close channel
*
* @param host host
*/
public void closeChannel(Host host){
public void closeChannel(Host host) {
Channel channel = this.channels.remove(host);
if(channel != null){
if (channel != null) {
channel.close();
}
}
......
......@@ -22,9 +22,12 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.dolphinscheduler.remote.codec.NettyDecoder;
import org.apache.dolphinscheduler.remote.codec.NettyEncoder;
import org.apache.dolphinscheduler.remote.command.CommandType;
......@@ -32,6 +35,8 @@ import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.remote.handler.NettyServerHandler;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Constants;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -42,44 +47,44 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* remoting netty server
* remoting netty server
*/
public class NettyRemotingServer {
private final Logger logger = LoggerFactory.getLogger(NettyRemotingServer.class);
/**
* server bootstrap
* server bootstrap
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();
/**
* encoder
* encoder
*/
private final NettyEncoder encoder = new NettyEncoder();
/**
* default executor
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);
/**
* boss group
*/
private final NioEventLoopGroup bossGroup;
private final EventLoopGroup bossGroup;
/**
* worker group
* worker group
*/
private final NioEventLoopGroup workGroup;
private final EventLoopGroup workGroup;
/**
* server config
* server config
*/
private final NettyServerConfig serverConfig;
/**
* server handler
* server handler
*/
private final NettyServerHandler serverHandler = new NettyServerHandler(this);
......@@ -89,59 +94,78 @@ public class NettyRemotingServer {
private final AtomicBoolean isStarted = new AtomicBoolean(false);
/**
* server init
* server init
*
* @param serverConfig server config
*/
public NettyRemotingServer(final NettyServerConfig serverConfig){
public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
if (NettyUtils.useEpoll()) {
this.bossGroup = new EpollEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
} else {
this.bossGroup = new NioEventLoopGroup(1, new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerBossThread_%d", this.threadIndex.incrementAndGet()));
}
});
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), new ThreadFactory() {
private AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, String.format("NettyServerWorkerThread_%d", this.threadIndex.incrementAndGet()));
}
});
}
}
/**
* server start
* server start
*/
public void start(){
public void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
initNettyChannel(ch);
}
});
.group(this.bossGroup, this.workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
initNettyChannel(ch);
}
});
ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
logger.error("NettyRemotingServer bind fail {}, exit",e.getMessage(), e);
logger.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RuntimeException(String.format("NettyRemotingServer bind %s fail", serverConfig.getListenPort()));
}
if (future.isSuccess()) {
......@@ -155,11 +179,12 @@ public class NettyRemotingServer {
}
/**
* init netty channel
* init netty channel
*
* @param ch socket channel
* @throws Exception
*/
private void initNettyChannel(NioSocketChannel ch) throws Exception{
private void initNettyChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", encoder);
pipeline.addLast("decoder", new NettyDecoder());
......@@ -167,27 +192,29 @@ public class NettyRemotingServer {
}
/**
* register processor
* register processor
*
* @param commandType command type
* @param processor processor
* @param processor processor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor) {
this.registerProcessor(commandType, processor, null);
}
/**
* register processor
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final CommandType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(commandType, processor, executor);
}
/**
* get default thread executor
* get default thread executor
*
* @return thread executor
*/
public ExecutorService getDefaultExecutor() {
......@@ -195,12 +222,12 @@ public class NettyRemotingServer {
}
public void close() {
if(isStarted.compareAndSet(true, false)){
if (isStarted.compareAndSet(true, false)) {
try {
if(bossGroup != null){
if (bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if(workGroup != null){
if (workGroup != null) {
this.workGroup.shutdownGracefully();
}
defaultExecutor.shutdown();
......
......@@ -42,4 +42,14 @@ public class Constants {
public static final String LOCAL_ADDRESS = IPUtils.getFirstNoLoopbackIP4Address();
/**
* netty epoll enable switch
*/
public static final String NETTY_EPOLL_ENABLE = System.getProperty("netty.epoll.enable");
/**
* OS Name
*/
public static final String OS_NAME = System.getProperty("os.name");
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.utils;
import io.netty.channel.epoll.Epoll;
/**
* NettyUtils
*/
public class NettyUtils {
private NettyUtils() {
}
public static boolean useEpoll() {
String osName = Constants.OS_NAME;
if (!osName.toLowerCase().contains("linux")) {
return false;
}
if (!Epoll.isAvailable()) {
return false;
}
String enableNettyEpoll = Constants.NETTY_EPOLL_ENABLE;
return Boolean.parseBoolean(enableNettyEpoll);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote;
import org.apache.dolphinscheduler.remote.utils.NettyUtils;
import org.junit.Assert;
import org.junit.Test;
/**
* NettyUtilTest
*/
public class NettyUtilTest {
@Test
public void testUserEpoll() {
System.setProperty("netty.epoll.enable", "false");
Assert.assertFalse(NettyUtils.useEpoll());
}
}
......@@ -804,7 +804,8 @@
<include>**/remote/JsonSerializerTest.java</include>
<include>**/remote/RemoveTaskLogResponseCommandTest.java</include>
<include>**/remote/RemoveTaskLogRequestCommandTest.java</include>
<!--<include>**/remote/NettyRemotingClientTest.java</include>-->
<include>**/remote/NettyRemotingClientTest.java</include>
<include>**/remote/NettyUtilTest.java</include>
<include>**/remote/ResponseFutureTest.java</include>
<include>**/server/log/LoggerServerTest.java</include>
<include>**/server/entity/SQLTaskExecutionContextTest.java</include>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册