From 1bfda1dd65f275ece62d180d5f45970b62013248 Mon Sep 17 00:00:00 2001 From: yantaowu Date: Fri, 9 Aug 2019 09:14:55 +0800 Subject: [PATCH] Modify the thread pool of grpc server. (#3225) * Modify the thread pool of grpc server. * add license header * make param confingable * Add sharing and Jaeger config. * Remove grpc thread config from yml --- .../oap/server/core/CoreModuleConfig.java | 2 + .../oap/server/core/CoreModuleProvider.java | 6 +++ .../server/grpc/CustomThreadFactory.java | 52 +++++++++++++++++++ .../library/server/grpc/GRPCServer.java | 29 +++++++++-- .../receiver/jaeger/JaegerReceiverConfig.java | 2 + .../jaeger/JaegerReceiverProvider.java | 6 +++ .../sharing/server/SharingServerConfig.java | 4 +- .../server/SharingServerModuleProvider.java | 6 +++ 8 files changed, 103 insertions(+), 4 deletions(-) create mode 100644 oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/CustomThreadFactory.java diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index e47171a3d3..75050115c5 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -50,6 +50,8 @@ public class CoreModuleConfig extends ModuleConfig { @Setter private int hourMetricsDataTTL; @Setter private int dayMetricsDataTTL; @Setter private int monthMetricsDataTTL; + @Setter private int gRPCThreadPoolSize; + @Setter private int gRPCThreadPoolQueueSize; CoreModuleConfig() { this.downsampling = new ArrayList<>(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index debb71687f..6237f68098 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -111,6 +111,12 @@ public class CoreModuleProvider extends ModuleProvider { if (moduleConfig.getMaxMessageSize() > 0) { grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize()); } + if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) { + grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize()); + } + if (moduleConfig.getGRPCThreadPoolSize() > 0) { + grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize()); + } grpcServer.initialize(); jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors()); diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/CustomThreadFactory.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/CustomThreadFactory.java new file mode 100644 index 0000000000..19087148a3 --- /dev/null +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/CustomThreadFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.server.grpc; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author yantaowu + */ +public class CustomThreadFactory implements ThreadFactory { + private final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + CustomThreadFactory(String name) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = name + "-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } +} \ No newline at end of file diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java index 3d5eebab79..8ae2fc1ba0 100644 --- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java @@ -24,12 +24,14 @@ import io.netty.handler.ssl.*; import java.io.*; import java.net.InetSocketAddress; import java.util.Objects; +import java.util.concurrent.*; + import org.apache.skywalking.oap.server.library.server.Server; import org.apache.skywalking.oap.server.library.server.*; import org.slf4j.*; /** - * @author peng-yongsheng, wusheng + * @author peng-yongsheng, wusheng, yantaowu */ public class GRPCServer implements Server { @@ -44,6 +46,8 @@ public class GRPCServer implements Server { private SslContextBuilder sslContextBuilder; private File certChainFile; private File privateKeyFile; + private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 4; + private int threadPoolQueueSize = 10000; public GRPCServer(String host, int port) { this.host = host; @@ -60,6 +64,15 @@ public class GRPCServer implements Server { this.maxMessageSize = maxMessageSize; } + public void setThreadPoolSize(int threadPoolSize) { + this.threadPoolSize = threadPoolSize; + } + + public void setThreadPoolQueueSize(int threadPoolQueueSize) { + this.threadPoolQueueSize = threadPoolQueueSize; + } + + /** * Require for `server.crt` and `server.pem` for open ssl at server side. * @@ -90,12 +103,22 @@ public class GRPCServer implements Server { @Override public void initialize() { InetSocketAddress address = new InetSocketAddress(host, port); - + ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(threadPoolQueueSize); + ExecutorService executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, + TimeUnit.SECONDS, blockingQueue, new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler()); nettyServerBuilder = NettyServerBuilder.forAddress(address); - nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection).maxMessageSize(maxMessageSize); + nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection).maxMessageSize(maxMessageSize).executor(executor); logger.info("Server started, host {} listening on {}", host, port); } + static class CustomRejectedExecutionHandler implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + logger.warn("Grpc server thread pool is full, rejecting the task"); + } + } + @Override public void start() throws ServerException { try { diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java index 780ea097f0..91df23f2cd 100644 --- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java +++ b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java @@ -29,4 +29,6 @@ public class JaegerReceiverConfig extends ModuleConfig { private int maxConcurrentCallsPerConnection; private int maxMessageSize; private boolean registerJaegerEndpoint = true; + private int gRPCThreadPoolSize; + private int gRPCThreadPoolQueueSize; } diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java index 69ac9dfdef..3a4e084d1e 100644 --- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java +++ b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java @@ -58,6 +58,12 @@ public class JaegerReceiverProvider extends ModuleProvider { if (config.getMaxConcurrentCallsPerConnection() > 0) { grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection()); } + if (config.getGRPCThreadPoolQueueSize() > 0) { + grpcServer.setThreadPoolQueueSize(config.getGRPCThreadPoolQueueSize()); + } + if (config.getGRPCThreadPoolSize() > 0) { + grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); + } grpcServer.initialize(); } } diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java index 7e4cb5f867..2f0b0052bf 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java @@ -22,7 +22,7 @@ import lombok.*; import org.apache.skywalking.oap.server.library.module.ModuleConfig; /** - * @author peng-yongsheng + * @author peng-yongsheng,yantaowu */ @Getter @Setter @@ -34,4 +34,6 @@ public class SharingServerConfig extends ModuleConfig { private int gRPCPort; private int maxConcurrentCallsPerConnection; private int maxMessageSize; + private int gRPCThreadPoolSize; + private int gRPCThreadPoolQueueSize; } diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java index adc429afb3..a4ae401a70 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java @@ -75,6 +75,12 @@ public class SharingServerModuleProvider extends ModuleProvider { if (config.getMaxConcurrentCallsPerConnection() > 0) { grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection()); } + if (config.getGRPCThreadPoolQueueSize() > 0) { + grpcServer.setThreadPoolQueueSize(config.getGRPCThreadPoolQueueSize()); + } + if (config.getGRPCThreadPoolSize() > 0) { + grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); + } grpcServer.initialize(); this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer)); -- GitLab