From a2e011f9b06fe39bf16b8e6eda96ec170292db2c Mon Sep 17 00:00:00 2001 From: Daming Date: Mon, 23 Aug 2021 17:16:19 +0800 Subject: [PATCH] Kafka extra producer configuration enhancement (#7526) --- CHANGES.md | 1 + apm-sniffer/config/agent.config | 8 +++--- .../core/kafka/KafkaProducerManager.java | 27 ++++++++++++------- .../core/kafka/KafkaReporterPluginConfig.java | 13 ++++++++- .../How-to-enable-kafka-reporter.md | 17 +++++++++--- .../setup/service-agent/java-agent/README.md | 2 +- 6 files changed, 50 insertions(+), 18 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index dbe24b893b..3724d8ad06 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -17,6 +17,7 @@ Release Notes. * Modify `Tags.STATUS_CODE` field name to `Tags.HTTP_RESPONSE_STATUS_CODE` and type from `StringTag` to `IntegerTag`, add `Tags.RPC_RESPONSE_STATUS_CODE` field to hold rpc response code value. * Fix kafka-reporter-plugin shade package conflict * Add all config items to `agent.conf` file for convenient containerization use cases. +* Advanced Kafka Producer configuration enhancement. #### OAP-Backend diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config index 823a811f27..3cae8fa40e 100644 --- a/apm-sniffer/config/agent.config +++ b/apm-sniffer/config/agent.config @@ -216,12 +216,12 @@ plugin.dubbo.consumer_provider_length_threshold=${SW_PLUGIN_DUBBO_CONSUMER_PROVI plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092} # Timeout period of reading topics from the Kafka server, the unit is second. plugin.kafka.get_topic_timeout=${SW_GET_TOPIC_TIMEOUT:10} -# Kafka producer configuration. -plugin.kafka.consumer_config=${SW_PLUGIN_KAFKA_CONSUMER_CONFIG:} # Kafka producer configuration. Read [producer configure](http://kafka.apache.org/24/documentation.html#producerconfigs) # to get more details. Check [Kafka report doc](How-to-enable-kafka-reporter.md) for more details and examples. 
-plugin.kafka.producer_config=${SW_PLUGIN_KAFKA_PRODUCER_CONFIG:} -# Specify which Kafka topic name for Meter System data to report to. +plugin.kafka.producer_config=${SW_PLUGIN_KAFKA_PRODUCER_CONFIG:} +# Configure Kafka Producer configuration in JSON format. Note that it will be overridden by plugin.kafka.producer_config[key] when keys are duplicated. +plugin.kafka.producer_config_json=${SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON:} +# Specify which Kafka topic name for Meter System data to report to. plugin.kafka.topic_meter=${SW_PLUGIN_KAFKA_TOPIC_METER:skywalking-meters} # Specify which Kafka topic name for JVM metrics data to report to. plugin.kafka.topic_metrics=${SW_PLUGIN_KAFKA_TOPIC_METRICS:skywalking-metrics} diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java index a3c52bb25e..9bf52f94da 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java @@ -18,9 +18,11 @@ package org.apache.skywalking.apm.agent.core.kafka; +import com.google.gson.Gson; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Properties; import java.util.Set; @@ -42,6 +44,7 @@ import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; import org.apache.skywalking.apm.agent.core.boot.ServiceManager; +import 
org.apache.skywalking.apm.agent.core.kafka.KafkaReporterPluginConfig.Plugin.Kafka; import org.apache.skywalking.apm.agent.core.logging.api.ILog; import org.apache.skywalking.apm.agent.core.logging.api.LogManager; import org.apache.skywalking.apm.agent.core.plugin.loader.AgentClassLoader; @@ -57,8 +60,8 @@ public class KafkaProducerManager implements BootService, Runnable { private static final ILog LOGGER = LogManager.getLogger(KafkaProducerManager.class); - private Set topics = new HashSet<>(); - private List listeners = new ArrayList<>(); + private final Set topics = new HashSet<>(); + private final List listeners = new ArrayList<>(); private volatile KafkaProducer producer; @@ -79,8 +82,8 @@ public class KafkaProducerManager implements BootService, Runnable { } String formatTopicNameThenRegister(String topic) { - String topicName = StringUtil.isBlank(KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE) ? topic - : KafkaReporterPluginConfig.Plugin.Kafka.NAMESPACE + "-" + topic; + String topicName = StringUtil.isBlank(Kafka.NAMESPACE) ? 
topic + : Kafka.NAMESPACE + "-" + topic; topics.add(topicName); return topicName; } @@ -100,9 +103,14 @@ public class KafkaProducerManager implements BootService, Runnable { Thread.currentThread().setContextClassLoader(AgentClassLoader.getDefault()); Properties properties = new Properties(); - properties.setProperty( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS); - KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG.forEach(properties::setProperty); + properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS); + + if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) { + Gson gson = new Gson(); + Map config = (Map) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class); + config.forEach(properties::setProperty); + } + Kafka.PRODUCER_CONFIG.forEach(properties::setProperty); try (AdminClient adminClient = AdminClient.create(properties)) { DescribeTopicsResult topicsResult = adminClient.describeTopics(topics); @@ -110,7 +118,7 @@ public class KafkaProducerManager implements BootService, Runnable { .map(entry -> { try { entry.getValue().get( - KafkaReporterPluginConfig.Plugin.Kafka.GET_TOPIC_TIMEOUT, + Kafka.GET_TOPIC_TIMEOUT, TimeUnit.SECONDS ); return null; @@ -130,7 +138,7 @@ public class KafkaProducerManager implements BootService, Runnable { try { producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer()); } catch (Exception e) { - LOGGER.error(e, "connect to kafka cluster '{}' failed", KafkaReporterPluginConfig.Plugin.Kafka.BOOTSTRAP_SERVERS); + LOGGER.error(e, "connect to kafka cluster '{}' failed", Kafka.BOOTSTRAP_SERVERS); return; } //notify listeners to send data if no exception been throw @@ -147,6 +155,7 @@ public class KafkaProducerManager implements BootService, Runnable { /** * Get the KafkaProducer instance to send data to Kafka broker. 
+ * @return Kafka producer */ public final KafkaProducer getProducer() { return producer; diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java index 5b1d94e0cb..1b4d582f05 100644 --- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java +++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java @@ -27,7 +27,7 @@ public class KafkaReporterPluginConfig { @PluginConfig(root = KafkaReporterPluginConfig.class) public static class Kafka { /** - * bootstrap.servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. + * bootstrap_servers: A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. * This list should be in the form host1:port1,host2:port2,... */ public static String BOOTSTRAP_SERVERS = "localhost:9092"; @@ -44,8 +44,19 @@ public class KafkaReporterPluginConfig { public static String NAMESPACE = ""; + /** + * producer_config: the configuration of Kafka Producer. The format is: + * plugin.kafka.producer_config[key] = value + */ public static Map PRODUCER_CONFIG = new HashMap<>(); + /** + * Configure Kafka Producer configuration in JSON format. Notice it will be overridden by + * `plugin.kafka.producer_config[key]`, if the key duplication. For example: + * plugin.kafka.producer_config_json = {"batch.size":32768} + */ + public static String PRODUCER_CONFIG_JSON = ""; + /** * Timeout period of reading topics from the Kafka server, the unit is second. 
*/ diff --git a/docs/en/setup/service-agent/java-agent/How-to-enable-kafka-reporter.md b/docs/en/setup/service-agent/java-agent/How-to-enable-kafka-reporter.md index baf9c6985f..2dd5403a6d 100644 --- a/docs/en/setup/service-agent/java-agent/How-to-enable-kafka-reporter.md +++ b/docs/en/setup/service-agent/java-agent/How-to-enable-kafka-reporter.md @@ -12,10 +12,21 @@ collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800} # Kafka producer configuration plugin.kafka.bootstrap_servers=${SW_KAFKA_BOOTSTRAP_SERVERS:localhost:9092} -plugin.kafka.producer_config[delivery.timeout.ms]=12000 plugin.kafka.get_topic_timeout=${SW_GET_TOPIC_TIMEOUT:10} ``` -Kafka reporter plugin support to customize all configurations of listed in [here](http://kafka.apache.org/24/documentation.html#producerconfigs). - Before you activated the Kafka reporter, you have to make sure that [Kafka fetcher](../../backend/backend-fetcher.md#kafka-fetcher) has been opened in service. + +### Advanced Kafka Producer Configurations + +The Kafka reporter plugin supports customizing all of the producer configurations listed [here](http://kafka.apache.org/24/documentation.html#producerconfigs). For example: +```properties +plugin.kafka.producer_config[delivery.timeout.ms]=12000 +``` + +Since SkyWalking 8.8.0, advanced Producer configurations can also be supplied in JSON format, like this: +```properties +plugin.kafka.producer_config_json={"delivery.timeout.ms": 12000, "compression.type": "snappy"} +``` + +Currently, there are the 2 ways shown above to configure advanced Producer configurations. Note that the JSON-format configuration will be overridden by `plugin.kafka.producer_config[key]=value` when they contain duplicate keys. 
diff --git a/docs/en/setup/service-agent/java-agent/README.md b/docs/en/setup/service-agent/java-agent/README.md index 0c63693a80..06e5bcd612 100755 --- a/docs/en/setup/service-agent/java-agent/README.md +++ b/docs/en/setup/service-agent/java-agent/README.md @@ -153,8 +153,8 @@ property key | Description | Default | `plugin.dubbo.provider_arguments_length_threshold`| When `plugin.dubbo.collect_provider_arguments` is `true`, Arguments of length from the front will to the OAP backend |`256`| `plugin.kafka.bootstrap_servers`| A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. | `localhost:9092`| `plugin.kafka.get_topic_timeout`| Timeout period of reading topics from the Kafka server, the unit is second. |`10`| -`plugin.kafka.consumer_config`| Kafka producer configuration. || `plugin.kafka.producer_config`| Kafka producer configuration. Read [producer configure](http://kafka.apache.org/24/documentation.html#producerconfigs) to get more details. Check [Kafka report doc](How-to-enable-kafka-reporter.md) for more details and examples. | | +`plugin.kafka.producer_config_json` | Configure Kafka Producer configuration in JSON format. Note that it will be overridden by `plugin.kafka.producer_config[key]` when keys are duplicated. | | `plugin.kafka.topic_meter` | Specify which Kafka topic name for Meter System data to report to. | `skywalking-meters` | `plugin.kafka.topic_metrics` | Specify which Kafka topic name for JVM metrics data to report to. | `skywalking-metrics` | `plugin.kafka.topic_segment` | Specify which Kafka topic name for traces data to report to. | `skywalking-segments` | -- GitLab