提交 af1bb2cc 编写于 作者: Z zengqiao 提交者: EricZeng

[Optimize] 删除Replica指标采集任务

1、当集群存在较多副本时,指标采集的性能会严重降低;
2、Replica的指标基本上都是在实时获取时才需要,因此当前先将Replica指标采集任务关闭,后续依据产品需要再看是否开启;
上级 714e9a56
......@@ -91,7 +91,7 @@ public class ReplicaMetricCollector extends AbstractMetricCollector<ReplicationM
continue;
}
Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafkaWithCache(
Result<ReplicationMetrics> ret = replicaMetricService.collectReplicaMetricsFromKafka(
clusterPhyId,
metrics.getTopic(),
metrics.getBrokerId(),
......
......@@ -24,11 +24,6 @@ public class CollectedMetricsLocalCache {
.maximumSize(10000)
.build();
private static final Cache<String, Float> replicaMetricsValueCache = Caffeine.newBuilder()
.expireAfterWrite(90, TimeUnit.SECONDS)
.maximumSize(20000)
.build();
/**
 * Looks up a broker-level metric value in the local Caffeine cache.
 *
 * @param brokerMetricKey cache key (see genBrokerMetricKey: "clusterPhyId@brokerId@metricName")
 * @return the cached value, or {@code null} on a cache miss / expired entry
 */
public static Float getBrokerMetrics(String brokerMetricKey) {
return brokerMetricsCache.getIfPresent(brokerMetricKey);
}
......@@ -64,17 +59,6 @@ public class CollectedMetricsLocalCache {
partitionMetricsCache.put(partitionMetricsKey, metricsList);
}
/**
 * Looks up a replica-level metric value in the local Caffeine cache.
 *
 * @param replicaMetricsKey cache key identifying cluster/broker/topic/partition/metric
 * @return the cached value, or {@code null} on a cache miss / expired entry
 */
public static Float getReplicaMetrics(String replicaMetricsKey) {
return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
}
/**
 * Stores a replica-level metric value in the local Caffeine cache.
 * Null values are silently ignored (Caffeine is null-hostile, and a null
 * metric carries no information worth caching).
 *
 * @param replicaMetricsKey cache key identifying cluster/broker/topic/partition/metric
 * @param value             metric value to cache; ignored when {@code null}
 */
public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
if (value == null) {
return;
}
replicaMetricsValueCache.put(replicaMetricsKey, value);
}
/**
 * Builds the broker-metric cache key in the form
 * {@code "<clusterPhyId>@<brokerId>@<metricName>"}.
 *
 * @param clusterPhyId physical cluster id
 * @param brokerId     broker id within the cluster
 * @param metricName   metric name
 * @return the "@"-joined cache key (null arguments render as the string "null",
 *         matching plain string concatenation)
 */
public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
    return String.join("@", String.valueOf(clusterPhyId), String.valueOf(brokerId), metricName);
}
......
......@@ -37,6 +37,8 @@ import com.xiaojukeji.know.streaming.km.core.service.version.metrics.BrokerMetri
import com.xiaojukeji.know.streaming.km.core.service.version.metrics.ReplicaMetricVersionItems;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BrokerMetricESDAO;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient;
import org.apache.kafka.clients.admin.LogDirDescription;
import org.apache.kafka.clients.admin.ReplicaInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
......@@ -49,6 +51,7 @@ import java.util.*;
import java.util.stream.Collectors;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.*;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
/**
* @author didi
......@@ -105,7 +108,11 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
registerVCHandler( BROKER_METHOD_GET_HEALTH_SCORE, this::getMetricHealthScore);
registerVCHandler( BROKER_METHOD_GET_PARTITIONS_SKEW, this::getPartitionsSkew);
registerVCHandler( BROKER_METHOD_GET_LEADERS_SKEW, this::getLeadersSkew);
registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, this::getLogSize);
// registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, this::getLogSize);
registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_0_10_0_0, V_1_0_0, "getLogSizeFromJmx", this::getLogSizeFromJmx);
registerVCHandler( BROKER_METHOD_GET_LOG_SIZE, V_1_0_0, V_MAX, "getLogSizeFromClient", this::getLogSizeFromClient);
registerVCHandler( BROKER_METHOD_IS_BROKER_ALIVE, this::isBrokerAlive);
}
......@@ -351,7 +358,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
);
}
private Result<BrokerMetrics> getLogSize(VersionItemParam metricParam) {
private Result<BrokerMetrics> getLogSizeFromJmx(VersionItemParam metricParam) {
BrokerMetricParam param = (BrokerMetricParam)metricParam;
String metric = param.getMetric();
......@@ -360,19 +367,17 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
List<Partition> partitions = partitionService.listPartitionByBroker(clusterId, brokerId);
JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterId, brokerId);
if (ValidateUtils.isNull(jmxConnectorWrap)){return Result.buildFailure(VC_JMX_INIT_ERROR);}
Float logSizeSum = 0f;
for(Partition p : partitions) {
try {
Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafkaWithCache(
Result<ReplicationMetrics> metricsResult = replicaMetricService.collectReplicaMetricsFromKafka(
clusterId,
p.getTopicName(),
brokerId,
p.getPartitionId(),
ReplicaMetricVersionItems.REPLICATION_METRIC_LOG_SIZE
);
if(null == metricsResult || metricsResult.failed() || null == metricsResult.getData()) {
continue;
}
......@@ -391,6 +396,28 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
return Result.buildSuc(BrokerMetrics.initWithMetric(clusterId, brokerId, metric, logSizeSum));
}
/**
 * Collects the broker-level log-size metric via the Kafka AdminClient
 * DescribeLogDirs API, summing the on-disk size of every replica hosted on
 * the broker. Registered for brokers >= 1.0.0 (JMX fallback covers older ones).
 *
 * @param metricParam expected to be a {@code BrokerMetricParam} carrying the
 *                    cluster id, broker id and the metric name to fill in
 * @return success with the aggregated log size, or a failure result propagated
 *         from the log-dir lookup
 */
private Result<BrokerMetrics> getLogSizeFromClient(VersionItemParam metricParam) {
    BrokerMetricParam param = (BrokerMetricParam) metricParam;

    String metric = param.getMetric();
    Long clusterId = param.getClusterId();
    Integer brokerId = param.getBrokerId();

    Result<Map<String, LogDirDescription>> descriptionMapResult = brokerService.getBrokerLogDirDescFromKafka(clusterId, brokerId);
    if (null == descriptionMapResult || descriptionMapResult.failed() || null == descriptionMapResult.getData()) {
        // NOTE(review): if descriptionMapResult itself is null, buildFromIgnoreData(null)
        // may NPE — confirm the service contract, or add an explicit failure for that case.
        return Result.buildFromIgnoreData(descriptionMapResult);
    }

    // Accumulate in a primitive long: ReplicaInfo.size() returns bytes as a long,
    // and a boxed Float accumulator both loses precision past ~2^24 and autoboxes
    // on every iteration. Convert to Float once, at the end.
    long logSizeSum = 0L;
    for (LogDirDescription logDirDescription : descriptionMapResult.getData().values()) {
        for (ReplicaInfo replicaInfo : logDirDescription.replicaInfos().values()) {
            logSizeSum += replicaInfo.size();
        }
    }

    return Result.buildSuc(BrokerMetrics.initWithMetric(clusterId, brokerId, metric, (float) logSizeSum));
}
private Result<BrokerMetrics> getLeadersSkew(VersionItemParam metricParam) {
BrokerMetricParam param = (BrokerMetricParam)metricParam;
......
......@@ -13,12 +13,14 @@ public interface ReplicaMetricService {
* 从kafka中采集指标
*/
Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topic, Integer partitionId, Integer brokerId, String metric);
Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic, Integer brokerId, Integer partitionId, String metric);
Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList);
/**
* 从ES中获取指标
*/
@Deprecated
Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, MetricDTO dto);
@Deprecated
Result<ReplicationMetrics> getLatestMetricsFromES(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, List<String> metricNames);
}
......@@ -17,7 +17,6 @@ import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap;
import com.xiaojukeji.know.streaming.km.common.utils.BeanUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.cache.CollectedMetricsLocalCache;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.replica.ReplicaMetricService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseMetricService;
......@@ -77,32 +76,36 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
}
@Override
public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
String topic,
Integer brokerId,
Integer partitionId,
String metric) {
String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
if(null != keyValue){
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
replicationMetrics.putMetric(metric, keyValue);
return Result.buildSuc(replicationMetrics);
public Result<ReplicationMetrics> collectReplicaMetricsFromKafka(Long clusterId, String topicName, Integer partitionId, Integer brokerId, List<String> metricNameList) {
ReplicationMetrics metrics = new ReplicationMetrics(clusterId, topicName, brokerId, partitionId);
for (String metricName: metricNameList) {
try {
if (metrics.getMetrics().containsKey(metricName)) {
continue;
}
Result<ReplicationMetrics> ret = this.collectReplicaMetricsFromKafka(
clusterId,
metrics.getTopic(),
metrics.getBrokerId(),
metrics.getPartitionId(),
metricName
);
if (null == ret || ret.failed() || null == ret.getData()) {
continue;
}
metrics.putMetric(ret.getData().getMetrics());
} catch (Exception e) {
LOGGER.error(
"method=collectReplicaMetricsFromKafka||clusterPhyId={}||topicName={}||partition={}||brokerId={}||metricName={}||errMsg=exception!",
clusterId, topicName, partitionId, brokerId, e
);
}
}
Result<ReplicationMetrics> ret = collectReplicaMetricsFromKafka(clusterPhyId, topic, partitionId, brokerId, metric);
if(null == ret || ret.failed() || null == ret.getData()){return ret;}
// 更新cache
ret.getData().getMetrics().entrySet().stream().forEach(
metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
replicaMetricsKey,
metricNameAndValueEntry.getValue()
)
);
return ret;
return Result.buildSuc(metrics);
}
@Override
......@@ -167,8 +170,8 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
Integer brokerId = metricParam.getBrokerId();
Integer partitionId = metricParam.getPartitionId();
Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafkaWithCache(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafkaWithCache(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
Result<ReplicationMetrics> endRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_END_OFFSET);
Result<ReplicationMetrics> startRet = this.collectReplicaMetricsFromKafka(clusterId, topic, brokerId, partitionId, REPLICATION_METRIC_LOG_START_OFFSET);
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterId, topic, brokerId, partitionId);
if(null != endRet && endRet.successful() && null != startRet && startRet.successful()){
......
......@@ -26,6 +26,7 @@ public class ReplicaMetricsController {
@Autowired
private ReplicaMetricService replicationMetricService;
@Deprecated
@ApiOperation(value = "Replica指标-单个Replica")
@PostMapping(value = "clusters/{clusterPhyId}/brokers/{brokerId}/topics/{topicName}/partitions/{partitionId}/metric-points")
@ResponseBody
......@@ -45,7 +46,7 @@ public class ReplicaMetricsController {
@PathVariable String topicName,
@PathVariable Integer partitionId,
@RequestBody List<String> metricsNames) {
Result<ReplicationMetrics> metricsResult = replicationMetricService.getLatestMetricsFromES(clusterPhyId, brokerId, topicName, partitionId, metricsNames);
Result<ReplicationMetrics> metricsResult = replicationMetricService.collectReplicaMetricsFromKafka(clusterPhyId, topicName, partitionId, brokerId, metricsNames);
if (metricsResult.failed()) {
return Result.buildFromIgnoreData(metricsResult);
}
......
package com.xiaojukeji.know.streaming.km.task.metrics;
import com.didiglobal.logi.job.annotation.Task;
import com.didiglobal.logi.job.common.TaskResult;
import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
/**
 * Scheduled task that collects replica-level metrics for each physical cluster.
 * Runs every minute (cron "0 0/1 * * * ? *") and is broadcast to all nodes
 * (ConsensualEnum.BROADCAST), with a timeout of 2 * 60 (presumably seconds —
 * TODO confirm the @Task timeout unit).
 *
 * NOTE(review): per this commit, replica metric collection is being disabled
 * because it degrades badly on clusters with many replicas; replica metrics
 * are instead fetched on demand.
 *
 * @author didi
 */
@Slf4j
@Task(name = "ReplicaMetricCollectorTask",
description = "Replica指标采集任务",
cron = "0 0/1 * * * ? *",
autoRegister = true,
consensual = ConsensualEnum.BROADCAST,
timeout = 2 * 60)
public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
// Collector that does the actual per-cluster replica metric gathering.
@Autowired
private ReplicaMetricCollector replicaMetricCollector;
/**
 * Dispatch entry point: collects replica metrics for the given cluster and
 * always reports success (collection errors are handled inside the collector).
 */
@Override
public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
replicaMetricCollector.collectMetrics(clusterPhy);
return TaskResult.SUCCESS;
}
}
//package com.xiaojukeji.know.streaming.km.task.metrics;
//
//import com.didiglobal.logi.job.annotation.Task;
//import com.didiglobal.logi.job.common.TaskResult;
//import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
//import com.xiaojukeji.know.streaming.km.collector.metric.ReplicaMetricCollector;
//import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//
///**
// * @author didi
// */
//@Slf4j
//@Task(name = "ReplicaMetricCollectorTask",
// description = "Replica指标采集任务",
// cron = "0 0/1 * * * ? *",
// autoRegister = true,
// consensual = ConsensualEnum.BROADCAST,
// timeout = 2 * 60)
//public class ReplicaMetricCollectorTask extends AbstractAsyncMetricsDispatchTask {
//
// @Autowired
// private ReplicaMetricCollector replicaMetricCollector;
//
// @Override
// public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) throws Exception {
// replicaMetricCollector.collectMetrics(clusterPhy);
//
// return TaskResult.SUCCESS;
// }
//}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册