From 49e7fea6d375eb187aaa301b8ce28aadfffba6b1 Mon Sep 17 00:00:00 2001 From: EricZeng Date: Mon, 3 Jul 2023 15:33:15 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]Topic-Messages=E9=A1=B5=E9=9D=A2?= =?UTF-8?q?=E5=90=8E=E7=AB=AF=E5=A2=9E=E5=8A=A0=E6=8C=89=E7=85=A7Partition?= =?UTF-8?q?=E5=92=8COffset=E7=BA=AC=E5=BA=A6=E7=9A=84=E6=8E=92=E5=BA=8F=20?= =?UTF-8?q?(#1075)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../biz/topic/impl/TopicStateManagerImpl.java | 203 ++++++++++-------- .../common/constant/PaginationConstant.java | 5 +- 2 files changed, 113 insertions(+), 95 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java index 3b4b5b5f..c2a4f244 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/TopicStateManagerImpl.java @@ -28,6 +28,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.topic.partition.TopicPart import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.constant.PaginationConstant; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; @@ -46,8 +47,6 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems; -import org.apache.commons.lang3.ObjectUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.config.TopicConfig; @@ -105,7 +104,7 @@ public class TopicStateManagerImpl implements TopicStateManager { TopicBrokerAllVO allVO = new TopicBrokerAllVO(); allVO.setTotal(topic.getBrokerIdSet().size()); - allVO.setLive((int)brokerMap.values().stream().filter(elem -> elem.alive()).count()); + allVO.setLive((int)brokerMap.values().stream().filter(Broker::alive).count()); allVO.setDead(allVO.getTotal() - allVO.getLive()); allVO.setPartitionCount(topic.getPartitionNum()); @@ -157,95 +156,28 @@ public class TopicStateManagerImpl implements TopicStateManager { return Result.buildFromIgnoreData(endOffsetsMapResult); } - List voList = new ArrayList<>(); - - KafkaConsumer kafkaConsumer = null; - try { - // 创建kafka-consumer - kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords())); - - List partitionList = new ArrayList<>(); - long maxMessage = 0; - for (Map.Entry entry : endOffsetsMapResult.getData().entrySet()) { - long begin = beginOffsetsMapResult.getData().get(entry.getKey()); - long end = entry.getValue(); - if (begin == end){ - continue; - } - maxMessage += end - begin; - partitionList.add(entry.getKey()); - } - maxMessage = Math.min(maxMessage, dto.getMaxRecords()); - kafkaConsumer.assign(partitionList); - - Map partitionOffsetAndTimestampMap = new HashMap<>(); - // 获取指定时间每个分区的offset(按指定开始时间查询消息时) - if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { - Map timestampsToSearch = new HashMap<>(); - partitionList.forEach(topicPartition -> timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs())); - partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); - } - - for (TopicPartition partition : partitionList) { - if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { - // 重置到最旧 - kafkaConsumer.seek(partition, beginOffsetsMapResult.getData().get(partition)); - } else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { - // 重置到指定时间 - kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset()); - } else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) { - // 重置到指定位置 - - } else { - // 默认,重置到最新 - kafkaConsumer.seek(partition, Math.max(beginOffsetsMapResult.getData().get(partition), endOffsetsMapResult.getData().get(partition) - dto.getMaxRecords())); - } - } - - // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间 - while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) { - ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS)); - for (ConsumerRecord consumerRecord : consumerRecords) { - if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) { - continue; - } - - voList.add(TopicVOConverter.convert2TopicRecordVO(topicName, consumerRecord)); - if (voList.size() >= dto.getMaxRecords()) { - break; - } - } - - // 超时则返回 - if (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS > dto.getPullTimeoutUnitMs() - || voList.size() > dto.getMaxRecords()) { - break; - } - } - - // 排序 - if (ObjectUtils.isNotEmpty(voList)) { - // 默认按时间倒序排序 - if (StringUtils.isBlank(dto.getSortType())) { - dto.setSortType(SortTypeEnum.DESC.getSortType()); - } - PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType()); - } + // 数据采集 + List voList = this.getTopicMessages(clusterPhy, topicName, beginOffsetsMapResult.getData(), endOffsetsMapResult.getData(), startTime, dto); - return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); - } catch (Exception e) { - log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhyId, topicName, dto, e); + // 排序 + if (ValidateUtils.isBlank(dto.getSortType())) { + // 默认按时间倒序排序 + dto.setSortType(SortTypeEnum.DESC.getSortType()); + } + if (ValidateUtils.isBlank(dto.getSortField())) { + // 默认按照timestampUnitMs字段排序 + dto.setSortField(PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD); + } - throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED); - } finally { - if (kafkaConsumer != null) { - try { - kafkaConsumer.close(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS)); - } catch (Exception e) { - // ignore - } - } + if (PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD.equals(dto.getSortField())) { + // 如果是时间类型,则第二排序规则是offset + PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType(), PaginationConstant.TOPIC_RECORDS_OFFSET_SORTED_FIELD, dto.getSortType()); + } else { + // 如果是非时间类型,则第二排序规则是时间 + PaginationUtil.pageBySort(voList, dto.getSortField(), dto.getSortType(), PaginationConstant.TOPIC_RECORDS_TIME_SORTED_FIELD, dto.getSortType()); } + + return Result.buildSuc(voList.subList(0, Math.min(dto.getMaxRecords(), voList.size()))); } @Override @@ -338,7 +270,7 @@ public class TopicStateManagerImpl implements TopicStateManager { // Broker统计信息 vo.setBrokerCount(brokerMap.size()); - vo.setLiveBrokerCount((int)brokerMap.values().stream().filter(elem -> elem.alive()).count()); + vo.setLiveBrokerCount((int)brokerMap.values().stream().filter(Broker::alive).count()); vo.setDeadBrokerCount(vo.getBrokerCount() - vo.getLiveBrokerCount()); // Partition统计信息 @@ -394,11 +326,8 @@ public class TopicStateManagerImpl implements TopicStateManager { // ignore return true; } - if (filterValue != null && consumerRecord.value() != null && !consumerRecord.value().contains(filterValue)) { - return true; - } - return false; + return (filterValue != null && consumerRecord.value() != null && !consumerRecord.value().contains(filterValue)); } private TopicBrokerSingleVO getTopicBrokerSingle(Long clusterPhyId, @@ -458,4 +387,90 @@ public class TopicStateManagerImpl implements TopicStateManager { props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Math.max(2, Math.min(5, maxPollRecords))); return props; } + + private List getTopicMessages(ClusterPhy clusterPhy, + String topicName, + Map beginOffsetsMap, + Map endOffsetsMap, + long startTime, + TopicRecordDTO dto) throws AdminOperateException { + List voList = new ArrayList<>(); + + try (KafkaConsumer kafkaConsumer = new KafkaConsumer<>(this.generateClientProperties(clusterPhy, dto.getMaxRecords()))) { + // 移动到指定位置 + long maxMessage = this.assignAndSeekToSpecifiedOffset(kafkaConsumer, beginOffsetsMap, endOffsetsMap, dto); + + // 这里需要减去 KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS 是因为poll一次需要耗时,如果这里不减去,则可能会导致poll之后,超过要求的时间 + while (System.currentTimeMillis() - startTime <= dto.getPullTimeoutUnitMs() && voList.size() < maxMessage) { + ConsumerRecords consumerRecords = kafkaConsumer.poll(Duration.ofMillis(KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS)); + for (ConsumerRecord consumerRecord : consumerRecords) { + if (this.checkIfIgnore(consumerRecord, dto.getFilterKey(), dto.getFilterValue())) { + continue; + } + + voList.add(TopicVOConverter.convert2TopicRecordVO(topicName, consumerRecord)); + if (voList.size() >= dto.getMaxRecords()) { + break; + } + } + + // 超时则返回 + if (System.currentTimeMillis() - startTime + KafkaConstant.POLL_ONCE_TIMEOUT_UNIT_MS > dto.getPullTimeoutUnitMs() + || voList.size() > dto.getMaxRecords()) { + break; + } + } + + return voList; + } catch (Exception e) { + log.error("method=getTopicMessages||clusterPhyId={}||topicName={}||param={}||errMsg=exception", clusterPhy.getId(), topicName, dto, e); + + throw new AdminOperateException(e.getMessage(), e, ResultStatus.KAFKA_OPERATE_FAILED); + } + } + + private long assignAndSeekToSpecifiedOffset(KafkaConsumer kafkaConsumer, + Map beginOffsetsMap, + Map endOffsetsMap, + TopicRecordDTO dto) { + List partitionList = new ArrayList<>(); + long maxMessage = 0; + for (Map.Entry entry : endOffsetsMap.entrySet()) { + long begin = beginOffsetsMap.get(entry.getKey()); + long end = entry.getValue(); + if (begin == end){ + continue; + } + maxMessage += end - begin; + partitionList.add(entry.getKey()); + } + maxMessage = Math.min(maxMessage, dto.getMaxRecords()); + kafkaConsumer.assign(partitionList); + + Map partitionOffsetAndTimestampMap = new HashMap<>(); + // 获取指定时间每个分区的offset(按指定开始时间查询消息时) + if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + Map timestampsToSearch = new HashMap<>(); + partitionList.forEach(topicPartition -> timestampsToSearch.put(topicPartition, dto.getStartTimestampUnitMs())); + partitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(timestampsToSearch); + } + + for (TopicPartition partition : partitionList) { + if (OffsetTypeEnum.EARLIEST.getResetType() == dto.getFilterOffsetReset()) { + // 重置到最旧 + kafkaConsumer.seek(partition, beginOffsetsMap.get(partition)); + } else if (OffsetTypeEnum.PRECISE_TIMESTAMP.getResetType() == dto.getFilterOffsetReset()) { + // 重置到指定时间 + kafkaConsumer.seek(partition, partitionOffsetAndTimestampMap.get(partition).offset()); + } else if (OffsetTypeEnum.PRECISE_OFFSET.getResetType() == dto.getFilterOffsetReset()) { + // 重置到指定位置 + + } else { + // 默认,重置到最新 + kafkaConsumer.seek(partition, Math.max(beginOffsetsMap.get(partition), endOffsetsMap.get(partition) - dto.getMaxRecords())); + } + } + + return maxMessage; + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/PaginationConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/PaginationConstant.java index 9b8def80..9d3b8355 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/PaginationConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/PaginationConstant.java @@ -27,5 +27,8 @@ public class PaginationConstant { /** * groupTopic列表的默认排序规则 */ - public static final String DEFAULT_GROUP_TOPIC_SORTED_FIELD = "topicName"; + public static final String DEFAULT_GROUP_TOPIC_SORTED_FIELD = "topicName"; + + public static final String TOPIC_RECORDS_TIME_SORTED_FIELD = "timestampUnitMs"; + public static final String TOPIC_RECORDS_OFFSET_SORTED_FIELD = "offset"; } -- GitLab