From 0f746917a741c0e259e2446e02d9c3895c197c6f Mon Sep 17 00:00:00 2001 From: zengqiao Date: Fri, 2 Jul 2021 16:41:57 +0800 Subject: [PATCH] =?UTF-8?q?Topic=E5=9F=BA=E6=9C=AC=E4=BF=A1=E6=81=AF?= =?UTF-8?q?=E4=B8=AD=E5=A2=9E=E5=8A=A0retention.bytes=E4=BF=A1=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constant/TopicCreationConstant.java | 2 + .../common/entity/ao/topic/TopicBasicDTO.java | 13 ++- .../entity/vo/normal/topic/TopicBasicVO.java | 92 +++++++++++-------- .../cache/PhysicalClusterMetadataManager.java | 43 +++++++-- .../service/impl/TopicServiceImpl.java | 1 + ...ionTime.java => FlushTopicProperties.java} | 19 ++-- .../normal/NormalTopicController.java | 5 +- .../web/converters/TopicModelConverter.java | 1 + 8 files changed, 111 insertions(+), 65 deletions(-) rename kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/{FlushTopicRetentionTime.java => FlushTopicProperties.java} (68%) diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java index 3a6dd478..4d569907 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/TopicCreationConstant.java @@ -25,6 +25,8 @@ public class TopicCreationConstant { public static final String TOPIC_RETENTION_TIME_KEY_NAME = "retention.ms"; + public static final String TOPIC_RETENTION_BYTES_KEY_NAME = "retention.bytes"; + public static final Long DEFAULT_QUOTA = 3 * 1024 * 1024L; public static Properties createNewProperties(Long retentionTime) { diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java index e3ea08ed..9150569b 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicBasicDTO.java @@ -37,6 +37,8 @@ public class TopicBasicDTO { private Long retentionTime; + private Long retentionBytes; + public Long getClusterId() { return clusterId; } @@ -157,6 +159,14 @@ public class TopicBasicDTO { this.retentionTime = retentionTime; } + public Long getRetentionBytes() { + return retentionBytes; + } + + public void setRetentionBytes(Long retentionBytes) { + this.retentionBytes = retentionBytes; + } + @Override public String toString() { return "TopicBasicDTO{" + @@ -166,7 +176,7 @@ public class TopicBasicDTO { ", principals='" + principals + '\'' + ", topicName='" + topicName + '\'' + ", description='" + description + '\'' + - ", regionNameList='" + regionNameList + '\'' + + ", regionNameList=" + regionNameList + ", score=" + score + ", topicCodeC='" + topicCodeC + '\'' + ", partitionNum=" + partitionNum + @@ -175,6 +185,7 @@ public class TopicBasicDTO { ", modifyTime=" + modifyTime + ", createTime=" + createTime + ", retentionTime=" + retentionTime + + ", retentionBytes=" + retentionBytes + '}'; } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java index 946a9997..b200a150 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicBasicVO.java @@ -33,6 +33,9 @@ public class TopicBasicVO { @ApiModelProperty(value = "存储时间(ms)") private Long retentionTime; + @ApiModelProperty(value = "单分区数据保存大小(Byte)") + private Long retentionBytes; + @ApiModelProperty(value = "创建时间") private Long createTime; @@ -62,12 +65,20 @@ public class TopicBasicVO { this.clusterId = clusterId; } - public String getTopicCodeC() { - return topicCodeC; + public String getAppId() { + return appId; } - public void setTopicCodeC(String topicCodeC) { - this.topicCodeC = topicCodeC; + public void setAppId(String appId) { + this.appId = appId; + } + + public String getAppName() { + return appName; + } + + public void setAppName(String appName) { + this.appName = appName; } public Integer getPartitionNum() { @@ -86,76 +97,76 @@ public class TopicBasicVO { this.replicaNum = replicaNum; } - public Long getModifyTime() { - return modifyTime; + public String getPrincipals() { + return principals; } - public void setModifyTime(Long modifyTime) { - this.modifyTime = modifyTime; + public void setPrincipals(String principals) { + this.principals = principals; } - public Long getCreateTime() { - return createTime; + public Long getRetentionTime() { + return retentionTime; } - public void setCreateTime(Long createTime) { - this.createTime = createTime; + public void setRetentionTime(Long retentionTime) { + this.retentionTime = retentionTime; } - public String getPrincipals() { - return principals; + public Long getRetentionBytes() { + return retentionBytes; } - public void setPrincipals(String principals) { - this.principals = principals; + public void setRetentionBytes(Long retentionBytes) { + this.retentionBytes = retentionBytes; } - public String getDescription() { - return description; + public Long getCreateTime() { + return createTime; } - public void setDescription(String description) { - this.description = description; + public void setCreateTime(Long createTime) { + this.createTime = createTime; } - public void setAppId(String appId) { - this.appId = appId; + public Long getModifyTime() { + return modifyTime; } - public void setBootstrapServers(String bootstrapServers) { - this.bootstrapServers = bootstrapServers; + public void setModifyTime(Long modifyTime) { + this.modifyTime = modifyTime; } - public String getAppId() { - return appId; + public Integer getScore() { + return score; } - public String getBootstrapServers() { - return bootstrapServers; + public void setScore(Integer score) { + this.score = score; } - public Long getRetentionTime() { - return retentionTime; + public String getTopicCodeC() { + return topicCodeC; } - public void setRetentionTime(Long retentionTime) { - this.retentionTime = retentionTime; + public void setTopicCodeC(String topicCodeC) { + this.topicCodeC = topicCodeC; } - public String getAppName() { - return appName; + public String getDescription() { + return description; } - public void setAppName(String appName) { - this.appName = appName; + public void setDescription(String description) { + this.description = description; } - public Integer getScore() { - return score; + public String getBootstrapServers() { + return bootstrapServers; } - public void setScore(Integer score) { - this.score = score; + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; } public List getRegionNameList() { @@ -176,6 +187,7 @@ public class TopicBasicVO { ", replicaNum=" + replicaNum + ", principals='" + principals + '\'' + ", retentionTime=" + retentionTime + + ", retentionBytes=" + retentionBytes + ", createTime=" + createTime + ", modifyTime=" + modifyTime + ", score=" + score + diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index 631b254f..a7142fa9 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -3,10 +3,12 @@ package com.xiaojukeji.kafka.manager.service.cache; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum; import com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; +import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ListUtils; +import com.xiaojukeji.kafka.manager.common.utils.NumberUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConfig; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap; @@ -56,7 +58,7 @@ public class PhysicalClusterMetadataManager { private final static Map> TOPIC_METADATA_MAP = new ConcurrentHashMap<>(); - private final static Map> TOPIC_RETENTION_TIME_MAP = new ConcurrentHashMap<>(); + private final static Map> TOPIC_PROPERTIES_MAP = new ConcurrentHashMap<>(); private final static Map> BROKER_METADATA_MAP = new ConcurrentHashMap<>(); @@ -95,7 +97,7 @@ public class PhysicalClusterMetadataManager { // 初始化topic-map TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); - TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); + TOPIC_PROPERTIES_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); // 初始化cluster-map CLUSTER_MAP.put(clusterDO.getId(), clusterDO); @@ -158,7 +160,7 @@ public class PhysicalClusterMetadataManager { KAFKA_VERSION_MAP.remove(clusterId); TOPIC_METADATA_MAP.remove(clusterId); - TOPIC_RETENTION_TIME_MAP.remove(clusterId); + TOPIC_PROPERTIES_MAP.remove(clusterId); CLUSTER_MAP.remove(clusterId); } @@ -262,24 +264,45 @@ public class PhysicalClusterMetadataManager { //---------------------------配置相关元信息-------------- - public static void putTopicRetentionTime(Long clusterId, String topicName, Long retentionTime) { - Map timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId); - if (timeMap == null) { + public static void putTopicProperties(Long clusterId, String topicName, Properties properties) { + if (ValidateUtils.isNull(clusterId) || ValidateUtils.isBlank(topicName) || ValidateUtils.isNull(properties)) { return; } - timeMap.put(topicName, retentionTime); + + Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId); + if (ValidateUtils.isNull(propertiesMap)) { + return; + } + propertiesMap.put(topicName, properties); } public static Long getTopicRetentionTime(Long clusterId, String topicName) { - Map timeMap = TOPIC_RETENTION_TIME_MAP.get(clusterId); - if (timeMap == null) { + Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId); + if (ValidateUtils.isNull(propertiesMap)) { + return null; + } + + Properties properties = propertiesMap.get(topicName); + if (ValidateUtils.isNull(properties)) { return null; } - return timeMap.get(topicName); + + return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_TIME_KEY_NAME)); } + public static Long getTopicRetentionBytes(Long clusterId, String topicName) { + Map propertiesMap = TOPIC_PROPERTIES_MAP.get(clusterId); + if (ValidateUtils.isNull(propertiesMap)) { + return null; + } + Properties properties = propertiesMap.get(topicName); + if (ValidateUtils.isNull(properties)) { + return null; + } + return NumberUtils.string2Long(properties.getProperty(TopicCreationConstant.TOPIC_RETENTION_BYTES_KEY_NAME)); + } //---------------------------Broker元信息相关-------------- diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java index 5c0176b1..154faf77 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicServiceImpl.java @@ -223,6 +223,7 @@ public class TopicServiceImpl implements TopicService { basicDTO.setCreateTime(topicMetadata.getCreateTime()); basicDTO.setModifyTime(topicMetadata.getModifyTime()); basicDTO.setRetentionTime(PhysicalClusterMetadataManager.getTopicRetentionTime(clusterId, topicName)); + basicDTO.setRetentionBytes(PhysicalClusterMetadataManager.getTopicRetentionBytes(clusterId, topicName)); TopicDO topicDO = topicManagerService.getByTopicName(clusterId, topicName); if (!ValidateUtils.isNull(topicDO)) { diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicRetentionTime.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java similarity index 68% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicRetentionTime.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java index 225f8393..41a8bde4 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicRetentionTime.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/schedule/metadata/FlushTopicProperties.java @@ -14,13 +14,14 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Properties; /** * @author zengqiao * @date 20/7/23 */ @Component -public class FlushTopicRetentionTime { +public class FlushTopicProperties { private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); @Autowired @@ -33,7 +34,7 @@ public class FlushTopicRetentionTime { try { flush(clusterDO); } catch (Exception e) { - LOGGER.error("flush topic retention time failed, clusterId:{}.", clusterDO.getId(), e); + LOGGER.error("flush topic properties failed, clusterId:{}.", clusterDO.getId(), e); } } } @@ -41,22 +42,20 @@ public class FlushTopicRetentionTime { private void flush(ClusterDO clusterDO) { ZkConfigImpl zkConfig = PhysicalClusterMetadataManager.getZKConfig(clusterDO.getId()); if (ValidateUtils.isNull(zkConfig)) { - LOGGER.error("flush topic retention time, get zk config failed, clusterId:{}.", clusterDO.getId()); + LOGGER.error("flush topic properties, get zk config failed, clusterId:{}.", clusterDO.getId()); return; } for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) { try { - Long retentionTime = KafkaZookeeperUtils.getTopicRetentionTime(zkConfig, topicName); - if (retentionTime == null) { - LOGGER.warn("get topic retentionTime failed, clusterId:{} topicName:{}.", - clusterDO.getId(), topicName); + Properties properties = KafkaZookeeperUtils.getTopicProperties(zkConfig, topicName); + if (ValidateUtils.isNull(properties)) { + LOGGER.warn("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName); continue; } - PhysicalClusterMetadataManager.putTopicRetentionTime(clusterDO.getId(), topicName, retentionTime); + PhysicalClusterMetadataManager.putTopicProperties(clusterDO.getId(), topicName, properties); } catch (Exception e) { - LOGGER.error("get topic retentionTime failed, clusterId:{} topicName:{}.", - clusterDO.getId(), topicName, e); + LOGGER.error("get topic properties failed, clusterId:{} topicName:{}.", clusterDO.getId(), topicName, e); } } } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index aaac290f..bb42cadd 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -61,10 +61,7 @@ public class NormalTopicController { @ApiOperation(value = "Topic基本信息", notes = "") @RequestMapping(value = "{clusterId}/topics/{topicName}/basic-info", method = RequestMethod.GET) @ResponseBody - public Result getTopicBasic( - @PathVariable Long clusterId, - @PathVariable String topicName, - @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) { + public Result getTopicBasic(@PathVariable Long clusterId, @PathVariable String topicName, @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId) { Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId); if (ValidateUtils.isNull(physicalClusterId)) { return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java index 4e28ca8b..c7364cb5 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/converters/TopicModelConverter.java @@ -31,6 +31,7 @@ public class TopicModelConverter { vo.setReplicaNum(dto.getReplicaNum()); vo.setPrincipals(dto.getPrincipals()); vo.setRetentionTime(dto.getRetentionTime()); + vo.setRetentionBytes(dto.getRetentionBytes()); vo.setCreateTime(dto.getCreateTime()); vo.setModifyTime(dto.getModifyTime()); vo.setScore(dto.getScore()); -- GitLab