From 6eb5648ba8928a73d5ca22ef7d36b40d08d86c8e Mon Sep 17 00:00:00 2001 From: osiriswd Date: Sun, 6 Dec 2020 17:10:30 +0800 Subject: [PATCH] Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters. (#5949) --- CHANGES.md | 1 + docs/en/setup/backend/backend-fetcher.md | 18 ++++++++++++++++++ .../agent/kafka/module/KafkaFetcherConfig.java | 4 ++++ .../provider/handler/JVMMetricsHandler.java | 2 +- .../provider/handler/MeterServiceHandler.java | 2 +- .../provider/handler/ProfileTaskHandler.java | 2 +- .../handler/ServiceManagementHandler.java | 2 +- .../provider/handler/TraceSegmentHandler.java | 2 +- 8 files changed, 28 insertions(+), 5 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 745f3d01d4..a0933d8386 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,7 @@ Release Notes. #### OAP-Backend * Make meter receiver support MAL. +* Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters. #### UI * Fix un-removed tags in trace query. diff --git a/docs/en/setup/backend/backend-fetcher.md b/docs/en/setup/backend/backend-fetcher.md index 95a58188fa..764e9b26a3 100644 --- a/docs/en/setup/backend/backend-fetcher.md +++ b/docs/en/setup/backend/backend-fetcher.md @@ -122,3 +122,21 @@ kafka-fetcher: enable.auto.commit: true ... ``` + +When use Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters, you can set the source Kafka Cluster alias(mm2SourceAlias) and separator(mm2SourceSeparator) according to your Kafka MirrorMaker [config](https://github.com/apache/kafka/tree/trunk/connect/mirror#remote-topics). +```yaml +kafka-fetcher: + selector: ${SW_KAFKA_FETCHER:default} + default: + bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092} + partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3} + replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2} + enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false} + isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:true} + consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:1,3,5} + mm2SourceAlias: ${SW_KAFKA_MM2_SOURCE_ALIAS:""} + mm2SourceSeparator: ${SW_KAFKA_MM2_SOURCE_SEPARATOR:""} + kafkaConsumerConfig: + enable.auto.commit: true + ... +``` diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java index 27e6a25712..378efbbb7a 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java @@ -84,5 +84,9 @@ public class KafkaFetcherConfig extends ModuleConfig { private int kafkaHandlerThreadPoolSize; private int kafkaHandlerThreadPoolQueueSize; + + private String mm2SourceAlias = ""; + private String mm2SourceSeparator = ""; + } diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java index 3fe3ce802c..71297b5d5c 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java @@ -73,7 +73,7 @@ public class JVMMetricsHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfMetrics(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfMetrics(); } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java index 6a985ecdbb..c20a169d87 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java @@ -57,7 +57,7 @@ public class MeterServiceHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfMeters(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfMeters(); } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java index 4d37b3dadc..7cadd133a2 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java @@ -68,7 +68,7 @@ public class ProfileTaskHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfProfiling(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfProfiling(); } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java index 8f2f5d3bf6..29f4b71ae0 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java @@ -124,7 +124,7 @@ public class ServiceManagementHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfManagements(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfManagements(); } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java index 7a41f5ddfa..16c2f8c7a3 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java @@ -96,7 +96,7 @@ public class TraceSegmentHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfTracingSegments(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfTracingSegments(); } @Override -- GitLab