diff --git a/CHANGES.md b/CHANGES.md index 745f3d01d41a8e32f3f42c59d85f9328e9ace8a6..a0933d8386b54c6a5252782ab4d7c91f7b7f056f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -12,6 +12,7 @@ Release Notes. #### OAP-Backend * Make meter receiver support MAL. +* Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters. #### UI * Fix un-removed tags in trace query. diff --git a/docs/en/setup/backend/backend-fetcher.md b/docs/en/setup/backend/backend-fetcher.md index 95a58188fac2a629535433085f5c4c81e79e2ad0..764e9b26a330fcf28dc96ba45f537dff18b411f5 100644 --- a/docs/en/setup/backend/backend-fetcher.md +++ b/docs/en/setup/backend/backend-fetcher.md @@ -122,3 +122,21 @@ kafka-fetcher: enable.auto.commit: true ... ``` + +When use Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters, you can set the source Kafka Cluster alias(mm2SourceAlias) and separator(mm2SourceSeparator) according to your Kafka MirrorMaker [config](https://github.com/apache/kafka/tree/trunk/connect/mirror#remote-topics). +```yaml +kafka-fetcher: + selector: ${SW_KAFKA_FETCHER:default} + default: + bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092} + partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3} + replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2} + enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false} + isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:true} + consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:1,3,5} + mm2SourceAlias: ${SW_KAFKA_MM2_SOURCE_ALIAS:""} + mm2SourceSeparator: ${SW_KAFKA_MM2_SOURCE_SEPARATOR:""} + kafkaConsumerConfig: + enable.auto.commit: true + ... +``` diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java index 27e6a257120553380981e146ad3001ff351c2174..378efbbb7adfed284ad664da2cee4e15dc8c35e4 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/module/KafkaFetcherConfig.java @@ -84,5 +84,9 @@ public class KafkaFetcherConfig extends ModuleConfig { private int kafkaHandlerThreadPoolSize; private int kafkaHandlerThreadPoolQueueSize; + + private String mm2SourceAlias = ""; + private String mm2SourceSeparator = ""; + } diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java index 3fe3ce802cebd6aeac16a23cf188be2845045533..71297b5d5c3d7a9401e6af962a1b790f7b63c717 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/JVMMetricsHandler.java @@ -73,7 +73,7 @@ public class JVMMetricsHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfMetrics(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfMetrics(); } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java index 6a985ecdbbb16e17b437ef93caa24cac3765e8e0..c20a169d870314ef76e3b88979bac5b8db802509 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/MeterServiceHandler.java @@ -57,7 +57,7 @@ public class MeterServiceHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfMeters(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfMeters(); } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java index 4d37b3dadcdd7079ee3de3e7fbfa8147860d919b..7cadd133a28091c8980a5fe244acbd20fc104f23 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ProfileTaskHandler.java @@ -68,7 +68,7 @@ public class ProfileTaskHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfProfiling(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfProfiling(); } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java index 8f2f5d3bf6f18f31de97fa63fcd4f8cb96ca6350..29f4b71ae0d3d1ccd51204e0f09007b31ab51939 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/ServiceManagementHandler.java @@ -124,7 +124,7 @@ public class ServiceManagementHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfManagements(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfManagements(); } @Override diff --git a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java index 7a41f5ddfa89aac8548f9c4e5f570d999a8c15b0..16c2f8c7a3776aea0794e38f8e87205deb4fa33e 100644 --- a/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java +++ b/oap-server/server-fetcher-plugin/kafka-fetcher-plugin/src/main/java/org/apache/skywalking/oap/server/analyzer/agent/kafka/provider/handler/TraceSegmentHandler.java @@ -96,7 +96,7 @@ public class TraceSegmentHandler implements KafkaHandler { @Override public String getTopic() { - return config.getTopicNameOfTracingSegments(); + return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfTracingSegments(); } @Override