From 9701bd0944660078dcc1de9c2820ca9df3f5c2ed Mon Sep 17 00:00:00 2001 From: Daming Date: Thu, 27 May 2021 23:10:22 +0800 Subject: [PATCH] Make the number of core worker in meter converter thread pool configurable (#7027) --- CHANGES.md | 1 + docs/en/setup/backend/configuration-vocabulary.md | 2 ++ .../src/main/resources/application.yml | 2 ++ .../prometheus/provider/PrometheusFetcherConfig.java | 10 ++++++++-- .../prometheus/provider/PrometheusFetcherProvider.java | 6 +++++- 5 files changed, 18 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index ea93875372..6aa69e0766 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -55,6 +55,7 @@ Release Notes. * Support `native-json` format log in kafka-fetcher-plugin. * Fix counter misuse in the alarm core. Alarm can't be triggered in time. * Events can be configured as alarm source. +* Make the number of core worker in meter converter thread pool configurable. #### UI * Add logo for kong plugin. diff --git a/docs/en/setup/backend/configuration-vocabulary.md b/docs/en/setup/backend/configuration-vocabulary.md index e960405b20..f5f25664d8 100644 --- a/docs/en/setup/backend/configuration-vocabulary.md +++ b/docs/en/setup/backend/configuration-vocabulary.md @@ -210,6 +210,8 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode | - | - | maxMessageSize | Sets the maximum message size allowed to be received on the server. Empty means 4 MiB | - | 4M(based on Netty) | | prometheus-fetcher | default | Read [fetcher doc](backend-fetcher.md) for more details | - | - | | - | - | active | Activate the Prometheus fetcher. | SW_PROMETHEUS_FETCHER_ACTIVE | false | +| - | - | enabledRules | Enable rules. | SW_PROMETHEUS_FETCHER_ENABLED_RULES | self | +| - | - | maxConvertWorker | The maximize meter convert worker. | SW_PROMETHEUS_FETCHER_NUM_CONVERT_WORKER | -1(by default, half the number of CPU core(s)) | | kafka-fetcher | default | Read [fetcher doc](backend-fetcher.md) for more details | - | - | | - | - | bootstrapServers | A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | SW_KAFKA_FETCHER_SERVERS | localhost:9092 | | - | - | namespace | namespace aims to isolate multi OAP cluster when using the same Kafka cluster.if you set a namespace for Kafka fetcher, OAP will add a prefix to topic name. you should also set namespace in `agent.config`, the property named| SW_NAMESPACE | - | diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml b/oap-server/server-bootstrap/src/main/resources/application.yml index 4692a2d198..4ac82ea4d4 100755 --- a/oap-server/server-bootstrap/src/main/resources/application.yml +++ b/oap-server/server-bootstrap/src/main/resources/application.yml @@ -337,7 +337,9 @@ envoy-metric: prometheus-fetcher: selector: ${SW_PROMETHEUS_FETCHER:-} default: + active: ${SW_PROMETHEUS_FETCHER_ACTIVE:false} enabledRules: ${SW_PROMETHEUS_FETCHER_ENABLED_RULES:"self"} + maxConvertWorker: ${SW_PROMETHEUS_FETCHER_NUM_CONVERT_WORKER:-1} kafka-fetcher: selector: ${SW_KAFKA_FETCHER:-} diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java index 399b81fbfe..136ac5ee32 100644 --- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java +++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherConfig.java @@ -28,15 +28,21 @@ import org.apache.skywalking.oap.server.library.module.ModuleConfig; @Getter public class PrometheusFetcherConfig extends ModuleConfig { + + private int maxConvertWorker; + private String enabledRules; private final String rulePath = "fetcher-prom-rules"; List getEnabledRules() { - return Arrays.stream(Optional.ofNullable(enabledRules).orElse("").toString() - .split(",")) + return Arrays.stream(Optional.ofNullable(enabledRules).orElse("").split(",")) .map(String::trim) .filter(StringUtil::isNotEmpty) .collect(Collectors.toList()); } + + public int getMaxConvertWorker() { + return maxConvertWorker <= 0 ? Math.max(1, Runtime.getRuntime().availableProcessors() / 2) : maxConvertWorker; + } } diff --git a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java index 8118122557..2549f13909 100644 --- a/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java +++ b/oap-server/server-fetcher-plugin/prometheus-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/fetcher/prometheus/provider/PrometheusFetcherProvider.java @@ -48,6 +48,7 @@ import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ModuleStartException; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; +import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory; import org.apache.skywalking.oap.server.library.util.prometheus.Parser; import org.apache.skywalking.oap.server.library.util.prometheus.Parsers; import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Metric; @@ -93,7 +94,10 @@ public class PrometheusFetcherProvider extends ModuleProvider { @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException { rules = Rules.loadRules(config.getRulePath(), config.getEnabledRules()); - ses = Executors.newScheduledThreadPool(rules.size(), Executors.defaultThreadFactory()); + ses = Executors.newScheduledThreadPool( + Math.min(rules.size(), config.getMaxConvertWorker()), + new CustomThreadFactory("meter-converter") + ); } @Override -- GitLab