package com.xiaojukeji.know.streaming.km.biz.cluster.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.cluster.ClusterBrokersManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterBrokersOverviewDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BrokerMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersOverviewVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterBrokersStateVO; import com.xiaojukeji.know.streaming.km.common.bean.vo.kafkacontroller.KafkaControllerVO; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil; import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerConfigService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerMetricService; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @Service public class ClusterBrokersManagerImpl implements ClusterBrokersManager { private static final ILog log = LogFactory.getLog(ClusterBrokersManagerImpl.class); @Autowired private TopicService topicService; @Autowired private BrokerService brokerService; @Autowired private BrokerConfigService brokerConfigService; @Autowired private BrokerMetricService brokerMetricService; @Autowired private KafkaControllerService kafkaControllerService; @Override public PaginationResult getClusterPhyBrokersOverview(Long clusterPhyId, ClusterBrokersOverviewDTO dto) { // 获取集群Broker列表 List brokerList = brokerService.listAllBrokersFromDB(clusterPhyId); // 搜索 brokerList = PaginationUtil.pageByFuzzyFilter(brokerList, dto.getSearchKeywords(), Arrays.asList("host")); // 获取指标 Result> metricsResult = brokerMetricService.getLatestMetricsFromES( clusterPhyId, brokerList.stream().filter(elem1 -> elem1.alive()).map(elem2 -> elem2.getBrokerId()).collect(Collectors.toList()) ); // 分页 + 搜索 PaginationResult paginationResult = this.pagingBrokers(brokerList, metricsResult.hasData()? metricsResult.getData(): new ArrayList<>(), dto); // 获取__consumer_offsetsTopic的分布 Topic groupTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME); Topic transactionTopic = topicService.getTopic(clusterPhyId, org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME); //获取controller信息 KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); // 格式转换 return PaginationResult.buildSuc( this.convert2ClusterBrokersOverviewVOList( paginationResult.getData().getBizData(), brokerList, metricsResult.getData(), groupTopic, transactionTopic, kafkaController ), paginationResult ); } @Override public ClusterBrokersStateVO getClusterPhyBrokersState(Long clusterPhyId) { ClusterBrokersStateVO clusterBrokersStateVO = new ClusterBrokersStateVO(); // 获取集群Broker列表 List allBrokerList = brokerService.listAllBrokersFromDB(clusterPhyId); if (allBrokerList == null) { allBrokerList = new ArrayList<>(); } // 设置broker数 clusterBrokersStateVO.setBrokerCount(allBrokerList.size()); // 设置版本信息 clusterBrokersStateVO.setBrokerVersionList( this.getBrokerVersionList(clusterPhyId, allBrokerList.stream().filter(elem -> elem.alive()).collect(Collectors.toList())) ); // 获取controller信息 KafkaController kafkaController = kafkaControllerService.getKafkaControllerFromDB(clusterPhyId); // 设置kafka-controller信息 clusterBrokersStateVO.setKafkaControllerAlive(false); if(null != kafkaController) { clusterBrokersStateVO.setKafkaController( this.convert2KafkaControllerVO( kafkaController, brokerService.getBroker(clusterPhyId, kafkaController.getBrokerId()) ) ); clusterBrokersStateVO.setKafkaControllerAlive(true); } clusterBrokersStateVO.setConfigSimilar(brokerConfigService.countBrokerConfigDiffsFromDB(clusterPhyId, Arrays.asList("broker.id", "listeners", "name", "value")) <= 0); return clusterBrokersStateVO; } /**************************************************** private method ****************************************************/ private PaginationResult pagingBrokers(List brokerList, List metricsList, PaginationSortDTO dto) { if (ValidateUtils.isBlank(dto.getSortField())) { // 默认排序 return PaginationUtil.pageBySubData( PaginationUtil.pageBySort(brokerList, "brokerId", SortTypeEnum.ASC.getSortType()).stream().map(elem -> elem.getBrokerId()).collect(Collectors.toList()), dto ); } if (!brokerMetricService.isMetricName(dto.getSortField())) { // 非指标字段进行排序,分页 return PaginationUtil.pageBySubData( PaginationUtil.pageBySort(brokerList, dto.getSortField(), dto.getSortType()).stream().map(elem -> elem.getBrokerId()).collect(Collectors.toList()), dto ); } // 指标字段进行排序及分页 Map metricsMap = metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity())); brokerList.stream().forEach(elem -> { metricsMap.putIfAbsent(elem.getBrokerId(), new BrokerMetrics(elem.getClusterPhyId(), elem.getBrokerId())); }); // 排序 metricsList = (List) PaginationMetricsUtil.sortMetrics(new ArrayList<>(metricsMap.values()), dto.getSortField(), "brokerId", dto.getSortType()); return PaginationUtil.pageBySubData( metricsList.stream().map(elem -> elem.getBrokerId()).collect(Collectors.toList()), dto ); } private List convert2ClusterBrokersOverviewVOList(List pagedBrokerIdList, List brokerList, List metricsList, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController) { Map metricsMap = metricsList == null? new HashMap<>(): metricsList.stream().collect(Collectors.toMap(BrokerMetrics::getBrokerId, Function.identity())); Map brokerMap = brokerList == null? new HashMap<>(): brokerList.stream().collect(Collectors.toMap(Broker::getBrokerId, Function.identity())); List voList = new ArrayList<>(pagedBrokerIdList.size()); for (Integer brokerId : pagedBrokerIdList) { Broker broker = brokerMap.get(brokerId); BrokerMetrics brokerMetrics = metricsMap.get(brokerId); voList.add(this.convert2ClusterBrokersOverviewVO(brokerId, broker, brokerMetrics, groupTopic, transactionTopic, kafkaController)); } return voList; } private ClusterBrokersOverviewVO convert2ClusterBrokersOverviewVO(Integer brokerId, Broker broker, BrokerMetrics brokerMetrics, Topic groupTopic, Topic transactionTopic, KafkaController kafkaController) { ClusterBrokersOverviewVO clusterBrokersOverviewVO = new ClusterBrokersOverviewVO(); clusterBrokersOverviewVO.setBrokerId(brokerId); if (broker != null) { clusterBrokersOverviewVO.setHost(broker.getHost()); clusterBrokersOverviewVO.setRack(broker.getRack()); clusterBrokersOverviewVO.setJmxPort(broker.getJmxPort()); clusterBrokersOverviewVO.setAlive(broker.alive()); clusterBrokersOverviewVO.setStartTimeUnitMs(broker.getStartTimestamp()); } clusterBrokersOverviewVO.setKafkaRoleList(new ArrayList<>()); if (groupTopic != null && groupTopic.getBrokerIdSet().contains(brokerId)) { clusterBrokersOverviewVO.getKafkaRoleList().add(groupTopic.getTopicName()); } if (transactionTopic != null && transactionTopic.getBrokerIdSet().contains(brokerId)) { clusterBrokersOverviewVO.getKafkaRoleList().add(transactionTopic.getTopicName()); } if (kafkaController != null && kafkaController.getBrokerId().equals(brokerId)) { clusterBrokersOverviewVO.getKafkaRoleList().add(KafkaConstant.CONTROLLER_ROLE); } clusterBrokersOverviewVO.setLatestMetrics(brokerMetrics); return clusterBrokersOverviewVO; } private KafkaControllerVO convert2KafkaControllerVO(KafkaController kafkaController, Broker kafkaControllerBroker) { if(null != kafkaController && null != kafkaControllerBroker) { KafkaControllerVO kafkaControllerVO = new KafkaControllerVO(); kafkaControllerVO.setBrokerId(kafkaController.getBrokerId()); kafkaControllerVO.setBrokerHost(kafkaControllerBroker.getHost()); return kafkaControllerVO; } return null; } private List getBrokerVersionList(Long clusterPhyId, List brokerList) { Set brokerVersionList = new HashSet<>(); for (Broker broker : brokerList) { brokerVersionList.add(brokerService.getBrokerVersionFromKafkaWithCacheFirst(broker.getClusterPhyId(),broker.getBrokerId(),broker.getStartTimestamp())); } brokerVersionList.remove(""); return new ArrayList<>(brokerVersionList); } }