提交 1bfda1dd 编写于 作者: Y yantaowu 提交者: wu-sheng

Modify the thread pool of grpc server. (#3225)

* Modify the thread pool of grpc server.

* add license header

* make param confingable

* Add sharing and Jaeger config.

* Remove grpc thread config from yml
上级 ffd25332
......@@ -50,6 +50,8 @@ public class CoreModuleConfig extends ModuleConfig {
@Setter private int hourMetricsDataTTL;
@Setter private int dayMetricsDataTTL;
@Setter private int monthMetricsDataTTL;
@Setter private int gRPCThreadPoolSize;
@Setter private int gRPCThreadPoolQueueSize;
CoreModuleConfig() {
this.downsampling = new ArrayList<>();
......
......@@ -111,6 +111,12 @@ public class CoreModuleProvider extends ModuleProvider {
if (moduleConfig.getMaxMessageSize() > 0) {
grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
}
if (moduleConfig.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(moduleConfig.getGRPCThreadPoolQueueSize());
}
if (moduleConfig.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(moduleConfig.getGRPCThreadPoolSize());
}
grpcServer.initialize();
jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath(), moduleConfig.getJettySelectors());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.library.server.grpc;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author yantaowu
*/
public class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = name + "-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
\ No newline at end of file
......@@ -24,12 +24,14 @@ import io.netty.handler.ssl.*;
import java.io.*;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.*;
import org.apache.skywalking.oap.server.library.server.Server;
import org.apache.skywalking.oap.server.library.server.*;
import org.slf4j.*;
/**
* @author peng-yongsheng, wusheng
* @author peng-yongsheng, wusheng, yantaowu
*/
public class GRPCServer implements Server {
......@@ -44,6 +46,8 @@ public class GRPCServer implements Server {
private SslContextBuilder sslContextBuilder;
private File certChainFile;
private File privateKeyFile;
private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 4;
private int threadPoolQueueSize = 10000;
public GRPCServer(String host, int port) {
this.host = host;
......@@ -60,6 +64,15 @@ public class GRPCServer implements Server {
this.maxMessageSize = maxMessageSize;
}
public void setThreadPoolSize(int threadPoolSize) {
this.threadPoolSize = threadPoolSize;
}
public void setThreadPoolQueueSize(int threadPoolQueueSize) {
this.threadPoolQueueSize = threadPoolQueueSize;
}
/**
* Require for `server.crt` and `server.pem` for open ssl at server side.
*
......@@ -90,12 +103,22 @@ public class GRPCServer implements Server {
@Override
public void initialize() {
InetSocketAddress address = new InetSocketAddress(host, port);
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(threadPoolQueueSize);
ExecutorService executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60,
TimeUnit.SECONDS, blockingQueue, new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler());
nettyServerBuilder = NettyServerBuilder.forAddress(address);
nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection).maxMessageSize(maxMessageSize);
nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection).maxMessageSize(maxMessageSize).executor(executor);
logger.info("Server started, host {} listening on {}", host, port);
}
static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
logger.warn("Grpc server thread pool is full, rejecting the task");
}
}
@Override
public void start() throws ServerException {
try {
......
......@@ -29,4 +29,6 @@ public class JaegerReceiverConfig extends ModuleConfig {
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
private boolean registerJaegerEndpoint = true;
private int gRPCThreadPoolSize;
private int gRPCThreadPoolQueueSize;
}
......@@ -58,6 +58,12 @@ public class JaegerReceiverProvider extends ModuleProvider {
if (config.getMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection());
}
if (config.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(config.getGRPCThreadPoolQueueSize());
}
if (config.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize());
}
grpcServer.initialize();
}
}
......
......@@ -22,7 +22,7 @@ import lombok.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
/**
* @author peng-yongsheng
* @author peng-yongsheng,yantaowu
*/
@Getter
@Setter
......@@ -34,4 +34,6 @@ public class SharingServerConfig extends ModuleConfig {
private int gRPCPort;
private int maxConcurrentCallsPerConnection;
private int maxMessageSize;
private int gRPCThreadPoolSize;
private int gRPCThreadPoolQueueSize;
}
......@@ -75,6 +75,12 @@ public class SharingServerModuleProvider extends ModuleProvider {
if (config.getMaxConcurrentCallsPerConnection() > 0) {
grpcServer.setMaxConcurrentCallsPerConnection(config.getMaxConcurrentCallsPerConnection());
}
if (config.getGRPCThreadPoolQueueSize() > 0) {
grpcServer.setThreadPoolQueueSize(config.getGRPCThreadPoolQueueSize());
}
if (config.getGRPCThreadPoolSize() > 0) {
grpcServer.setThreadPoolSize(config.getGRPCThreadPoolSize());
}
grpcServer.initialize();
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册