Unverified · Commit a2e011f9 authored by Daming, committed by GitHub

Kafka extra producer configuration enhancement (#7526)

Parent 7b206458
......@@ -17,6 +17,7 @@ Release Notes.
* Modify `Tags.STATUS_CODE` field name to `Tags.HTTP_RESPONSE_STATUS_CODE` and type from `StringTag` to `IntegerTag`, add `Tags.RPC_RESPONSE_STATUS_CODE` field to hold rpc response code value.
* Fix kafka-reporter-plugin shade package conflict
* Add all config items to `agent.conf` file for convenient containerization use cases.
* Advanced Kafka Producer configuration enhancement.
#### OAP-Backend
......
......@@ -216,12 +216,12 @@ plugin.dubbo.consumer_provider_length_threshold=${SW_PLUGIN_DUBBO_CONSUMER_PROVI
plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
# Timeout period of reading topics from the Kafka server, in seconds.
plugin.kafka.get_topic_timeout=${SW_GET_TOPIC_TIMEOUT:10}
# Kafka consumer configuration.
plugin.kafka.consumer_config=${SW_PLUGIN_KAFKA_CONSUMER_CONFIG:}
# Kafka producer configuration. Read [producer configure](http://kafka.apache.org/24/documentation.html#producerconfigs)
# to get more details. Check [Kafka report doc](How-to-enable-kafka-reporter.md) for more details and examples.
plugin.kafka.producer_config=${SW_PLUGIN_KAFKA_PRODUCER_CONFIG:}
# Kafka producer configuration in JSON format. Note that it will be overridden by plugin.kafka.producer_config[key] when the same key is set in both.
plugin.kafka.producer_config_json=${SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON:}
# Specify which Kafka topic name for Meter System data to report to.
plugin.kafka.topic_meter=${SW_PLUGIN_KAFKA_TOPIC_METER:skywalking-meters}
# Specify which Kafka topic name for JVM metrics data to report to.
plugin.kafka.topic_metrics=${SW_PLUGIN_KAFKA_TOPIC_METRICS:skywalking-metrics}
......
......@@ -18,9 +18,11 @@
package org.apache.skywalking.apm.agent.core.kafka;
import com.google.gson.Gson;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
......@@ -42,6 +44,7 @@ import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig.Plugin.Kafka;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.plugin.loader.AgentClassLoader;
......@@ -57,8 +60,8 @@ public class KafkaProducerManager implements BootService, Runnable {
private static final ILog LOGGER = LogManager.getLogger(KafkaProducerManager.class);
private Set<String> topics = new HashSet<>();
private List<KafkaConnectionStatusListener> listeners = new ArrayList<>();
private final Set<String> topics = new HashSet<>();
private final List<KafkaConnectionStatusListener> listeners = new ArrayList<>();
private volatile KafkaProducer<String, Bytes> producer;
......@@ -79,8 +82,8 @@ public class KafkaProducerManager implements BootService, Runnable {
}
String formatTopicNameThenRegister(String topic) {
String topicName = StringUtil.isBlank(KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE) ? topic
: KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE + "-" + topic;
String topicName = StringUtil.isBlank(Kafka.NAMESPACE) ? topic
: Kafka.NAMESPACE + "-" + topic;
topics.add(topicName);
return topicName;
}
......@@ -100,9 +103,14 @@ public class KafkaProducerManager implements BootService, Runnable {
Thread.currentThread().setContextClassLoader(AgentClassLoader.getDefault());
Properties properties = new Properties();
properties.setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS);
if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
Gson gson = new Gson();
Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
config.forEach(properties::setProperty);
}
Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
try (AdminClient adminClient = AdminClient.create(properties)) {
DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
......@@ -110,7 +118,7 @@ public class KafkaProducerManager implements BootService, Runnable {
.map(entry -> {
try {
entry.getValue().get(
KafkaReporterPluginConfig.Plugin.Kafka.GET_TOPIC_TIMEOUT,
Kafka.GET_TOPIC_TIMEOUT,
TimeUnit.SECONDS
);
return null;
......@@ -130,7 +138,7 @@ public class KafkaProducerManager implements BootService, Runnable {
try {
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
} catch (Exception e) {
LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS);
LOGGER.error(e, "connect to kafka cluster '{}' failed", Kafka.BOOTSTRAP_SERVERS);
return;
}
// notify listeners to send data if no exception has been thrown
......@@ -147,6 +155,7 @@ public class KafkaProducerManager implements BootService, Runnable {
/**
* Get the KafkaProducer instance to send data to Kafka broker.
* @return Kafka producer
*/
public final KafkaProducer<String, Bytes> getProducer() {
return producer;
......
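For context, reporter clients are expected to obtain the shared producer through `getProducer()` and send serialized data to a topic registered via `formatTopicNameThenRegister(...)`. The following is a minimal sketch of that usage, not code from this patch: the class name, payload, and the `ServiceManager` lookup are illustrative assumptions, and such a client would need to live in the same package to reach the package-private registration method.

```java
package org.apache.skywalking.apm.agent.core.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;

// Hypothetical client illustrating the intended usage of KafkaProducerManager.
public class ExampleKafkaReportClient {
    public void report(String key, byte[] payload) {
        KafkaProducerManager manager = ServiceManager.INSTANCE.findService(KafkaProducerManager.class);

        // Register the topic; the returned name carries the optional namespace prefix.
        String topic = manager.formatTopicNameThenRegister("skywalking-segments");

        // The producer is created asynchronously, so it may still be null at this point.
        KafkaProducer<String, Bytes> producer = manager.getProducer();
        if (producer != null) {
            producer.send(new ProducerRecord<>(topic, key, Bytes.wrap(payload)));
        }
    }
}
```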
......@@ -27,7 +27,7 @@ public class KafkaReporterPluginConfig {
@PluginConfig(root = KafkaReporterPluginConfig.class)
public static class Kafka {
/**
* <B>bootstrap.servers</B>: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
* <B>bootstrap_servers</B>: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
* This list should be in the form host1:port1,host2:port2,...
*/
public static String BOOTSTRAP_SERVERS = "localhost:9092";
......@@ -44,8 +44,19 @@ public class KafkaReporterPluginConfig {
public static String NAMESPACE = "";
/**
* <B>producer_config</B>: the configuration of Kafka Producer. The format is:
* <code>plugin.kafka.producer_config[key] = value</code>
*/
public static Map<String, String> PRODUCER_CONFIG = new HashMap<>();
/**
* Kafka producer configuration in JSON format. Note that it will be overridden by
* `plugin.kafka.producer_config[key]` when the same key is set in both. For example:
* <code>plugin.kafka.producer_config_json = {"batch.size":32768}</code>
*/
public static String PRODUCER_CONFIG_JSON = "";
/**
* Timeout period of reading topics from the Kafka server, in seconds.
*/
......
......@@ -12,10 +12,21 @@ collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
# Kafka producer configuration
plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092}
plugin.kafka.producer_config[delivery.timeout.ms]=12000
plugin.kafka.get_topic_timeout=${SW_GET_TOPIC_TIMEOUT:10}
```
The Kafka reporter plugin supports customizing all configurations listed [here](http://kafka.apache.org/24/documentation.html#producerconfigs).
Before you activate the Kafka reporter, make sure that the [Kafka fetcher](../../backend/backend-fetcher.md#kafka-fetcher) has been enabled in the backend.
### Advanced Kafka Producer Configurations
The Kafka reporter plugin supports customizing all configurations listed [here](http://kafka.apache.org/24/documentation.html#producerconfigs). For example:
```properties
plugin.kafka.producer_config[delivery.timeout.ms]=12000
```
Since SkyWalking 8.8.0, advanced producer configurations can also be provided in JSON format, like this:
```properties
plugin.kafka.producer_config_json={"delivery.timeout.ms": 12000, "compression.type": "snappy"}
```
Currently, there are two ways to configure advanced producer configurations, as shown above. Note that the JSON-format configuration will be overridden by `plugin.kafka.producer_config[key]=value` when both contain the same key.
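To make this precedence concrete, below is a minimal, self-contained Java sketch (not agent code; the class name and values are hypothetical) that mirrors the merge order used by `KafkaProducerManager`: the JSON-format settings are applied first, then the `producer_config[key]` entries, so the map-style entry wins when a key appears in both. String values are assumed in the JSON so that the unchecked `Map<String, String>` cast holds.

```java
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class ProducerConfigPrecedenceSketch {
    public static void main(String[] args) {
        // JSON-format configuration, e.g. the value of plugin.kafka.producer_config_json.
        String producerConfigJson = "{\"delivery.timeout.ms\": \"12000\", \"compression.type\": \"snappy\"}";

        // Map-format configuration, e.g. plugin.kafka.producer_config[delivery.timeout.ms]=30000.
        Map<String, String> producerConfig = new HashMap<>();
        producerConfig.put("delivery.timeout.ms", "30000");

        Properties properties = new Properties();

        // 1. Apply the JSON-format settings first (string values assumed).
        @SuppressWarnings("unchecked")
        Map<String, String> jsonConfig = (Map<String, String>) new Gson().fromJson(producerConfigJson, Map.class);
        jsonConfig.forEach(properties::setProperty);

        // 2. Apply producer_config entries last, so they override duplicated keys.
        producerConfig.forEach(properties::setProperty);

        System.out.println(properties.getProperty("delivery.timeout.ms"));
        System.out.println(properties.getProperty("compression.type"));
    }
}
```

Running the sketch prints `30000` and `snappy`, showing that the map-style `producer_config` entry overrides the JSON entry for the duplicated key while the JSON-only key is kept.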
......@@ -153,8 +153,8 @@ property key | Description | Default |
`plugin.dubbo.provider_arguments_length_threshold`| When `plugin.dubbo.collect_provider_arguments` is `true`, Arguments of length from the front will to the OAP backend |`256`|
`plugin.kafka.bootstrap_servers`| A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | `localhost:9092`|
`plugin.kafka.get_topic_timeout`| Timeout period of reading topics from the Kafka server, in seconds. |`10`|
`plugin.kafka.consumer_config`| Kafka consumer configuration. ||
`plugin.kafka.producer_config`| Kafka producer configuration. Read [producer configure](http://kafka.apache.org/24/documentation.html#producerconfigs) to get more details. Check [Kafka report doc](How-to-enable-kafka-reporter.md) for more details and examples. | |
`plugin.kafka.producer_config_json` | Kafka producer configuration in JSON format. Note that it will be overridden by `plugin.kafka.producer_config[key]` when the same key is set in both. | |
`plugin.kafka.topic_meter` | Specify which Kafka topic name for Meter System data to report to. | `skywalking-meters` |
`plugin.kafka.topic_metrics` | Specify which Kafka topic name for JVM metrics data to report to. | `skywalking-metrics` |
`plugin.kafka.topic_segment` | Specify which Kafka topic name for traces data to report to. | `skywalking-segments` |
......