未验证 提交 76541617 编写于 作者: lengyueqiufeng's avatar lengyueqiufeng 提交者: GitHub

Fix the problem of Kafka transport topics are created duplicated with and...

 Fix the problem of Kafka transport topics are created duplicated with and without namespace (#7326) (#7329)
上级 3a4ee08e
......@@ -108,9 +108,10 @@ Release Notes.
* The `core/syncThreads` setting(added in 8.5.0) is removed due to metrics persistence is fully asynchronous.
* Optimization: Concurrency mode of execution stage for metrics is removed(added in 8.5.0). Only concurrency of prepare
stage is meaningful and kept.
* Fix -meters metrics topic isn't created with namespace issue
* Fix `-meters` metrics topic isn't created with namespace issue
* Enhance persistent session timeout mechanism. Because the enhanced session could cache the metadata metrics forever,
new timeout mechanism is designed for avoiding this specific case.
* Fix Kafka transport topics are created duplicated with and without namespace issue
#### UI
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.analyzer.agent.kafka;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
......@@ -60,60 +61,21 @@ public class KafkaFetcherHandlerRegister implements Runnable {
private KafkaConsumer<String, Bytes> consumer = null;
private final KafkaFetcherConfig config;
private final boolean isSharding;
private final Properties properties;
private int threadPoolSize = Runtime.getRuntime().availableProcessors() * 2;
private int threadPoolQueueSize = 10000;
private final ThreadPoolExecutor executor;
private final boolean enableKafkaMessageAutoCommit;
public KafkaFetcherHandlerRegister(KafkaFetcherConfig config) throws ModuleStartException {
public KafkaFetcherHandlerRegister(KafkaFetcherConfig config) {
this.config = config;
Properties properties = new Properties();
properties = new Properties();
properties.putAll(config.getKafkaConsumerConfig());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, config.getGroupId());
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getBootstrapServers());
AdminClient adminClient = AdminClient.create(properties);
Set<String> missedTopics = adminClient.describeTopics(Lists.newArrayList(
config.getTopicNameOfManagements(),
config.getTopicNameOfMetrics(),
config.getTopicNameOfProfiling(),
config.getTopicNameOfTracingSegments(),
config.getTopicNameOfMeters(),
config.getTopicNameOfLogs(),
config.getTopicNameOfJsonLogs()
))
.values()
.entrySet()
.stream()
.map(entry -> {
try {
entry.getValue().get();
return null;
} catch (InterruptedException | ExecutionException ignore) {
}
return entry.getKey();
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
if (!missedTopics.isEmpty()) {
log.info("Topics" + missedTopics.toString() + " not exist.");
List<NewTopic> newTopicList = missedTopics.stream()
.map(topic -> new NewTopic(
topic,
config.getPartitions(),
(short) config.getReplicationFactor()
)).collect(Collectors.toList());
try {
adminClient.createTopics(newTopicList).all().get();
} catch (Exception e) {
throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
}
}
if (config.isSharding() && StringUtil.isNotEmpty(config.getConsumePartitions())) {
isSharding = true;
} else {
......@@ -142,8 +104,11 @@ public class KafkaFetcherHandlerRegister implements Runnable {
topicPartitions.addAll(handler.getTopicPartitions());
}
public void start() {
public void start() throws ModuleStartException {
handlerMap = builder.build();
createTopicIfNeeded(handlerMap.keySet(), properties);
if (isSharding) {
consumer.assign(topicPartitions);
} else {
......@@ -171,4 +136,38 @@ public class KafkaFetcherHandlerRegister implements Runnable {
}
}
}
private void createTopicIfNeeded(Collection<String> topics, Properties properties) throws ModuleStartException {
AdminClient adminClient = AdminClient.create(properties);
Set<String> missedTopics = adminClient.describeTopics(topics)
.values()
.entrySet()
.stream()
.map(entry -> {
try {
entry.getValue().get();
return null;
} catch (InterruptedException | ExecutionException ignore) {
}
return entry.getKey();
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
if (!missedTopics.isEmpty()) {
log.info("Topics" + missedTopics.toString() + " not exist.");
List<NewTopic> newTopicList = missedTopics.stream()
.map(topic -> new NewTopic(
topic,
config.getPartitions(),
(short) config.getReplicationFactor()
)).collect(Collectors.toList());
try {
adminClient.createTopics(newTopicList).all().get();
} catch (Exception e) {
throw new ModuleStartException("Failed to create Kafka Topics" + missedTopics + ".", e);
}
}
}
}
......@@ -69,7 +69,7 @@ public class KafkaFetcherProvider extends ModuleProvider {
}
@Override
public void start() throws ServiceNotProvidedException {
public void start() throws ServiceNotProvidedException, ModuleStartException {
handlerRegister.register(new JVMMetricsHandler(getManager(), config));
handlerRegister.register(new ServiceManagementHandler(getManager(), config));
handlerRegister.register(new TraceSegmentHandler(getManager(), config));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册