未验证 提交 6eb5648b 编写于 作者: O osiriswd 提交者: GitHub

Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters. (#5949)

上级 7c406c30
......@@ -12,6 +12,7 @@ Release Notes.
#### OAP-Backend
* Make meter receiver support MAL.
* Support Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters.
#### UI
* Fix un-removed tags in trace query.
......
......@@ -122,3 +122,21 @@ kafka-fetcher:
enable.auto.commit: true
...
```
When use Kafka MirrorMaker 2.0 to replicate topics between Kafka clusters, you can set the source Kafka Cluster alias(mm2SourceAlias) and separator(mm2SourceSeparator) according to your Kafka MirrorMaker [config](https://github.com/apache/kafka/tree/trunk/connect/mirror#remote-topics).
```yaml
kafka-fetcher:
selector: ${SW_KAFKA_FETCHER:default}
default:
bootstrapServers: ${SW_KAFKA_FETCHER_SERVERS:localhost:9092}
partitions: ${SW_KAFKA_FETCHER_PARTITIONS:3}
replicationFactor: ${SW_KAFKA_FETCHER_PARTITIONS_FACTOR:2}
enableMeterSystem: ${SW_KAFKA_FETCHER_ENABLE_METER_SYSTEM:false}
isSharding: ${SW_KAFKA_FETCHER_IS_SHARDING:true}
consumePartitions: ${SW_KAFKA_FETCHER_CONSUME_PARTITIONS:1,3,5}
mm2SourceAlias: ${SW_KAFKA_MM2_SOURCE_ALIAS:""}
mm2SourceSeparator: ${SW_KAFKA_MM2_SOURCE_SEPARATOR:""}
kafkaConsumerConfig:
enable.auto.commit: true
...
```
......@@ -84,5 +84,9 @@ public class KafkaFetcherConfig extends ModuleConfig {
private int kafkaHandlerThreadPoolSize;
private int kafkaHandlerThreadPoolQueueSize;
private String mm2SourceAlias = "";
private String mm2SourceSeparator = "";
}
......@@ -73,7 +73,7 @@ public class JVMMetricsHandler implements KafkaHandler {
@Override
public String getTopic() {
return config.getTopicNameOfMetrics();
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfMetrics();
}
@Override
......
......@@ -57,7 +57,7 @@ public class MeterServiceHandler implements KafkaHandler {
@Override
public String getTopic() {
return config.getTopicNameOfMeters();
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfMeters();
}
@Override
......
......@@ -68,7 +68,7 @@ public class ProfileTaskHandler implements KafkaHandler {
@Override
public String getTopic() {
return config.getTopicNameOfProfiling();
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfProfiling();
}
@Override
......
......@@ -124,7 +124,7 @@ public class ServiceManagementHandler implements KafkaHandler {
@Override
public String getTopic() {
return config.getTopicNameOfManagements();
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfManagements();
}
@Override
......
......@@ -96,7 +96,7 @@ public class TraceSegmentHandler implements KafkaHandler {
@Override
public String getTopic() {
return config.getTopicNameOfTracingSegments();
return config.getMm2SourceAlias() + config.getMm2SourceSeparator() + config.getTopicNameOfTracingSegments();
}
@Override
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册