diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java index e47171a3d3bfbe9a5f5eb2521843a8820d199be7..75050115c545c4367579cc5e74d09be62972dbb4 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java @@ -50,6 +50,8 @@ public class CoreModuleConfig extends ModuleConfig { @Setter private int hourMetricsDataTTL; @Setter private int dayMetricsDataTTL; @Setter private int monthMetricsDataTTL; + @Setter private int gRPCThreadPoolSize; + @Setter private int gRPCThreadPoolQueueSize; CoreModuleConfig() { this.downsampling = new ArrayList<>(); diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index debb71687fc77aa107ea1f737a4a35f1f131cf17..6237f68098f3cdc12446c06313186ac317907f80 100755 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -111,6 +111,12 @@ public class CoreModuleProvider extends ModuleProvider { if (moduleConfig.getMaxMessageSize() > 0) { grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize()); } + if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) { + grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize()); + } + if (moduleConfig.getGRPCThreadPoolSize() > 0) { + grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize()); + } grpcServer.initialize(); jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors()); diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/CustomThreadFactory.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/CustomThreadFactory.java new file mode 100644 index 0000000000000000000000000000000000000000..19087148a3e1d742f238506e4abf796e77d5deac --- /dev/null +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/CustomThreadFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.library.server.grpc; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author yantaowu + */ +public class CustomThreadFactory implements ThreadFactory { + private final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + CustomThreadFactory(String name) { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : + Thread.currentThread().getThreadGroup(); + namePrefix = name + "-" + + poolNumber.getAndIncrement() + + "-thread-"; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + threadNumber.getAndIncrement(), + 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } +} \ No newline at end of file diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java index 3d5eebab79d1dc11f39737c9acf0a5cffb09db63..8ae2fc1ba08572966426e4144b1464b4905a01c3 100644 --- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java +++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java @@ -24,12 +24,14 @@ import io.netty.handler.ssl.*; import java.io.*; import java.net.InetSocketAddress; import java.util.Objects; +import java.util.concurrent.*; + import org.apache.skywalking.oap.server.library.server.Server; import org.apache.skywalking.oap.server.library.server.*; import org.slf4j.*; /** - * @author peng-yongsheng, wusheng + * @author peng-yongsheng, wusheng, yantaowu */ public class GRPCServer implements Server { @@ -44,6 +46,8 @@ public class GRPCServer implements Server { private SslContextBuilder sslContextBuilder; private File certChainFile; private File privateKeyFile; + private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 4; + private int threadPoolQueueSize = 10000; public GRPCServer(String host, int port) { this.host = host; @@ -60,6 +64,15 @@ public class GRPCServer implements Server { this.maxMessageSize = maxMessageSize; } + public void setThreadPoolSize(int threadPoolSize) { + this.threadPoolSize = threadPoolSize; + } + + public void setThreadPoolQueueSize(int threadPoolQueueSize) { + this.threadPoolQueueSize = threadPoolQueueSize; + } + + /** * Require for `server.crt` and `server.pem` for open ssl at server side. * @@ -90,12 +103,22 @@ public class GRPCServer implements Server { @Override public void initialize() { InetSocketAddress address = new InetSocketAddress(host, port); - + ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(threadPoolQueueSize); + ExecutorService executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, + TimeUnit.SECONDS, blockingQueue, new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler()); nettyServerBuilder = NettyServerBuilder.forAddress(address); - nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection).maxMessageSize(maxMessageSize); + nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection).maxMessageSize(maxMessageSize).executor(executor); logger.info("Server started, host {} listening on {}", host, port); } + static class CustomRejectedExecutionHandler implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + logger.warn("Grpc server thread pool is full, rejecting the task"); + } + } + @Override public void start() throws ServerException { try { diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java index 780ea097f003cabbae3ff8605aa96385957e9a02..91df23f2cd1726b370945b7dca0011d9e36b2d16 100644 --- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java +++ b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverConfig.java @@ -29,4 +29,6 @@ public class JaegerReceiverConfig extends ModuleConfig { private int maxConcurrentCallsPerConnection; private int maxMessageSize; private boolean registerJaegerEndpoint = true; + private int gRPCThreadPoolSize; + private int gRPCThreadPoolQueueSize; } diff --git a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java index 69ac9dfdef90822d2c621cf5787c2e97fb368dab..3a4e084d1e54d43f18350f7fd744837592801e97 100644 --- a/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java +++ b/oap-server/server-receiver-plugin/jaeger-receiver-plugin/src/main/java/org/apache/skywalking/aop/server/receiver/jaeger/JaegerReceiverProvider.java @@ -58,6 +58,12 @@ public class JaegerReceiverProvider extends ModuleProvider { if (config.getMaxConcurrentCallsPerConnection() > 0) { grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection()); } + if (config.getGRPCThreadPoolQueueSize() > 0) { + grpcServer.setThreadPoolQueueSize(config.getGRPCThreadPoolQueueSize()); + } + if (config.getGRPCThreadPoolSize() > 0) { + grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); + } grpcServer.initialize(); } } diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java index 7e4cb5f8673762ae9c87029e460af51f68a349b5..2f0b0052bfed98c9f499393bb8e9686d469f93b0 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerConfig.java @@ -22,7 +22,7 @@ import lombok.*; import org.apache.skywalking.oap.server.library.module.ModuleConfig; /** - * @author peng-yongsheng + * @author peng-yongsheng,yantaowu */ @Getter @Setter @@ -34,4 +34,6 @@ public class SharingServerConfig extends ModuleConfig { private int gRPCPort; private int maxConcurrentCallsPerConnection; private int maxMessageSize; + private int gRPCThreadPoolSize; + private int gRPCThreadPoolQueueSize; } diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java index adc429afb3b7db1b8accd2d13a5442ca3c30684d..a4ae401a70d0e531c9846d519bceea2a1366544f 100644 --- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java @@ -75,6 +75,12 @@ public class SharingServerModuleProvider extends ModuleProvider { if (config.getMaxConcurrentCallsPerConnection() > 0) { grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection()); } + if (config.getGRPCThreadPoolQueueSize() > 0) { + grpcServer.setThreadPoolQueueSize(config.getGRPCThreadPoolQueueSize()); + } + if (config.getGRPCThreadPoolSize() > 0) { + grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize()); + } grpcServer.initialize(); this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));