未验证 提交 ac36a3ff 编写于 作者: Z zifeihan 提交者: GitHub

Add ThreadPoolExecutor for handle kafka message. (#5718)

* 1.Add ThreadPoolExecutor for handle kafka message.
Co-authored-by: NDaming <zteny@foxmail.com>
Co-authored-by: wu-sheng's avatar吴晟 Wu Sheng <wu.sheng@foxmail.com>
上级 a2dd0e13
......@@ -16,6 +16,7 @@ Release Notes.
* Support keeping collecting the slowly segments in the sampling mechanism.
* Support choose files to active the meter analyzer.
* Improve Kubernetes service registry for ALS analysis.
* Add the thread pool to the Kafka fetcher to increase the performance.
#### UI
......
......@@ -8,12 +8,12 @@ prometheus-fetcher:
selector: ${SW_PROMETHEUS_FETCHER:default}
default:
active: ${SW_PROMETHEUS_FETCHER_ACTIVE:false}
```
```
### Configuration file
Prometheus fetcher is configured via a configuration file. The configuration file defines everything related to fetching
services and their instances, as well as which rule files to load.
OAP can load the configuration at bootstrap. If the new configuration is not well-formed, OAP fails to start up. The files
are located at `$CLASSPATH/fetcher-prom-rules`.
......@@ -23,7 +23,7 @@ A full example can be found [here](../../../../oap-server/server-bootstrap/src/m
Generic placeholders are defined as follows:
* `<duration>`: a duration This will parse a textual representation of a duration. The formats accepted are based on
* `<duration>`: a duration This will parse a textual representation of a duration. The formats accepted are based on
the ISO-8601 duration format `PnDTnHnMn.nS` with days considered to be exactly 24 hours.
* `<labelname>`: a string matching the regular expression \[a-zA-Z_\]\[a-zA-Z0-9_\]*
* `<labelvalue>`: a string of unicode characters
......@@ -33,7 +33,7 @@ Generic placeholders are defined as follows:
```yaml
# How frequently to fetch targets.
fetcherInterval: <duration>
fetcherInterval: <duration>
# Per-fetch timeout when fetching this target.
fetcherTimeout: <duration>
# The HTTP resource path on which to fetch metrics from targets.
......
......@@ -201,8 +201,10 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | isSharding | it was true when OAP Server in cluster. | SW_KAFKA_FETCHER_IS_SHARDING | false |
| - | - | createTopicIfNotExist | If true, create the Kafka topic when it does not exist. | - | true |
| - | - | partitions | The number of partitions for the topic being created. | SW_KAFKA_FETCHER_PARTITIONS | 3 |
| - | - | enableMeterSystem | To enable to fetch and handle [Meter System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false
| - | - | enableMeterSystem | To enable to fetch and handle [Meter System](backend-meter.md) data. | SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM | false |
| - | - | replicationFactor | The replication factor for each partition in the topic being created. | SW_KAFKA_FETCHER_PARTITIONS_FACTOR | 2 |
| - | - | kafkaHandlerThreadPoolSize | Pool size of kafka message handler executor. | SW_KAFKA_HANDLER_THREAD_POOL_SIZE | CPU core * 2 |
| - | - | kafkaHandlerThreadPoolQueueSize | The queue size of kafka message handler executor. | SW_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE | 10000 |
| - | - | topicNameOfMeters | Specifying Kafka topic name for Meter system data. | - | skywalking-meters |
| - | - | topicNameOfMetrics | Specifying Kafka topic name for JVM Metrics data. | - | skywalking-metrics |
| - | - | topicNameOfProfiling | Specifying Kafka topic name for Profiling data. | - | skywalking-profilings |
......
......@@ -71,8 +71,8 @@ core:
gRPCPort: ${SW_CORE_GRPC_PORT:11800}
maxConcurrentCallsPerConnection: ${SW_CORE_GRPC_MAX_CONCURRENT_CALL:0}
maxMessageSize: ${SW_CORE_GRPC_MAX_MESSAGE_SIZE:0}
gRPCThreadPoolQueueSize: ${SW_CORE_GRPC_POOL_QUEUE_SIZE:0}
gRPCThreadPoolSize: ${SW_CORE_GRPC_THREAD_POOL_SIZE:0}
gRPCThreadPoolQueueSize: ${SW_CORE_GRPC_POOL_QUEUE_SIZE:-1}
gRPCThreadPoolSize: ${SW_CORE_GRPC_THREAD_POOL_SIZE:-1}
gRPCSslEnabled: ${SW_CORE_GRPC_SSL_ENABLED:false}
gRPCSslKeyPath: ${SW_CORE_GRPC_SSL_KEY_PATH:""}
gRPCSslCertChainPath: ${SW_CORE_GRPC_SSL_CERT_CHAIN_PATH:""}
......@@ -269,6 +269,8 @@ kafka-fetcher:
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:false}
consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:""}
kafkaHandlerThreadPoolSize: ${SW_KAFKA_HANDLER_THREAD_POOL_SIZE:-1}
kafkaHandlerThreadPoolQueueSize: ${SW_KAFKA_HANDLER_THREAD_POOL_QUEUE_SIZE:-1}
receiver-meter:
selector: ${SW_RECEIVER_METER:default}
......
......@@ -20,15 +20,16 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
......@@ -42,9 +43,10 @@ import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler.KafkaHandler;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.analyzer.agent.kafka.module.KafkaFetcherConfig;
import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory;
/**
* Configuring and initializing a KafkaConsumer client as a dispatcher to delivery Kafka Message to registered handler by topic.
......@@ -60,8 +62,14 @@ public class KafkaFetcherHandlerRegister implements Runnable {
private final KafkaFetcherConfig config;
private final boolean isSharding;
private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
private int threadPoolQueueSize = 10000;
private final ThreadPoolExecutor executor;
private final boolean enableKafkaMessageAutoCommit;
public KafkaFetcherHandlerRegister(KafkaFetcherConfig config) throws ModuleStartException {
this.config = config;
Properties properties = new Properties();
properties.putAll(config.getKafkaConsumerConfig());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
......@@ -92,11 +100,11 @@ public class KafkaFetcherHandlerRegister implements Runnable {
if (!missedTopics.isEmpty()) {
log.info("Topics" + missedTopics.toString() + " not exist.");
List<NewTopic> newTopicList = missedTopics.stream()
.map(topic -> new NewTopic(
topic,
config.getPartitions(),
(short) config.getReplicationFactor()
)).collect(Collectors.toList());
.map(topic -> new NewTopic(
topic,
config.getPartitions(),
(short) config.getReplicationFactor()
)).collect(Collectors.toList());
try {
adminClient.createTopics(newTopicList).all().get();
......@@ -110,7 +118,22 @@ public class KafkaFetcherHandlerRegister implements Runnable {
} else {
isSharding = false;
}
if (config.getKafkaHandlerThreadPoolSize() > 0) {
threadPoolSize = config.getKafkaHandlerThreadPoolSize();
}
if (config.getKafkaHandlerThreadPoolQueueSize() > 0) {
threadPoolQueueSize = config.getKafkaHandlerThreadPoolQueueSize();
}
enableKafkaMessageAutoCommit = (boolean) properties.getOrDefault(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new BytesDeserializer());
executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize,
60, TimeUnit.SECONDS,
new ArrayBlockingQueue(threadPoolQueueSize),
new CustomThreadFactory("KafkaConsumer"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
public void register(KafkaHandler handler) {
......@@ -126,22 +149,27 @@ public class KafkaFetcherHandlerRegister implements Runnable {
consumer.subscribe(handlerMap.keySet());
}
consumer.seekToEnd(consumer.assignment());
Executors.newSingleThreadExecutor(new DefaultThreadFactory("KafkaConsumer")).submit(this);
executor.submit(this);
}
@Override
public void run() {
while (true) {
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
if (!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String, Bytes>> iterator = consumerRecords.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, Bytes> record = iterator.next();
handlerMap.get(record.topic()).handle(record);
try {
ConsumerRecords<String, Bytes> consumerRecords = consumer.poll(Duration.ofMillis(500L));
if (!consumerRecords.isEmpty()) {
Iterator<ConsumerRecord<String, Bytes>> iterator = consumerRecords.iterator();
while (iterator.hasNext()) {
ConsumerRecord<String, Bytes> record = iterator.next();
executor.submit(() -> handlerMap.get(record.topic()).handle(record));
}
if (!enableKafkaMessageAutoCommit) {
consumer.commitAsync();
}
}
consumer.commitAsync();
} catch (Exception e) {
log.error("Kafka handle message error.", e);
}
}
}
}
......@@ -80,4 +80,9 @@ public class KafkaFetcherConfig extends ModuleConfig {
private String topicNameOfManagements = "skywalking-managements";
private String topicNameOfMeters = "skywalking-meters";
private int kafkaHandlerThreadPoolSize;
private int kafkaHandlerThreadPoolQueueSize;
}
......@@ -36,6 +36,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.server.Server;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.ssl.DynamicSslContext;
import org.apache.skywalking.oap.server.library.server.pool.CustomThreadFactory;
@Slf4j
public class GRPCServer implements Server {
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.library.server.grpc;
package org.apache.skywalking.oap.server.library.server.pool;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
......@@ -27,7 +27,7 @@ public class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
CustomThreadFactory(String name) {
public CustomThreadFactory(String name) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name + "-" + poolNumber.getAndIncrement() + "-thread-";
......
......@@ -51,7 +51,7 @@
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-micrometer-registry</artifactId>
<version>8.2.0-SNAPSHOT</version>
<version>8.2.0</version>
</dependency>
</dependencies>
......@@ -76,12 +76,4 @@
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>apache-snapshot</id>
<name>Apache Snapshot</name>
<url>https://repository.apache.org/content/repositories/snapshots</url>
</repository>
</repositories>
</project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册