diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperateEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperateEnum.java index 2bc874ec3367c378d6687956a9f93e21b731ee04..af69ea50fd9a80f186a5a8c3a68cca57bdc680a2 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperateEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OperateEnum.java @@ -46,7 +46,7 @@ public enum OperateEnum { public static boolean validate(Integer code) { if (code == null) { - return false; + return true; } for (OperateEnum state : OperateEnum.values()) { if (state.getCode() == code) { diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/OperateRecordDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/OperateRecordDTO.java index 1837ecfcde72bbcc6dfb6782df1c7fad369cd4d4..7f1910173a11661ab1d547e230d0878d97a390f3 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/OperateRecordDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/OperateRecordDTO.java @@ -81,11 +81,6 @@ public class OperateRecordDTO { } public boolean legal() { - if (!ModuleEnum.validate(moduleId) || - (!ValidateUtils.isNull(operateId) && OperateEnum.validate(operateId)) - ) { - return false; - } - return true; + return !ValidateUtils.isNull(moduleId) && ModuleEnum.validate(moduleId) && OperateEnum.validate(operateId); } } diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicStatisticMetricsVO.java similarity index 91% rename from kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java rename to kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicStatisticMetricsVO.java index 3665b7acc79473b929d83d2ba6357bab692809aa..c83c24d2d7f5fdf15a3bec4a6acfdc11b068a69e 100644 --- a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicStatisticMetricsVO.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.kafka.manager.openapi.common.vo; +package com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -14,7 +14,6 @@ public class TopicStatisticMetricsVO { public TopicStatisticMetricsVO(Double peakBytesIn) { this.peakBytesIn = peakBytesIn; - } public Double getPeakBytesIn() { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java index 5b2909caede291077e0d66b0ab9d367b785c0ff3..f70548de45e9f2d2cf2263efeeeca618279c0ddb 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java @@ -17,5 +17,5 @@ public interface OperateRecordService { int insert(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map content); - List queryByCondt(OperateRecordDTO dto); + List queryByCondition(OperateRecordDTO dto); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java index cfa2920f4764ff2cf635fd2fa609238f6e6b4a19..8dc0e0c1ad90526dce4e958880bf31af14852d50 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerService.java @@ -32,6 +32,15 @@ public interface TopicManagerService { Map> getTopicMaxAvgBytesIn(Long clusterId, Integer latestDay, Double minMaxAvgBytesIn); + /** + * 获取指定时间范围内Topic的峰值均值流量 + * @param clusterId 集群ID + * @param topicName Topic名称 + * @param startTime 开始时间 + * @param endTime 结束时间 + * @param maxAvgDay 最大几天的均值 + * @return + */ Double getTopicMaxAvgBytesIn(Long clusterId, String topicName, Date startTime, Date endTime, Integer maxAvgDay); TopicStatisticsDO getByTopicAndDay(Long clusterId, String topicName, String gmtDay); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java index 26d7ef4debafbf5a53c8632e34361cd5d440f3fb..8a0028c7b1eaffc76ea3b76b6a33600717d38b7b 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java @@ -66,7 +66,10 @@ public class AdminServiceImpl implements AdminService { String applicant, String operator) { List fullBrokerIdList = regionService.getFullBrokerIdList(clusterDO.getId(), regionId, brokerIdList); - if (PhysicalClusterMetadataManager.getNotAliveBrokerNum(clusterDO.getId(), fullBrokerIdList) > DEFAULT_DEAD_BROKER_LIMIT_NUM) { + + Long notAliveBrokerNum = PhysicalClusterMetadataManager.getNotAliveBrokerNum(clusterDO.getId(), fullBrokerIdList); + if (notAliveBrokerNum >= fullBrokerIdList.size() || notAliveBrokerNum > DEFAULT_DEAD_BROKER_LIMIT_NUM) { + // broker全挂了,或者是挂的数量大于了DEFAULT_DEAD_BROKER_LIMIT_NUM时, 则认为broker参数不合法 return ResultStatus.BROKER_NOT_EXIST; } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index b505bad01768b9d26330e07b808d5736036ac9b2..ea9d22da60b0a0ea8e50a7d624e7f38faaa79139 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -82,6 +82,7 @@ public class ClusterServiceImpl implements ClusterService { content.put("security properties", clusterDO.getSecurityProperties()); content.put("jmx properties", clusterDO.getJmxProperties()); operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.ADD, content); + if (clusterDao.insert(clusterDO) <= 0) { LOGGER.error("add new cluster failed, clusterDO:{}.", clusterDO); return ResultStatus.MYSQL_ERROR; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java index 290bbae5cc3420dd54faaf8d251e97cd7f4c3e25..e232d97077b7df636e2230727b54d23f5e2025f6 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java @@ -41,8 +41,8 @@ public class OperateRecordServiceImpl implements OperateRecordService { } @Override - public List queryByCondt(OperateRecordDTO dto) { - return operateRecordDao.queryByCondt( + public List queryByCondition(OperateRecordDTO dto) { + return operateRecordDao.queryByCondition( dto.getModuleId(), dto.getOperateId(), dto.getOperator(), diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java index 1d761eb883b3a01000f08d6db1a5ebd602f4eacb..bce5fbe71ebc56559fd2be9fd8fe176e73adcb11 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicManagerServiceImpl.java @@ -147,12 +147,14 @@ public class TopicManagerServiceImpl implements TopicManagerService { } @Override - public Double getTopicMaxAvgBytesIn(Long clusterId, - String topicName, - Date startTime, - Date endTime, - Integer maxAvgDay) { - return topicStatisticsDao.getTopicMaxAvgBytesIn(clusterId, topicName, startTime, endTime, maxAvgDay); + public Double getTopicMaxAvgBytesIn(Long clusterId, String topicName, Date startTime, Date endTime, Integer maxAvgDay) { + try { + return topicStatisticsDao.getTopicMaxAvgBytesIn(clusterId, topicName, startTime, endTime, maxAvgDay); + } catch (Exception e) { + LOGGER.error("class=TopicManagerServiceImpl||method=getTopicMaxAvgBytesIn||clusterId={}||topicName={}||startTime={}||endTime={}||maxAvgDay={}||errMsg={}", + clusterId, topicName, startTime, endTime, maxAvgDay, e.getMessage()); + } + return null; } @Override diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/OperateRecordDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/OperateRecordDao.java index 059837545525b642ce09c5322abc2be7a52ee077..4bfa69990617b8c0b1d9a5e6824eea9d657b079b 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/OperateRecordDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/OperateRecordDao.java @@ -14,5 +14,5 @@ public interface OperateRecordDao { int insert(OperateRecordDO operateRecordDO); - List queryByCondt(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime); + List queryByCondition(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime); } diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/OperateRecordDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/OperateRecordDaoImpl.java index b08e6b833ab1b51b0dca3bf98e29aed385ed1d2e..20f37fd181379775b4eae4a9057140e712dec6d2 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/OperateRecordDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/OperateRecordDaoImpl.java @@ -30,13 +30,13 @@ public class OperateRecordDaoImpl implements OperateRecordDao { } @Override - public List queryByCondt(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime) { + public List queryByCondition(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime) { Map params = new HashMap<>(5); params.put("moduleId", moduleId); params.put("operateId", operateId); params.put("operator", operator); params.put("startTime", startTime); params.put("endTime", endTime); - return sqlSession.selectList("OperateRecordDao.queryByCondt", params); + return sqlSession.selectList("OperateRecordDao.queryByCondition", params); } } diff --git a/kafka-manager-dao/src/main/resources/mapper/OperateRecordDao.xml b/kafka-manager-dao/src/main/resources/mapper/OperateRecordDao.xml index b65c3e6f9fa9e3a65e52c69cc2fe8f6a6fbb10a9..db505b6b63cd60db3f156ca907092c57be212fc4 100644 --- a/kafka-manager-dao/src/main/resources/mapper/OperateRecordDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/OperateRecordDao.xml @@ -21,7 +21,7 @@ ) - select * from operate_record where diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java index 6e59816b5354a12656c86c9126959ac178489ed5..7b5d97c34e227e24bb6fcfd6d0dc3c65308af8ca 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/normal/NormalTopicController.java @@ -11,11 +11,13 @@ import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.TopicBusinessInfoVO; import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.*; +import com.xiaojukeji.kafka.manager.common.utils.DateUtils; import com.xiaojukeji.kafka.manager.common.utils.SpringTool; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaBillDO; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxAttributeEnum; +import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicStatisticMetricsVO; import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; @@ -339,4 +341,23 @@ public class NormalTopicController { ); } + @ApiOperation(value = "Topic流量统计信息", notes = "") + @RequestMapping(value = "{clusterId}/topics/{topicName}/statistic-metrics", method = RequestMethod.GET) + @ResponseBody + public Result getTopicStatisticMetrics(@PathVariable Long clusterId, + @PathVariable String topicName, + @RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId, + @RequestParam("latest-day") Integer latestDay) { + Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId); + if (ValidateUtils.isNull(physicalClusterId)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + + Double maxAvgBytesIn = topicManagerService.getTopicMaxAvgBytesIn(physicalClusterId, topicName, new Date(DateUtils.getDayStarTime(-1 * latestDay)), new Date(), 1); + if (ValidateUtils.isNull(maxAvgBytesIn)) { + return Result.buildFrom(ResultStatus.MYSQL_ERROR); + } + return new Result<>(new TopicStatisticMetricsVO(maxAvgBytesIn)); + } + } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java index 68068f974fb0b45502e495a2a6fa0d84614626a6..f600aab5418fcbfdcb2eebbffaf9f065ebd342a6 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdOperateRecordController.java @@ -36,7 +36,7 @@ public class RdOperateRecordController { if (ValidateUtils.isNull(dto) || !dto.legal()) { return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); } - List voList = OperateRecordModelConverter.convert2OperateRecordVOList(operateRecordService.queryByCondt(dto)); + List voList = OperateRecordModelConverter.convert2OperateRecordVOList(operateRecordService.queryByCondition(dto)); if (voList.size() > MAX_RECORD_COUNT) { voList = voList.subList(0, MAX_RECORD_COUNT); } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java index 4d029fb6b8004a93b40e9e01cec86e1f835fc853..b247cdb8cc9ea3cea664da9f00e425ff42ce83aa 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java @@ -13,8 +13,6 @@ import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorize import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO; import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO; -import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicStatisticMetricsVO; -import com.xiaojukeji.kafka.manager.common.utils.DateUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; @@ -30,7 +28,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; -import java.util.Date; import java.util.List; /** @@ -69,27 +66,6 @@ public class ThirdPartTopicController { return new Result<>(vo); } - @ApiOperation(value = "Topic流量统计信息", notes = "") - @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/statistic-metrics", method = RequestMethod.GET) - @ResponseBody - public Result getTopicStatisticMetrics(@PathVariable Long physicalClusterId, - @PathVariable String topicName, - @RequestParam("latest-day") Integer latestDay) { - try { - return new Result<>(new TopicStatisticMetricsVO(topicManagerService.getTopicMaxAvgBytesIn( - physicalClusterId, - topicName, - new Date(DateUtils.getDayStarTime(-1 * latestDay)), - new Date(), - 1 - ))); - } catch (Exception e) { - LOGGER.error("get topic statistic metrics failed, clusterId:{} topicName:{} latestDay:{}." - , physicalClusterId, topicName, latestDay, e); - } - return Result.buildFrom(ResultStatus.MYSQL_ERROR); - } - @ApiOperation(value = "Topic是否有流量", notes = "") @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/offset-changed", method = RequestMethod.GET) @ResponseBody