diff --git a/docs/manual_kafka_op/add_cluster.pdf b/docs/manual_kafka_op/add_cluster.pdf new file mode 100644 index 0000000000000000000000000000000000000000..ba7e13e5a6abc1d0c71e9ce7953270e5c0e7abb1 Binary files /dev/null and b/docs/manual_kafka_op/add_cluster.pdf differ diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ConsumeHealthEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ConsumeHealthEnum.java new file mode 100644 index 0000000000000000000000000000000000000000..74d2d8ab46bb6b1b02e16878b3acb3da74065525 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ConsumeHealthEnum.java @@ -0,0 +1,46 @@ +package com.xiaojukeji.kafka.manager.common.bizenum; + +/** + * 消费健康 + * @author zengqiao + * @date 20/5/22 + */ +public enum ConsumeHealthEnum { + UNKNOWN(-1, "unknown"), + HEALTH(0, "health"), + UNHEALTH(1, "unhealth"), + ; + + private Integer code; + + private String message; + + ConsumeHealthEnum(Integer code, String message) { + this.code = code; + this.message = message; + } + + public Integer getCode() { + return code; + } + + public void setCode(Integer code) { + this.code = code; + } + + public String getMessage() { + return message; + } + + public void setMessage(String message) { + this.message = message; + } + + @Override + public String toString() { + return "ConsumeHealthEnum{" + + "code=" + code + + ", message='" + message + '\'' + + '}'; + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OffsetResetTypeEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OffsetResetTypeEnum.java new file mode 100644 index 0000000000000000000000000000000000000000..b69a8a25bd7e53e94e5afc71d842c860442a5e0e --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/OffsetResetTypeEnum.java @@ -0,0 +1,21 @@ +package com.xiaojukeji.kafka.manager.common.bizenum; + +/** + * @author zengqiao + * @date 20/10/26 + */ +public enum OffsetResetTypeEnum { + RESET_BY_TIME(0), + + RESET_BY_OFFSET(1); + + private final Integer code; + + OffsetResetTypeEnum(Integer code) { + this.code = code; + } + + public Integer getCode() { + return code; + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java index 5cd1ab5b5d80f1a13d7459ff5ed662f3304fb496..3690514f3a16f04b99996c8e991440d8fd5ace5b 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ApiPrefix.java @@ -6,21 +6,20 @@ package com.xiaojukeji.kafka.manager.common.constant; * @date 20/4/16 */ public class ApiPrefix { - public static final String API_V1_SSO_PREFIX = "/api/v1/sso/"; - - public static final String API_V1_NORMAL_PREFIX = "/api/v1/normal/"; - - public static final String API_V1_RD_PREFIX = "/api/v1/rd/"; - - public static final String API_V1_OP_PREFIX = "/api/v1/op/"; - - public static final String API_V1_THIRD_PART_PREFIX = "/api/v1/third-part/"; - - public static final String API_V2_THIRD_PART_PREFIX = "/api/v2/third-part/"; - - public static final String API_V1_OBSOLETE_PREFIX = "/api/v1/"; - - public static final String API_V2_OBSOLETE_PREFIX = "/api/v2/"; - - public static final String GATEWAY_API_V1_PREFIX = "/gateway/api/v1/"; + public static final String API_PREFIX = "/api/"; + public static final String API_V1_PREFIX = API_PREFIX + "v1/"; + public static final String API_V2_PREFIX = API_PREFIX + "v2/"; + + // console + public static final String API_V1_SSO_PREFIX = API_V1_PREFIX + "sso/"; + public static final String API_V1_NORMAL_PREFIX = API_V1_PREFIX + "normal/"; + public static final String API_V1_RD_PREFIX = API_V1_PREFIX + "rd/"; + public static final String API_V1_OP_PREFIX = API_V1_PREFIX + "op/"; + + // open + public static final String API_V1_THIRD_PART_PREFIX = API_V1_PREFIX + "third-part/"; + public static final String API_V2_THIRD_PART_PREFIX = API_V2_PREFIX + "third-part/"; + + // gateway + public static final String GATEWAY_API_V1_PREFIX = "/gateway" + API_V1_PREFIX; } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/SystemCodeConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/SystemCodeConstant.java index 5fc76f03bd13631f37df65b1b3ef1a396a9ff9ff..c3162a4b6b19a72d1f494dc8ac9b76c38f737152 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/SystemCodeConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/SystemCodeConstant.java @@ -5,13 +5,5 @@ package com.xiaojukeji.kafka.manager.common.constant; * @date 20/7/28 */ public class SystemCodeConstant { - public static final String LOG_X = "LogX"; - - public static final String LEO = "leo"; - - public static final String DATA_DREAM = "datadream"; - public static final String KAFKA_MANAGER = "kafka-manager"; - - public static final String CHORUS = "chorus"; // 治理平台-服务治理 } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/DeprecatedResponseResult.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/DeprecatedResponseResult.java deleted file mode 100644 index 08b127216051bae66a8036ccaafd594a4328f576..0000000000000000000000000000000000000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/DeprecatedResponseResult.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.entity; - -/** - * @author zengqiao - * @date 20/7/27 - */ -public class DeprecatedResponseResult { - public static final String SUCCESS_STATUS = "success"; - - public static final String FAILED_STATUS = "failure"; - - public static final String SUCCESS_MESSAGE = "process succeeded!"; - - public static final String FAILED_MESSAGE = "process failed!"; - - private String status; - - private String message; - - private T data; - - public static DeprecatedResponseResult success(T data) { - DeprecatedResponseResult responseCommonResult = new DeprecatedResponseResult(); - responseCommonResult.setMessage(SUCCESS_MESSAGE); - responseCommonResult.setStatus(SUCCESS_STATUS); - responseCommonResult.setData(data); - return responseCommonResult; - } - - public static DeprecatedResponseResult success() { - DeprecatedResponseResult responseCommonResult = new DeprecatedResponseResult(); - responseCommonResult.setStatus(SUCCESS_STATUS); - responseCommonResult.setMessage(SUCCESS_MESSAGE); - return responseCommonResult; - } - - public static DeprecatedResponseResult failure() { - DeprecatedResponseResult responseCommonResult = new DeprecatedResponseResult(); - responseCommonResult.setMessage(FAILED_MESSAGE); - responseCommonResult.setStatus(FAILED_STATUS); - return responseCommonResult; - } - - public static DeprecatedResponseResult failure(String message) { - DeprecatedResponseResult responseCommonResult = new DeprecatedResponseResult(); - responseCommonResult.setMessage(message); - responseCommonResult.setStatus(FAILED_STATUS); - return responseCommonResult; - } - - public String getStatus() { - return status; - } - - public void setStatus(String status) { - this.status = status; - } - - public String getMessage() { - return message; - } - - public void setMessage(String message) { - this.message = message; - } - - public T getData() { - return data; - } - - public void setData(T data) { - this.data = data; - } - - @Override - public String toString() { - return "DeprecatedResponseResult{" + - "status='" + status + '\'' + - ", message='" + message + '\'' + - ", data=" + data + - '}'; - } -} \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java index 7fc3343f5436894120449df3c3f888759d758b31..a3e8b655c30a09594eed1f6f1759f25ca3ab657c 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/Result.java @@ -88,6 +88,22 @@ public class Result implements Serializable { return result; } + public static Result buildSuc(T data) { + Result result = new Result(); + result.setCode(ResultStatus.SUCCESS.getCode()); + result.setMessage(ResultStatus.SUCCESS.getMessage()); + result.setData(data); + return result; + } + + public static Result buildFailure(String message) { + Result result = new Result(); + result.setCode(ResultStatus.GATEWAY_INVALID_REQUEST.getCode()); + result.setMessage(message); + result.setData(null); + return result; + } + public static Result buildFrom(ResultStatus resultStatus) { Result result = new Result(); result.setCode(resultStatus.getCode()); diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java index b63151acb3caa1b4a65194e67935d05531d61ac6..ce044a139d0eb02447c33dda0605a06db3e7b932 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ResultStatus.java @@ -8,7 +8,10 @@ import com.xiaojukeji.kafka.manager.common.constant.Constant; * @date 20/4/16 */ public enum ResultStatus { + GATEWAY_INVALID_REQUEST(-1, "invalid request"), + SUCCESS(Constant.SUCCESS, "success"), + LOGIN_FAILED(1, "login failed, please check username and password"), diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/KafkaBootstrapServerConfig.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/KafkaBootstrapServerConfig.java index 4b00c893f1abc5ee32a46327999ba75b274997dd..e61181c537d485bc5d2f5f6da84f4c8a735fbaf6 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/KafkaBootstrapServerConfig.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/gateway/KafkaBootstrapServerConfig.java @@ -8,18 +8,18 @@ import java.util.Map; * @date 20/7/29 */ public class KafkaBootstrapServerConfig extends BaseGatewayConfig { - private Map> clusterIdBootstrapServersMap; + private Map> clusterIdBootstrapServersMap; - public KafkaBootstrapServerConfig(Long version, Map> clusterIdBootstrapServersMap) { + public KafkaBootstrapServerConfig(Long version, Map> clusterIdBootstrapServersMap) { this.version = version; this.clusterIdBootstrapServersMap = clusterIdBootstrapServersMap; } - public Map> getClusterIdBootstrapServersMap() { + public Map> getClusterIdBootstrapServersMap() { return clusterIdBootstrapServersMap; } - public void setClusterIdBootstrapServersMap(Map> clusterIdBootstrapServersMap) { + public void setClusterIdBootstrapServersMap(Map> clusterIdBootstrapServersMap) { this.clusterIdBootstrapServersMap = clusterIdBootstrapServersMap; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/OperationHistoryDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/OperationHistoryDO.java deleted file mode 100644 index e513b371220c4bf39a6102ede913685b865fcb91..0000000000000000000000000000000000000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/OperationHistoryDO.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.entity.pojo; - -import java.util.Date; - -/** - * @author zengqiao - * @date 20/4/29 - */ -public class OperationHistoryDO { - private Long id; - - private Date gmtCreate; - - private Long clusterId; - - private String topicName; - - private String operator; - - private String operation; - - public Long getId() { - return id; - } - - public void setId(Long id) { - this.id = id; - } - - public Date getGmtCreate() { - return gmtCreate; - } - - public void setGmtCreate(Date gmtCreate) { - this.gmtCreate = gmtCreate; - } - - public Long getClusterId() { - return clusterId; - } - - public void setClusterId(Long clusterId) { - this.clusterId = clusterId; - } - - public String getTopicName() { - return topicName; - } - - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - public String getOperator() { - return operator; - } - - public void setOperator(String operator) { - this.operator = operator; - } - - public String getOperation() { - return operation; - } - - public void setOperation(String operation) { - this.operation = operation; - } - - @Override - public String toString() { - return "OperationHistoryDO{" + - "id=" + id + - ", gmtCreate=" + gmtCreate + - ", clusterId=" + clusterId + - ", topicName='" + topicName + '\'' + - ", operator='" + operator + '\'' + - ", operation='" + operation + '\'' + - '}'; - } - - public static OperationHistoryDO newInstance(Long clusterId, String topicName, String operator, String operation) { - OperationHistoryDO operationHistoryDO = new OperationHistoryDO(); - operationHistoryDO.setClusterId(clusterId); - operationHistoryDO.setTopicName(topicName); - operationHistoryDO.setOperator(operator); - operationHistoryDO.setOperation(operation); - return operationHistoryDO; - } -} \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/app/AppVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/app/AppVO.java index 41ca8d92740b2393093d459b92c1c069905cd535..dab97dd71d7c3c0a34f80a135cc0b99c0c6adb00 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/app/AppVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/app/AppVO.java @@ -18,6 +18,9 @@ public class AppVO { @ApiModelProperty(value="App密码") private String password; + @ApiModelProperty(value="申请人") + private String applicant; + @ApiModelProperty(value="App描述") private String description; @@ -48,6 +51,14 @@ public class AppVO { this.password = password; } + public String getApplicant() { + return applicant; + } + + public void setApplicant(String applicant) { + this.applicant = applicant; + } + public String getDescription() { return description; } @@ -70,6 +81,7 @@ public class AppVO { "appId='" + appId + '\'' + ", name='" + name + '\'' + ", password='" + password + '\'' + + ", applicant='" + applicant + '\'' + ", description='" + description + '\'' + ", principals='" + principals + '\'' + '}'; diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/thirdpart/AppBasicInfoVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/thirdpart/AppBasicInfoVO.java deleted file mode 100644 index dd43df2ebdcdf5d2deec33dd40afd2b6016fb4cb..0000000000000000000000000000000000000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/thirdpart/AppBasicInfoVO.java +++ /dev/null @@ -1,90 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.entity.vo.thirdpart; - -import io.swagger.annotations.ApiModel; -import io.swagger.annotations.ApiModelProperty; - -/** - * @author zhongyuankai - * @date 2020/6/18 - */ -@Deprecated -@ApiModel(description="AppID基本信息") -public class AppBasicInfoVO { - @ApiModelProperty(value="appId") - private String appId; - - @ApiModelProperty(value="app密码") - private String password; - - @ApiModelProperty(value="app名称") - private String name; - - @ApiModelProperty(value="申请人") - private String applicant; - - @ApiModelProperty(value="appId负责人") - private String principal; - - @ApiModelProperty(value="描述信息") - private String description; - - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public String getPassword() { - return password; - } - - public void setPassword(String password) { - this.password = password; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public String getApplicant() { - return applicant; - } - - public void setApplicant(String applicant) { - this.applicant = applicant; - } - - public String getPrincipal() { - return principal; - } - - public void setPrincipal(String principal) { - this.principal = principal; - } - - public String getDescription() { - return description; - } - - public void setDescription(String description) { - this.description = description; - } - - @Override - public String toString() { - return "AppBasicInfoVO{" + - "appId='" + appId + '\'' + - ", password='" + password + '\'' + - ", name='" + name + '\'' + - ", applicant='" + applicant + '\'' + - ", principal='" + principal + '\'' + - ", description='" + description + '\'' + - '}'; - } -} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/HttpUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/HttpUtils.java index c15a25b44aafa558aeb3cd926e866a110a0f4aab..2a0d1eac60b6336a7d542c37b94b09e19d3fa4c8 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/HttpUtils.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/HttpUtils.java @@ -35,6 +35,7 @@ public class HttpUtils { private static final String METHOD_GET = "GET"; private static final String METHOD_POST = "POST"; private static final String METHOD_PUT = "PUT"; + private static final String METHOD_DELETE = "DELETE"; private static final String CHARSET_UTF8 = "UTF-8"; @@ -119,6 +120,18 @@ public class HttpUtils { return sendRequest(url, METHOD_PUT, null, headers, in); } + public static String deleteForString(String url, String content, Map headers) { + InputStream in = null; + try { + if (content != null && !content.isEmpty()) { + in = new ByteArrayInputStream(content.getBytes(CHARSET_UTF8)); + } + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + return sendRequest(url, METHOD_DELETE, null, headers, in); + } + /** * @param url 请求的链接, 只支持 http 和 https 链接 * @param method GET or POST diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java index 83665791b04f86871a6c8162da5927f0aac34871..ed8a639e21fb45431e3b2b694a7848bb367a327c 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/jmx/JmxConnectorWrap.java @@ -28,10 +28,13 @@ public class JmxConnectorWrap { private AtomicInteger atomicInteger; - public JmxConnectorWrap(String host, int port) { + public JmxConnectorWrap(String host, int port, int maxConn) { this.host = host; this.port = port; - this.atomicInteger = new AtomicInteger(25); + if (maxConn <= 0) { + maxConn = 1; + } + this.atomicInteger = new AtomicInteger(maxConn); } public boolean checkJmxConnectionAndInitIfNeed() { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index a56b0ab378216161828ed37351f74a2748cc442f..a37b95e275fc794c7c73f625fb46e1b3ef23b91a 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -14,6 +14,7 @@ import com.xiaojukeji.kafka.manager.common.zookeeper.ZkConfigImpl; import com.xiaojukeji.kafka.manager.dao.ControllerDao; import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConnectorWrap; import com.xiaojukeji.kafka.manager.service.service.JmxService; +import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; import com.xiaojukeji.kafka.manager.service.zookeeper.*; import com.xiaojukeji.kafka.manager.service.service.ClusterService; import com.xiaojukeji.kafka.manager.common.zookeeper.ZkPathUtil; @@ -44,6 +45,9 @@ public class PhysicalClusterMetadataManager { @Autowired private ClusterService clusterService; + @Autowired + private ConfigUtils configUtils; + private final static Map CLUSTER_MAP = new ConcurrentHashMap<>(); private final static Map CONTROLLER_DATA_MAP = new ConcurrentHashMap<>(); @@ -89,7 +93,7 @@ public class PhysicalClusterMetadataManager { BROKER_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); JMX_CONNECTOR_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); KAFKA_VERSION_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>()); - BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig); + BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig, configUtils.getJmxMaxConn()); brokerListener.init(); zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener); @@ -255,7 +259,7 @@ public class PhysicalClusterMetadataManager { //---------------------------Broker元信息相关-------------- - public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMetadata brokerMetadata) { + public static void putBrokerMetadata(Long clusterId, Integer brokerId, BrokerMetadata brokerMetadata, Integer jmxMaxConn) { Map metadataMap = BROKER_METADATA_MAP.get(clusterId); if (metadataMap == null) { return; @@ -263,7 +267,7 @@ public class PhysicalClusterMetadataManager { metadataMap.put(brokerId, brokerMetadata); Map jmxMap = JMX_CONNECTOR_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>()); - jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort())); + jmxMap.put(brokerId, new JmxConnectorWrap(brokerMetadata.getHost(), brokerMetadata.getJmxPort(), jmxMaxConn)); JMX_CONNECTOR_MAP.put(clusterId, jmxMap); Map versionMap = KAFKA_VERSION_MAP.getOrDefault(clusterId, new ConcurrentHashMap<>()); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java index ecda75e9542d7e1ed80888a141c7815e3b9a1e6a..8dfb26c4c4053d4f2208e434e5d550356925a0cb 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/GatewayConfigServiceImpl.java @@ -38,10 +38,10 @@ public class GatewayConfigServiceImpl implements GatewayConfigService { } Long maxVersion = Long.MIN_VALUE; - Map> clusterIdBootstrapServersMap = new HashMap<>(doList.size()); + Map> clusterIdBootstrapServersMap = new HashMap<>(doList.size()); for (GatewayConfigDO configDO: doList) { clusterIdBootstrapServersMap.put( - Long.valueOf(configDO.getName()), + configDO.getName().trim(), ListUtils.string2StrList(configDO.getValue()) ); if (configDO.getVersion().compareTo(maxVersion) > 0) { diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConfigServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConfigServiceImpl.java index 8616d689b2c5e2f92165d41ee91a9593994b1676..5ffc472938e09df5aac1595da935994bde55837f 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConfigServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConfigServiceImpl.java @@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.alibaba.fastjson.JSON; import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant; -import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant; import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.config.*; @@ -11,7 +10,6 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.ConfigDO; import com.xiaojukeji.kafka.manager.dao.ConfigDao; -import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.ConfigService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -166,11 +164,6 @@ public class ConfigServiceImpl implements ConfigService { @Override public CreateTopicElemConfig getCreateTopicConfig(Long clusterId, String systemCode) { String configKey = TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY; - if (SystemCodeConstant.LOG_X.equals(systemCode)) { - configKey = TopicCreationConstant.LOG_X_CREATE_TOPIC_CONFIG_KEY_NAME; - } else if (SystemCodeConstant.CHORUS.equals(systemCode)) { - configKey = TopicCreationConstant.CHORUS_CREATE_TOPIC_CONFIG_KEY_NAME; - } CreateTopicConfig configValue = this.getByKey( configKey, CreateTopicConfig.class diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java index edba7af0175938a836e746853fe1df4683c0e424..7f90527579610872523076206cfde9292eed9158 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ConsumerServiceImpl.java @@ -390,7 +390,7 @@ public class ConsumerServiceImpl implements ConsumerService { @Override public boolean checkConsumerGroupExist(OffsetLocationEnum offsetLocation, Long clusterId, String topicName, String consumerGroup) { List consumerGroupList = getConsumerGroupList(clusterId, topicName).stream() - .filter(group -> offsetLocation.location.equals(group.getOffsetStoreLocation()) && consumerGroup.equals(group.getConsumerGroup())) + .filter(group -> offsetLocation.location.equals(group.getOffsetStoreLocation().location) && consumerGroup.equals(group.getConsumerGroup())) .collect(Collectors.toList()); return !ValidateUtils.isEmptyList(consumerGroupList); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ExpertServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ExpertServiceImpl.java index 8b9e0132861f8ec23858432ddce676e4a51ed66d..d0b34e3d64b8e7b50bec86f97f9117231b3c9763 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ExpertServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ExpertServiceImpl.java @@ -186,7 +186,7 @@ public class ExpertServiceImpl implements ExpertService { continue; } Integer suggestedPartitionNum = (int) Math.round( - bytesIn / topicMetadata.getPartitionNum() / config.getMaxBytesInPerPartitionUnitB() + bytesIn / config.getMaxBytesInPerPartitionUnitB() ); if (suggestedPartitionNum - topicMetadata.getPartitionNum() < 1) { continue; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java index b805d3cab1c91f97385c82e8955c7766f0f60088..2440e78d725e6e638e74e1772f30dbe79dc6ef72 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ConfigUtils.java @@ -13,6 +13,9 @@ public class ConfigUtils { @Value(value = "${custom.idc}") private String idc; + @Value("${custom.jmx.max-conn}") + private Integer jmxMaxConn; + @Value(value = "${spring.profiles.active}") private String kafkaManagerEnv; @@ -24,6 +27,14 @@ public class ConfigUtils { this.idc = idc; } + public Integer getJmxMaxConn() { + return jmxMaxConn; + } + + public void setJmxMaxConn(Integer jmxMaxConn) { + this.jmxMaxConn = jmxMaxConn; + } + public String getKafkaManagerEnv() { return kafkaManagerEnv; } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java index 494e7542a51f40d2948ae7aef40950a62d5e8c13..16a185e048f85ada79b7e06252392b5cb6862157 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/zookeeper/BrokerStateListener.java @@ -22,9 +22,12 @@ public class BrokerStateListener implements StateChangeListener { private ZkConfigImpl zkConfig; - public BrokerStateListener(Long clusterId, ZkConfigImpl zkConfig) { + private Integer jmxMaxConn; + + public BrokerStateListener(Long clusterId, ZkConfigImpl zkConfig, Integer jmxMaxConn) { this.clusterId = clusterId; this.zkConfig = zkConfig; + this.jmxMaxConn = jmxMaxConn; } @Override @@ -81,7 +84,7 @@ public class BrokerStateListener implements StateChangeListener { } brokerMetadata.setClusterId(clusterId); brokerMetadata.setBrokerId(brokerId); - PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata); + PhysicalClusterMetadataManager.putBrokerMetadata(clusterId, brokerId, brokerMetadata, jmxMaxConn); } catch (Exception e) { LOGGER.error("add broker failed, clusterId:{} brokerMetadata:{}.", clusterId, brokerMetadata, e); } diff --git a/kafka-manager-dao/src/main/resources/mapper/ClusterDao.xml b/kafka-manager-dao/src/main/resources/mapper/ClusterDao.xml index e5b1747be7e01c27f1ce7d6533771eab2a620103..a03eb6e024c2221cca2dae9ef5602743490b68a4 100644 --- a/kafka-manager-dao/src/main/resources/mapper/ClusterDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/ClusterDao.xml @@ -38,6 +38,10 @@ SELECT * FROM cluster where id=#{id} + + DELETE FROM cluster where id=#{id} + + diff --git a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/account/BaseEnterpriseStaffService.java b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/account/BaseEnterpriseStaffService.java index c40a97b1001f116bd973ea8d89b57d30aa2c0345..b931eecdf6b7b068424d4a5c5417256d27986db3 100644 --- a/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/account/BaseEnterpriseStaffService.java +++ b/kafka-manager-extends/kafka-manager-account/src/main/java/com/xiaojukeji/kafka/manager/account/component/account/BaseEnterpriseStaffService.java @@ -55,4 +55,4 @@ public class BaseEnterpriseStaffService extends AbstractEnterpriseStaffService { } return new ArrayList<>(); } -} \ No newline at end of file +} diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java index 160118f3f2e41d0e00ec5d798142a14c490d7cbd..e71175f7a74d338c5adfe3bf12901299da9e8ac4 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java @@ -30,19 +30,19 @@ import java.util.Map; public class N9e extends AbstractAgent { private static final Logger LOGGER = LoggerFactory.getLogger(N9e.class); - @Value("${agent.n9e.base-url}") + @Value("${kcm.n9e.base-url}") private String baseUrl; - @Value("${agent.n9e.username}") + @Value("${kcm.n9e.username}") private String username; - @Value("${agent.n9e.user-token}") + @Value("${kcm.n9e.user-token}") private String userToken; - @Value("${agent.n9e.tpl-id}") + @Value("${kcm.n9e.tpl-id}") private Integer tplId; - @Value("${agent.n9e.timeout}") + @Value("${kcm.n9e.timeout}") private Integer timeout; /** diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java index b4a84eb01be59774a681b17eb4561306b15542c4..51f3828e2e4d15b0a6660de9122e9ff11be6521c 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java @@ -6,7 +6,6 @@ package com.xiaojukeji.kafka.manager.kcm.component.storage.common; * @date 20/4/29 */ public enum StorageEnum { - GIFT(0, "gift"), GIT(1, "git"), S3(2, "S3"), ; diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/application-kcm-dev.yml b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/application-kcm-dev.yml deleted file mode 100644 index 8ed8b8468ba8d9a03723524259c7e8869559c0c9..0000000000000000000000000000000000000000 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/application-kcm-dev.yml +++ /dev/null @@ -1,7 +0,0 @@ -agent: - n9e: - base-url: http://127.0.0.1/api - username: admin - user-token: admin - tpl-id: 123456 - timeout: 30 \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eConverter.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eConverter.java index ea18377c73cbc135a6a2b8a438327dfa579c92c7..d7691a85df7a80404363ccfdfcb9c70089133f73 100644 --- a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eConverter.java +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eConverter.java @@ -1,9 +1,11 @@ package com.xiaojukeji.kafka.manager.monitor.component.n9e; -import com.xiaojukeji.kafka.manager.monitor.common.entry.MetricSinkPoint; -import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.N9eMetricSinkPoint; +import com.xiaojukeji.kafka.manager.common.utils.ListUtils; +import com.xiaojukeji.kafka.manager.monitor.common.entry.*; +import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.*; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** @@ -27,4 +29,127 @@ public class N9eConverter { } return n9ePointList; } + + public static N9eStrategy convert2N9eStrategy(Strategy strategy, Integer monitorN9eNid) { + if (strategy == null) { + return null; + } + + N9eStrategy n9eStrategy = new N9eStrategy(); + n9eStrategy.setId(strategy.getId().intValue()); + n9eStrategy.setCategory(1); + n9eStrategy.setName(strategy.getName()); + n9eStrategy.setNid(monitorN9eNid); + n9eStrategy.setExcl_nid(new ArrayList<>()); + n9eStrategy.setPriority(strategy.getPriority()); + n9eStrategy.setAlert_dur(60); + + List exprs = new ArrayList<>(); + for (StrategyExpression strategyExpression: strategy.getStrategyExpressionList()) { + N9eStrategyExpression n9eStrategyExpression = new N9eStrategyExpression(); + n9eStrategyExpression.setMetric(strategyExpression.getMetric()); + n9eStrategyExpression.setFunc(strategyExpression.getFunc()); + n9eStrategyExpression.setEopt(strategyExpression.getEopt()); + n9eStrategyExpression.setThreshold(strategyExpression.getThreshold().intValue()); + n9eStrategyExpression.setParams(ListUtils.string2IntList(strategyExpression.getParams())); + exprs.add(n9eStrategyExpression); + } + n9eStrategy.setExprs(exprs); + + List tags = new ArrayList<>(); + for (StrategyFilter strategyFilter: strategy.getStrategyFilterList()) { + N9eStrategyFilter n9eStrategyFilter = new N9eStrategyFilter(); + n9eStrategyFilter.setTkey(strategyFilter.getTkey()); + n9eStrategyFilter.setTopt(strategyFilter.getTopt()); + n9eStrategyFilter.setTval(Arrays.asList(strategyFilter.getTval())); + tags.add(n9eStrategyFilter); + } + n9eStrategy.setTags(tags); + + n9eStrategy.setRecovery_dur(0); + n9eStrategy.setRecovery_notify(0); + + StrategyAction strategyAction = strategy.getStrategyActionList().get(0); + n9eStrategy.setConverge(ListUtils.string2IntList(strategyAction.getConverge())); + n9eStrategy.setNotify_group(ListUtils.string2StrList(strategyAction.getNotifyGroup())); + n9eStrategy.setNotify_user(new ArrayList<>()); + n9eStrategy.setCallback(strategyAction.getCallback()); + n9eStrategy.setEnable_stime("00:00"); + n9eStrategy.setEnable_etime("23:59"); + n9eStrategy.setEnable_days_of_week(ListUtils.string2IntList(strategy.getPeriodDaysOfWeek())); + + n9eStrategy.setNeed_upgrade(0); + n9eStrategy.setAlert_upgrade(new ArrayList<>()); + return n9eStrategy; + } + + public static List convert2StrategyList(List n9eStrategyList) { + if (n9eStrategyList == null || n9eStrategyList.isEmpty()) { + return new ArrayList<>(); + } + + List strategyList = new ArrayList<>(); + for (N9eStrategy n9eStrategy: n9eStrategyList) { + strategyList.add(convert2Strategy(n9eStrategy)); + } + return strategyList; + } + + public static Strategy convert2Strategy(N9eStrategy n9eStrategy) { + if (n9eStrategy == null) { + return null; + } + Strategy strategy = new Strategy(); + strategy.setId(n9eStrategy.getId().longValue()); + strategy.setName(n9eStrategy.getName()); + strategy.setPriority(n9eStrategy.getPriority()); + strategy.setPeriodHoursOfDay("0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23"); + strategy.setPeriodDaysOfWeek(ListUtils.intList2String(n9eStrategy.getEnable_days_of_week())); + + List strategyExpressionList = new ArrayList<>(); + for (N9eStrategyExpression n9eStrategyExpression: n9eStrategy.getExprs()) { + StrategyExpression strategyExpression = new StrategyExpression(); + strategyExpression.setMetric(n9eStrategyExpression.getMetric()); + strategyExpression.setFunc(n9eStrategyExpression.getFunc()); + strategyExpression.setEopt(n9eStrategyExpression.getEopt()); + strategyExpression.setThreshold(n9eStrategyExpression.getThreshold().longValue()); + strategyExpression.setParams(ListUtils.intList2String(n9eStrategyExpression.getParams())); + strategyExpressionList.add(strategyExpression); + } + strategy.setStrategyExpressionList(strategyExpressionList); + + List strategyFilterList = new ArrayList<>(); + for (N9eStrategyFilter n9eStrategyFilter: n9eStrategy.getTags()) { + StrategyFilter strategyFilter = new StrategyFilter(); + strategyFilter.setTkey(n9eStrategyFilter.getTkey()); + strategyFilter.setTopt(n9eStrategyFilter.getTopt()); + strategyFilter.setTval(ListUtils.strList2String(n9eStrategyFilter.getTval())); + strategyFilterList.add(strategyFilter); + } + strategy.setStrategyFilterList(strategyFilterList); + + StrategyAction strategyAction = new StrategyAction(); + strategyAction.setNotifyGroup(ListUtils.strList2String(n9eStrategy.getNotify_group())); + strategyAction.setConverge(ListUtils.intList2String(n9eStrategy.getConverge())); + strategyAction.setCallback(n9eStrategy.getCallback()); + strategy.setStrategyActionList(Arrays.asList(strategyAction)); + + return strategy; + } + + public static List convert2NotifyGroupList(N9eNotifyGroup n9eNotifyGroup) { + if (n9eNotifyGroup == null || n9eNotifyGroup.getList() == null) { + return new ArrayList<>(); + } + + List notifyGroupList = new ArrayList<>(); + for (N9eNotifyGroupElem n9eNotifyGroupElem: n9eNotifyGroup.getList()) { + NotifyGroup notifyGroup = new NotifyGroup(); + notifyGroup.setId(n9eNotifyGroupElem.getId().longValue()); + notifyGroup.setName(n9eNotifyGroupElem.getName()); + notifyGroup.setComment(n9eNotifyGroupElem.getNote()); + notifyGroupList.add(notifyGroup); + } + return notifyGroupList; + } } \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java index fce7ea456476700afa9c47561621850425f74574..df2e430c675c8b3269a4581a8cf82940004cc773 100644 --- a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java @@ -2,17 +2,18 @@ package com.xiaojukeji.kafka.manager.monitor.component.n9e; import com.alibaba.fastjson.JSON; import com.xiaojukeji.kafka.manager.common.utils.HttpUtils; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.monitor.component.AbstractMonitorService; import com.xiaojukeji.kafka.manager.monitor.common.entry.*; +import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.N9eNotifyGroup; import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.N9eResult; +import com.xiaojukeji.kafka.manager.monitor.component.n9e.entry.N9eStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.List; -import java.util.Properties; +import java.util.*; /** * 夜莺 @@ -23,21 +24,28 @@ import java.util.Properties; public class N9eService extends AbstractMonitorService { private static final Logger LOGGER = LoggerFactory.getLogger(N9eService.class); + @Value("${monitor.n9e.nid}") + private Integer monitorN9eNid; + + @Value("${monitor.n9e.user-token}") + private String monitorN9eToken; + @Value("${monitor.n9e.base-url}") private String monitorN9eBaseUrl; /** * 告警策略 */ - private static final String STRATEGY_ADD_URL = "/auth/v1/strategy/add"; + private static final String STRATEGY_ADD_URL = "/api/mon/stra"; - private static final String STRATEGY_DEL_URL = "/auth/v1/strategy/del"; + private static final String STRATEGY_DEL_URL = "/api/mon/stra"; - private static final String STRATEGY_MODIFY_URL = "/auth/v1/strategy/modify"; + private static final String STRATEGY_MODIFY_URL = "/api/mon/stra"; - private static final String STRATEGY_QUERY_BY_NS_URL = "/auth/v1/strategy/query/ns"; + private static final String STRATEGY_QUERY_BY_NS_URL = "/api/mon/stra"; + + private static final String STRATEGY_QUERY_BY_ID_URL = "/api/mon/stra"; - private static final String STRATEGY_QUERY_BY_ID_URL = "/auth/v1/strategy/query/id"; private static final String ALERT_QUERY_BY_NS_AND_PERIOD_URL = "/auth/v1/event/query/ns/period"; @@ -57,41 +65,121 @@ public class N9eService extends AbstractMonitorService { /** * 指标数据 */ - private static final String COLLECTOR_SINK_DATA_URL = "/api/collector/push"; + private static final String COLLECTOR_SINK_DATA_URL = "/api/transfer/push"; private static final String COLLECTOR_DOWNLOAD_DATA_URL = "/data/query/graph/dashboard/history"; /** * 告警组 */ - private static final String ALL_NOTIFY_GROUP_URL = "/auth/v1/usergroup/group/all"; + private static final String ALL_NOTIFY_GROUP_URL = "/api/mon/teams/all"; /** * 监控策略的增删改查 */ @Override public Integer createStrategy(Strategy strategy) { - return 0; + String response = null; + try { + response = HttpUtils.postForString( + monitorN9eBaseUrl + STRATEGY_ADD_URL, + JSON.toJSONString(N9eConverter.convert2N9eStrategy(strategy, monitorN9eNid)), + buildHeader() + ); + N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); + if (!ValidateUtils.isBlank(n9eResult.getErr())) { + LOGGER.error("create strategy failed, strategy:{} response:{}.", strategy, response); + return null; + } + return (Integer) n9eResult.getDat(); + } catch (Exception e) { + LOGGER.error("create strategy failed, strategy:{} response:{}.", strategy, response, e); + } + return null; } @Override public Boolean deleteStrategyById(Long strategyId) { - return true; + Map> params = new HashMap<>(1); + params.put("ids", Arrays.asList(strategyId)); + + String response = null; + try { + response = HttpUtils.deleteForString( + monitorN9eBaseUrl + STRATEGY_DEL_URL, + JSON.toJSONString(params), + buildHeader() + ); + N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); + if (!ValidateUtils.isBlank(n9eResult.getErr())) { + LOGGER.error("delete strategy failed, strategyId:{} response:{}.", strategyId, response); + return Boolean.FALSE; + } + return Boolean.TRUE; + } catch (Exception e) { + LOGGER.error("delete strategy failed, strategyId:{} response:{}.", strategyId, response, e); + } + return Boolean.FALSE; } @Override public Boolean modifyStrategy(Strategy strategy) { - return true; + String response = null; + try { + response = HttpUtils.putForString( + monitorN9eBaseUrl + STRATEGY_MODIFY_URL, + JSON.toJSONString(N9eConverter.convert2N9eStrategy(strategy, monitorN9eNid)), + buildHeader() + ); + N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); + if (!ValidateUtils.isBlank(n9eResult.getErr())) { + LOGGER.error("modify strategy failed, strategy:{} response:{}.", strategy, response); + return Boolean.FALSE; + } + return Boolean.TRUE; + } catch (Exception e) { + LOGGER.error("modify strategy failed, strategy:{} response:{}.", strategy, response, e); + } + return Boolean.FALSE; } @Override public List getStrategies() { + Map params = new HashMap<>(); + params.put("nid", String.valueOf(monitorN9eNid)); + + String response = null; + try { + response = HttpUtils.get(monitorN9eBaseUrl + STRATEGY_QUERY_BY_NS_URL, params, buildHeader()); + N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); + if (!ValidateUtils.isBlank(n9eResult.getErr())) { + LOGGER.error("get monitor strategies failed, response:{}.", response); + return new ArrayList<>(); + } + return N9eConverter.convert2StrategyList(JSON.parseArray(JSON.toJSONString(n9eResult.getDat()), N9eStrategy.class)); + } catch (Exception e) { + LOGGER.error("get monitor strategies failed, response:{}.", response, e); + } return new ArrayList<>(); } @Override public Strategy getStrategyById(Long strategyId) { - return new Strategy(); + String uri = STRATEGY_QUERY_BY_ID_URL + "/" + String.valueOf(strategyId); + + String response = null; + try { + response = HttpUtils.get(monitorN9eBaseUrl + uri, new HashMap<>(0), buildHeader()); + N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); + if (!ValidateUtils.isBlank(n9eResult.getErr())) { + LOGGER.error("get monitor strategy failed, response:{}.", response); + return null; + } + return N9eConverter.convert2Strategy(JSON.parseObject(JSON.toJSONString(n9eResult.getDat()), N9eStrategy.class)); + } catch (Exception e) { + LOGGER.error("get monitor strategy failed, response:{}.", response, e); + } + return null; } @Override @@ -161,6 +249,26 @@ public class N9eService extends AbstractMonitorService { @Override public List getNotifyGroups() { + String response = null; + try { + response = HttpUtils.get(monitorN9eBaseUrl + ALL_NOTIFY_GROUP_URL, new HashMap<>(0), buildHeader()); + N9eResult n9eResult = JSON.parseObject(response, N9eResult.class); + if (!ValidateUtils.isBlank(n9eResult.getErr())) { + LOGGER.error("get notify group failed, response:{}.", response); + return new ArrayList<>(); + } + return N9eConverter.convert2NotifyGroupList(JSON.parseObject(JSON.toJSONString(n9eResult.getDat()), N9eNotifyGroup.class)); + } catch (Exception e) { + LOGGER.error("get notify group failed, response:{}.", response, e); + } return new ArrayList<>(); } -} \ No newline at end of file + + private Map buildHeader() { + Map header = new HashMap<>(2); + header.put("Content-Type", "application/json"); + header.put("X-User-Token", monitorN9eToken); + return header; + } + +} diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eNotifyGroup.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eNotifyGroup.java new file mode 100644 index 0000000000000000000000000000000000000000..9e1061e0a685e036c47cd1df4dfc54df71015bc1 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eNotifyGroup.java @@ -0,0 +1,26 @@ +package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry; + +import java.util.List; + +/** + * @author zengqiao + * @date 20/10/19 + */ +public class N9eNotifyGroup { + private List list; + + public List getList() { + return list; + } + + public void setList(List list) { + this.list = list; + } + + @Override + public String toString() { + return "N9eNotifyGroup{" + + "list=" + list + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eNotifyGroupElem.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eNotifyGroupElem.java new file mode 100644 index 0000000000000000000000000000000000000000..9bd246c61eb34c240da2f8f898ced04becffe67d --- /dev/null +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eNotifyGroupElem.java @@ -0,0 +1,90 @@ +package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry; + +/** + * @author zengqiao + * @date 20/10/19 + */ +public class N9eNotifyGroupElem { + private Integer creator; + + private Integer id; + + private String ident; + + private String last_updated; + + private Integer mgmt; + + private String name; + + private String note; + + public Integer getCreator() { + return creator; + } + + public void setCreator(Integer creator) { + this.creator = creator; + } + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public String getIdent() { + return ident; + } + + public void setIdent(String ident) { + this.ident = ident; + } + + public String getLast_updated() { + return last_updated; + } + + public void setLast_updated(String last_updated) { + this.last_updated = last_updated; + } + + public Integer getMgmt() { + return mgmt; + } + + public void setMgmt(Integer mgmt) { + this.mgmt = mgmt; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getNote() { + return note; + } + + public void setNote(String note) { + this.note = note; + } + + @Override + public String toString() { + return "N9eNotifyGroupElem{" + + "creator=" + creator + + ", id=" + id + + ", ident='" + ident + '\'' + + ", last_updated='" + last_updated + '\'' + + ", mgmt=" + mgmt + + ", name='" + name + '\'' + + ", note='" + note + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategy.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategy.java new file mode 100644 index 0000000000000000000000000000000000000000..420b825cf2c21167c3b3b344517f331b15adf1ac --- /dev/null +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategy.java @@ -0,0 +1,242 @@ +package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author zengqiao + * @date 20/10/18 + */ +public class N9eStrategy { + private Integer id; + + private Integer category = 1; + + /** + * 策略名称 + */ + private String name; + + /** + * 策略关联的对象树节点id + */ + private Integer nid; + + private List excl_nid = new ArrayList<>(); + + private Integer priority; + + private Integer alert_dur = 60; + + private List exprs; + + private List tags; + + private Integer recovery_dur; + + private Integer recovery_notify; + + private List alert_upgrade = new ArrayList<>(); + + private List converge; + + private List notify_group; + + private List notify_user; + + private String callback; + + private String enable_stime; + + private String enable_etime; + + private List enable_days_of_week; + + private Integer need_upgrade; + + public Integer getId() { + return id; + } + + public void setId(Integer id) { + this.id = id; + } + + public Integer getCategory() { + return category; + } + + public void setCategory(Integer category) { + this.category = category; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getNid() { + return nid; + } + + public void setNid(Integer nid) { + this.nid = nid; + } + + public List getExcl_nid() { + return excl_nid; + } + + public void setExcl_nid(List excl_nid) { + this.excl_nid = excl_nid; + } + + public Integer getPriority() { + return priority; + } + + public void setPriority(Integer priority) { + this.priority = priority; + } + + public Integer getAlert_dur() { + return alert_dur; + } + + public void setAlert_dur(Integer alert_dur) { + this.alert_dur = alert_dur; + } + + public List getExprs() { + return exprs; + } + + public void setExprs(List exprs) { + this.exprs = exprs; + } + + public List getTags() { + return tags; + } + + public void setTags(List tags) { + this.tags = tags; + } + + public Integer getRecovery_dur() { + return recovery_dur; + } + + public void setRecovery_dur(Integer recovery_dur) { + this.recovery_dur = recovery_dur; + } + + public Integer getRecovery_notify() { + return recovery_notify; + } + + public void setRecovery_notify(Integer recovery_notify) { + this.recovery_notify = recovery_notify; + } + + public List getAlert_upgrade() { + return alert_upgrade; + } + + public void setAlert_upgrade(List alert_upgrade) { + this.alert_upgrade = alert_upgrade; + } + + public List getConverge() { + return converge; + } + + public void setConverge(List converge) { + this.converge = converge; + } + + public List getNotify_group() { + return notify_group; + } + + public void setNotify_group(List notify_group) { + this.notify_group = notify_group; + } + + public List getNotify_user() { + return notify_user; + } + + public void setNotify_user(List notify_user) { + this.notify_user = notify_user; + } + + public String getCallback() { + return callback; + } + + public void setCallback(String callback) { + this.callback = callback; + } + + public String getEnable_stime() { + return enable_stime; + } + + public void setEnable_stime(String enable_stime) { + this.enable_stime = enable_stime; + } + + public String getEnable_etime() { + return enable_etime; + } + + public void setEnable_etime(String enable_etime) { + this.enable_etime = enable_etime; + } + + public List getEnable_days_of_week() { + return enable_days_of_week; + } + + public void setEnable_days_of_week(List enable_days_of_week) { + this.enable_days_of_week = enable_days_of_week; + } + + public Integer getNeed_upgrade() { + return need_upgrade; + } + + public void setNeed_upgrade(Integer need_upgrade) { + this.need_upgrade = need_upgrade; + } + + @Override + public String toString() { + return "N9eStrategy{" + + "id=" + id + + ", category=" + category + + ", name='" + name + '\'' + + ", nid=" + nid + + ", excl_nid=" + excl_nid + + ", priority=" + priority + + ", alert_dur=" + alert_dur + + ", exprs=" + exprs + + ", tags=" + tags + + ", recovery_dur=" + recovery_dur + + ", recovery_notify=" + recovery_notify + + ", alert_upgrade=" + alert_upgrade + + ", converge=" + converge + + ", notify_group=" + notify_group + + ", notify_user=" + notify_user + + ", callback='" + callback + '\'' + + ", enable_stime='" + enable_stime + '\'' + + ", enable_etime='" + enable_etime + '\'' + + ", enable_days_of_week=" + enable_days_of_week + + ", need_upgrade=" + need_upgrade + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyAlertUpgrade.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyAlertUpgrade.java new file mode 100644 index 0000000000000000000000000000000000000000..594c4762c7e53d87d05eba3aa0e52c84a1735d22 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyAlertUpgrade.java @@ -0,0 +1,59 @@ +package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry; + +import java.util.List; + +/** + * @author zengqiao + * @date 20/10/19 + */ +public class N9eStrategyAlertUpgrade { + private Integer duration; + + private Integer level; + + private List users; + + private List groups; + + public Integer getDuration() { + return duration; + } + + public void setDuration(Integer duration) { + this.duration = duration; + } + + public Integer getLevel() { + return level; + } + + public void setLevel(Integer level) { + this.level = level; + } + + public List getUsers() { + return users; + } + + public void setUsers(List users) { + this.users = users; + } + + public List getGroups() { + return groups; + } + + public void setGroups(List groups) { + this.groups = groups; + } + + @Override + public String toString() { + return "N9eStrategyAlertUpgrade{" + + "duration=" + duration + + ", level=" + level + + ", users=" + users + + ", groups=" + groups + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyExpression.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyExpression.java new file mode 100644 index 0000000000000000000000000000000000000000..c72f002a52ef1fcdaca2eb4600b71c59d247c124 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyExpression.java @@ -0,0 +1,70 @@ +package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry; + +import java.util.List; + +/** + * @author zengqiao + * @date 20/10/18 + */ +public class N9eStrategyExpression { + private String metric; + + private String func; + + private String eopt; + + private Integer threshold; + + private List params; + + public String getMetric() { + return metric; + } + + public void setMetric(String metric) { + this.metric = metric; + } + + public String getFunc() { + return func; + } + + public void setFunc(String func) { + this.func = func; + } + + public String getEopt() { + return eopt; + } + + public void setEopt(String eopt) { + this.eopt = eopt; + } + + public Integer getThreshold() { + return threshold; + } + + public void setThreshold(Integer threshold) { + this.threshold = threshold; + } + + public List getParams() { + return params; + } + + public void setParams(List params) { + this.params = params; + } + + @Override + public String toString() { + return "N9eStrategyExpression{" + + "metric='" + metric + '\'' + + ", func='" + func + '\'' + + ", eopt='" + eopt + '\'' + + ", threshold=" + threshold + + ", params=" + params + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyFilter.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyFilter.java new file mode 100644 index 0000000000000000000000000000000000000000..6ee26f375fd27a533d14c4ac31d2a547bbbc3c56 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/entry/N9eStrategyFilter.java @@ -0,0 +1,48 @@ +package com.xiaojukeji.kafka.manager.monitor.component.n9e.entry; + +import java.util.List; + +/** + * @author zengqiao + * @date 20/10/18 + */ +public class N9eStrategyFilter { + private String topt; + + private String tkey; + + private List tval; + + public String getTopt() { + return topt; + } + + public void setTopt(String topt) { + this.topt = topt; + } + + public String getTkey() { + return tkey; + } + + public void setTkey(String tkey) { + this.tkey = tkey; + } + + public List getTval() { + return tval; + } + + public void setTval(List tval) { + this.tval = tval; + } + + @Override + public String toString() { + return "N9eStrategyFilter{" + + "topt='" + topt + '\'' + + ", tkey='" + tkey + '\'' + + ", tval=" + tval + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/resources/application-monitor-dev.yml b/kafka-manager-extends/kafka-manager-monitor/src/main/resources/application-monitor-dev.yml deleted file mode 100644 index c5815ba1754cb1e7b967ed6ba2410f80a7cad027..0000000000000000000000000000000000000000 --- a/kafka-manager-extends/kafka-manager-monitor/src/main/resources/application-monitor-dev.yml +++ /dev/null @@ -1,3 +0,0 @@ -monitor: - n9e: - base-url: http://127.0.0.1/api diff --git a/kafka-manager-extends/kafka-manager-notify/src/main/java/com/xiaojukeji/kafka/manager/notify/notifyer/KafkaNotifierService.java b/kafka-manager-extends/kafka-manager-notify/src/main/java/com/xiaojukeji/kafka/manager/notify/notifyer/KafkaNotifierService.java index 5ed7c73fc23550f23a8e0601f525af70550eeae4..73867a4fafb55da75b28b0a10e9888cff94ac772 100644 --- a/kafka-manager-extends/kafka-manager-notify/src/main/java/com/xiaojukeji/kafka/manager/notify/notifyer/KafkaNotifierService.java +++ b/kafka-manager-extends/kafka-manager-notify/src/main/java/com/xiaojukeji/kafka/manager/notify/notifyer/KafkaNotifierService.java @@ -10,10 +10,10 @@ import org.springframework.stereotype.Service; */ @Service("notifyService") public class KafkaNotifierService extends AbstractNotifyService { - @Value("${kafka.cluster-id:}") + @Value("${notify.kafka.cluster-id:}") private Long clusterId; - @Value("${notify.topic-name:}") + @Value("${notify.kafka.topic-name:}") private String topicName; @Override diff --git a/kafka-manager-extends/kafka-manager-notify/src/main/resources/application-notify-dev.yml b/kafka-manager-extends/kafka-manager-notify/src/main/resources/application-notify-dev.yml deleted file mode 100644 index 1c047583d6ccf5be4a790f8961fe267b93d45525..0000000000000000000000000000000000000000 --- a/kafka-manager-extends/kafka-manager-notify/src/main/resources/application-notify-dev.yml +++ /dev/null @@ -1,8 +0,0 @@ -notify: - order: - detail-url: http://127.0.0.1 - -kafka: - cluster-id: 12 - topic-name: 123 - diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/ThirdPartService.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/ThirdPartService.java new file mode 100644 index 0000000000000000000000000000000000000000..4d320d3a09471e0d733957b14ab48a83d0bddf58 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/ThirdPartService.java @@ -0,0 +1,20 @@ +package com.xiaojukeji.kafka.manager.openapi; + +import com.xiaojukeji.kafka.manager.common.bizenum.ConsumeHealthEnum; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.openapi.common.dto.*; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; + +import java.util.List; + +/** + * @author zengqiao + * @date 20/5/22 + */ +public interface ThirdPartService { + Result checkConsumeHealth(Long clusterId, + String topicName, + String consumerGroup, + Long maxDelayTime); + + List resetOffsets(ClusterDO clusterDO, OffsetResetDTO dto);} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/ThirdPartUtils.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/ThirdPartUtils.java new file mode 100644 index 0000000000000000000000000000000000000000..f8143a4bf77b8481102027e10be7f5018b8e36c4 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/ThirdPartUtils.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.kafka.manager.openapi; + +import com.xiaojukeji.kafka.manager.bpm.common.OrderTypeEnum; + +/** + * @author zhongyuankai + * @date 2020/08/31 + */ +public class ThirdPartUtils { + + public static String getOrderLimitKey(OrderTypeEnum orderTypeEnum, String systemCode) { + return orderTypeEnum.getOrderName() + "_" + systemCode; + } + +} diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/constant/ThirdPartConstant.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/constant/ThirdPartConstant.java new file mode 100644 index 0000000000000000000000000000000000000000..76c2b811b67ab69c213bfbfe7ace8b489a43943c --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/constant/ThirdPartConstant.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.kafka.manager.openapi.common.constant; + +import java.util.Arrays; +import java.util.List; + +/** + * @author zengqiao + * @date 20/10/26 + */ +public class ThirdPartConstant { + public final static List QUOTA_MODIFY_WHITE_CLUSTER_LIST = Arrays.asList(70L, 46L); + + public final static Integer DATA_DREAM_MAX_APP_NUM = 20; + + public final static Integer DATA_DREAM_MAX_AUTHORITY_NUM = 500; + + public final static String SELF_SYSTEM_CODE = "kafkamanager"; +} diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/ConsumeHealthDTO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/ConsumeHealthDTO.java new file mode 100644 index 0000000000000000000000000000000000000000..fb8c1bb3c44673d6345a29b638e0a83aacc9a409 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/ConsumeHealthDTO.java @@ -0,0 +1,83 @@ +package com.xiaojukeji.kafka.manager.openapi.common.dto; + +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +import java.util.List; + +/** + * @author zengqiao + * @date 20/6/2 + */ +@ApiModel(description = "消费健康") +public class ConsumeHealthDTO { + @ApiModelProperty(value = "集群ID") + private Long clusterId; + + @ApiModelProperty(value = "Topic名称") + private List topicNameList; + + @ApiModelProperty(value = "消费组") + private String consumerGroup; + + @ApiModelProperty(value = "允许最大延迟(ms)") + private Long maxDelayTime; + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } + + public List getTopicNameList() { + return topicNameList; + } + + public void setTopicNameList(List topicNameList) { + this.topicNameList = topicNameList; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public Long getMaxDelayTime() { + return maxDelayTime; + } + + public void setMaxDelayTime(Long maxDelayTime) { + this.maxDelayTime = maxDelayTime; + } + + @Override + public String toString() { + return "ConsumeHealthDTO{" + + "clusterId=" + clusterId + + ", topicNameList=" + topicNameList + + ", consumerGroup='" + consumerGroup + '\'' + + ", maxDelayTime=" + maxDelayTime + + '}'; + } + + public boolean paramLegal() { + if (ValidateUtils.isNull(clusterId) + || ValidateUtils.isEmptyList(topicNameList) + || ValidateUtils.isBlank(consumerGroup) + || ValidateUtils.isNullOrLessThanZero(maxDelayTime)) { + return false; + } + for (String topicName: topicNameList) { + if (ValidateUtils.isExistBlank(topicName)) { + return false; + } + } + return true; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/OffsetResetDTO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/OffsetResetDTO.java new file mode 100644 index 0000000000000000000000000000000000000000..60c0aa39e8c997520943cecce600dac9beb0081b --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/dto/OffsetResetDTO.java @@ -0,0 +1,208 @@ +package com.xiaojukeji.kafka.manager.openapi.common.dto; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.xiaojukeji.kafka.manager.common.bizenum.OffsetResetTypeEnum; +import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import org.springframework.util.StringUtils; + +import java.util.List; + +@JsonIgnoreProperties(ignoreUnknown = true) +@ApiModel(description = "重置消费偏移") +public class OffsetResetDTO { + @ApiModelProperty(value = "集群ID") + private Long clusterId; + + @ApiModelProperty(value = "Topic名称") + private String topicName; + + @ApiModelProperty(value = "消费组") + private String consumerGroup; + + @ApiModelProperty(value = "消费组位置") + private String location; + + @ApiModelProperty(value = "重置的方式[0:依据时间进行重置, 1:指定分区offset进行重置]") + private Integer offsetResetType; + + @ApiModelProperty(value = "依据时间进行重置时, 传的参数, 13位时间戳") + private Long timestamp; + + @ApiModelProperty(value = "指定分区进行重置时, 传的参数") + private List partitionOffsetDTOList; + + @ApiModelProperty(value = "如果消费组不存在则创建") + private Boolean createIfAbsent = Boolean.FALSE; + + @ApiModelProperty(value = "使用的AppID") + private String appId; + + @ApiModelProperty(value = "App密码") + private String password; + + @ApiModelProperty(value = "操作人") + private String operator; + + @ApiModelProperty(value = "系统code") + private String systemCode; + + /** + * 默认使用assign的方式进行重置, + * 但是使用assign方式对于多个Topic的消费使用同一个消费组的场景, 需要停掉所有的client才可以重置成功, 否则重置失败 + * + * 使用subscribe重置offset, 针对上面的场景可以重置成功, 但是涉及到poll函数调用, 所以默认是关闭的 + */ + private Boolean subscribeReset = Boolean.FALSE; // 订阅重置, 默认是assign方式重置 + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } + + public String getTopicName() { + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + } + + public String getConsumerGroup() { + return consumerGroup; + } + + public void setConsumerGroup(String consumerGroup) { + this.consumerGroup = consumerGroup; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public Integer getOffsetResetType() { + return offsetResetType; + } + + public void setOffsetResetType(Integer offsetResetType) { + this.offsetResetType = offsetResetType; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } + + public List getPartitionOffsetDTOList() { + return partitionOffsetDTOList; + } + + public void setPartitionOffsetDTOList(List partitionOffsetDTOList) { + this.partitionOffsetDTOList = partitionOffsetDTOList; + } + + public Boolean getCreateIfAbsent() { + return createIfAbsent; + } + + public void setCreateIfAbsent(Boolean createIfAbsent) { + this.createIfAbsent = createIfAbsent; + } + + public String getAppId() { + return appId; + } + + public void setAppId(String appId) { + this.appId = appId; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public String getOperator() { + return operator; + } + + public void setOperator(String operator) { + this.operator = operator; + } + + public String getSystemCode() { + return systemCode; + } + + public void setSystemCode(String systemCode) { + this.systemCode = systemCode; + } + + public Boolean getSubscribeReset() { + return subscribeReset; + } + + public void setSubscribeReset(Boolean subscribeReset) { + this.subscribeReset = subscribeReset; + } + + @Override + public String toString() { + return "OffsetResetModel{" + + "clusterId=" + clusterId + + ", topicName='" + topicName + '\'' + + ", consumerGroup='" + consumerGroup + '\'' + + ", location='" + location + '\'' + + ", offsetResetType=" + offsetResetType + + ", timestamp=" + timestamp + + ", partitionOffsetDTOList=" + partitionOffsetDTOList + + ", createIfAbsent=" + createIfAbsent + + ", appId='" + appId + '\'' + + ", password='" + password + '\'' + + ", operator='" + operator + '\'' + + ", systemCode='" + systemCode + '\'' + + ", subscribeReset=" + subscribeReset + + '}'; + } + + public boolean legal() { + if (clusterId == null + || StringUtils.isEmpty(topicName) + || StringUtils.isEmpty(consumerGroup) + || StringUtils.isEmpty(location) + || offsetResetType == null + || StringUtils.isEmpty(operator)) { + return false; + } + appId = (appId == null? "": appId); + password = (password == null? "": password); + if (createIfAbsent == null) { + createIfAbsent = false; + } + if (subscribeReset == null) { + subscribeReset = false; + } + + // 只能依据时间或者offset中的一个进行重置 + if (OffsetResetTypeEnum.RESET_BY_TIME.getCode().equals(offsetResetType)) { + return timestamp != null; + } else if (OffsetResetTypeEnum.RESET_BY_OFFSET.getCode().equals(offsetResetType)) { + return partitionOffsetDTOList != null; + } + return false; + } +} diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/BrokerRegionVO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/BrokerRegionVO.java new file mode 100644 index 0000000000000000000000000000000000000000..40cc2a7b5f9faab2d8bbf2df604fdc08a52085e1 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/BrokerRegionVO.java @@ -0,0 +1,57 @@ +package com.xiaojukeji.kafka.manager.openapi.common.vo; + +/** + * @author zengqiao + * @date 20/9/14 + */ +public class BrokerRegionVO { + private Long clusterId; + + private Integer brokerId; + + private String hostname; + + private String regionName; + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } + + public Integer getBrokerId() { + return brokerId; + } + + public void setBrokerId(Integer brokerId) { + this.brokerId = brokerId; + } + + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public String getRegionName() { + return regionName; + } + + public void setRegionName(String regionName) { + this.regionName = regionName; + } + + @Override + public String toString() { + return "BrokerRegionVO{" + + "clusterId=" + clusterId + + ", brokerId=" + brokerId + + ", hostname='" + hostname + '\'' + + ", regionName='" + regionName + '\'' + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/ConsumeHealthVO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/ConsumeHealthVO.java new file mode 100644 index 0000000000000000000000000000000000000000..3e36f8b188e36731490e682147f6fbb9feb4bb96 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/ConsumeHealthVO.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.kafka.manager.openapi.common.vo; + +/** + * @author zengqiao + * @date 20/10/26 + */ +public class ConsumeHealthVO { + private Integer healthCode; + + public ConsumeHealthVO(Integer healthCode) { + this.healthCode = healthCode; + } + + public Integer getHealthCode() { + return healthCode; + } + + public void setHealthCode(Integer healthCode) { + this.healthCode = healthCode; + } + + @Override + public String toString() { + return "ConsumeHealthVO{" + + "healthCode=" + healthCode + + '}'; + } +} diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/ThirdPartBrokerOverviewVO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/ThirdPartBrokerOverviewVO.java new file mode 100644 index 0000000000000000000000000000000000000000..595e6878657118ef133af7b67cdaa95a1e8344dc --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/ThirdPartBrokerOverviewVO.java @@ -0,0 +1,59 @@ +package com.xiaojukeji.kafka.manager.openapi.common.vo; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +/** + * @author zengqiao + * @date 20/9/9 + */ +@ApiModel(description="第三方-Broker概览") +public class ThirdPartBrokerOverviewVO { + @ApiModelProperty(value = "集群ID") + private Long clusterId; + + @ApiModelProperty(value = "BrokerId") + private Integer brokerId; + + @ApiModelProperty(value = "处于同步状态 false:已同步, true:未同步") + private Boolean underReplicated; + + public ThirdPartBrokerOverviewVO(Long clusterId, Integer brokerId, Boolean underReplicated) { + this.clusterId = clusterId; + this.brokerId = brokerId; + this.underReplicated = underReplicated; + } + + public Long getClusterId() { + return clusterId; + } + + public void setClusterId(Long clusterId) { + this.clusterId = clusterId; + } + + public Integer getBrokerId() { + return brokerId; + } + + public void setBrokerId(Integer brokerId) { + this.brokerId = brokerId; + } + + public Boolean getUnderReplicated() { + return underReplicated; + } + + public void setUnderReplicated(Boolean underReplicated) { + this.underReplicated = underReplicated; + } + + @Override + public String toString() { + return "ThirdPartBrokerOverviewVO{" + + "clusterId=" + clusterId + + ", brokerId=" + brokerId + + ", underReplicated=" + underReplicated + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicOffsetChangedVO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicOffsetChangedVO.java new file mode 100644 index 0000000000000000000000000000000000000000..3238691f45035328d3ab8b3db46e833eb2992365 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicOffsetChangedVO.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.kafka.manager.openapi.common.vo; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +/** + * @author zengqiao + * @date 20/8/24 + */ +@ApiModel(description="TopicOffset变化") +public class TopicOffsetChangedVO { + @ApiModelProperty(value="Offset是否变化, 0:否, 1:是, -1:未知") + private Integer offsetChanged; + + public TopicOffsetChangedVO(Integer offsetChanged) { + this.offsetChanged = offsetChanged; + } + + public Integer getOffsetChanged() { + return offsetChanged; + } + + public void setOffsetChanged(Integer offsetChanged) { + this.offsetChanged = offsetChanged; + } + + @Override + public String toString() { + return "TopicOffsetChangedVO{" + + "offsetChanged=" + offsetChanged + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java new file mode 100644 index 0000000000000000000000000000000000000000..3665b7acc79473b929d83d2ba6357bab692809aa --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/common/vo/TopicStatisticMetricsVO.java @@ -0,0 +1,34 @@ +package com.xiaojukeji.kafka.manager.openapi.common.vo; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; + +/** + * @author zengqiao + * @date 20/8/14 + */ +@ApiModel(description="Topic流量统计信息") +public class TopicStatisticMetricsVO { + @ApiModelProperty(value="峰值流入流量(B/s)") + private Double peakBytesIn; + + public TopicStatisticMetricsVO(Double peakBytesIn) { + this.peakBytesIn = peakBytesIn; + + } + + public Double getPeakBytesIn() { + return peakBytesIn; + } + + public void setPeakBytesIn(Double peakBytesIn) { + this.peakBytesIn = peakBytesIn; + } + + @Override + public String toString() { + return "TopicStatisticMetricsVO{" + + "peakBytesIn=" + peakBytesIn + + '}'; + } +} \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..10e0bfdfd677d4341e0587c5511479ec5f7b1df9 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-openapi/src/main/java/com/xiaojukeji/kafka/manager/openapi/impl/ThirdPartServiceImpl.java @@ -0,0 +1,200 @@ +package com.xiaojukeji.kafka.manager.openapi.impl; + +import com.xiaojukeji.kafka.manager.common.bizenum.*; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO; +import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupDTO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.openapi.ThirdPartService; +import com.xiaojukeji.kafka.manager.openapi.common.dto.*; +import com.xiaojukeji.kafka.manager.service.cache.KafkaClientPool; +import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.*; +import kafka.admin.AdminClient; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import scala.collection.JavaConversions; + +import java.util.*; + +/** + * @author zengqiao + * @date 20/5/22 + */ +@Service("thirdPartService") +public class ThirdPartServiceImpl implements ThirdPartService { + private static Logger LOGGER = LoggerFactory.getLogger(ThirdPartServiceImpl.class); + + @Autowired + private ClusterService clusterService; + + @Autowired + private TopicService topicService; + + @Autowired + private ConsumerService consumerService; + + @Override + public Result checkConsumeHealth(Long clusterId, + String topicName, + String consumerGroup, + Long maxDelayTime) { + ClusterDO clusterDO = clusterService.getById(clusterId); + if (ValidateUtils.isNull(clusterDO)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName); + if (ValidateUtils.isNull(topicMetadata)) { + return Result.buildFrom(ResultStatus.TOPIC_NOT_EXIST); + } + + // 获取消费组当前的offset + Map consumeOffsetMap = listGroupOffsets(clusterId, consumerGroup); + if (ValidateUtils.isNull(consumeOffsetMap)) { + return new Result<>(ConsumeHealthEnum.UNKNOWN); + } + if (consumeOffsetMap.isEmpty()) { + return Result.buildFrom(ResultStatus.CONSUMER_GROUP_NOT_EXIST); + } + + Long delayTimestamp = System.currentTimeMillis() - maxDelayTime; + + // 获取指定时间的offset + Map offsetAndTimeMap = + offsetsForTimes(clusterDO, topicMetadata, delayTimestamp); + if (ValidateUtils.isNull(offsetAndTimeMap)) { + return new Result<>(ConsumeHealthEnum.UNKNOWN); + } + + for (TopicPartition tp : offsetAndTimeMap.keySet()) { + OffsetAndTimestamp offsetAndTimestamp = offsetAndTimeMap.get(tp); + Long consumeOffset = (Long) consumeOffsetMap.get(tp); + if (ValidateUtils.isNull(consumeOffset)) { + return new Result<>(ConsumeHealthEnum.UNKNOWN); + } + + if (offsetAndTimestamp.offset() <= consumeOffset) { + // 健康的 + continue; + } + + return new Result<>(ConsumeHealthEnum.UNHEALTH); + } + return new Result<>(ConsumeHealthEnum.HEALTH); + } + + private Map listGroupOffsets(Long clusterId, String consumerGroup) { + AdminClient client = KafkaClientPool.getAdminClient(clusterId); + if (ValidateUtils.isNull(client)) { + return null; + } + try { + return JavaConversions.asJavaMap(client.listGroupOffsets(consumerGroup)); + } catch (Exception e) { + LOGGER.error("list group offsets failed, clusterId:{}, consumerGroup:{}.", clusterId, consumerGroup, e); + } + return null; + } + + private Map offsetsForTimes(ClusterDO clusterDO, + TopicMetadata topicMetadata, + Long timestamp) { + KafkaConsumer kafkaConsumer = null; + try { + kafkaConsumer = KafkaClientPool.borrowKafkaConsumerClient(clusterDO); + if (ValidateUtils.isNull(kafkaConsumer)) { + return null; + } + Map timestampsToSearch = new HashMap<>(); + for (Integer partitionId : topicMetadata.getPartitionMap().getPartitions().keySet()) { + timestampsToSearch.put(new TopicPartition(topicMetadata.getTopic(), partitionId), timestamp); + } + return kafkaConsumer.offsetsForTimes(timestampsToSearch); + } catch (Exception e) { + LOGGER.error("get offset for time failed, clusterDO:{} topicMetadata:{} timestamp:{}.", + clusterDO, topicMetadata, timestamp, e); + } finally { + KafkaClientPool.returnKafkaConsumerClient(clusterDO.getId(), kafkaConsumer); + } + return null; + } + + @Override + public List resetOffsets(ClusterDO clusterDO, OffsetResetDTO dto) { + if (ValidateUtils.isNull(dto)) { + return null; + } + List offsetDTOList = dto.getPartitionOffsetDTOList(); + if (ValidateUtils.isEmptyList(offsetDTOList)) { + offsetDTOList = topicService.getPartitionOffsetList( + clusterDO, dto.getTopicName(), dto.getTimestamp()); + } + if (ValidateUtils.isEmptyList(offsetDTOList)) { + return null; + } + OffsetLocationEnum offsetLocation = dto.getLocation().equals( + OffsetLocationEnum.ZOOKEEPER.location) ? OffsetLocationEnum.ZOOKEEPER : OffsetLocationEnum.BROKER; + ResultStatus result = checkConsumerGroupExist(clusterDO, dto.getTopicName(), dto.getConsumerGroup(), offsetLocation, dto.getCreateIfAbsent()); + if (ResultStatus.SUCCESS.getCode() != result.getCode()) { + return null; + } + ConsumerGroupDTO consumerGroupDTO = new ConsumerGroupDTO( + clusterDO.getId(), + dto.getConsumerGroup(), + new ArrayList<>(), + OffsetLocationEnum.getOffsetStoreLocation(dto.getLocation()) + ); + return consumerService.resetConsumerOffset( + clusterDO, + dto.getTopicName(), + consumerGroupDTO, + offsetDTOList + ); + } + + private ResultStatus checkConsumerGroupExist(ClusterDO clusterDO, + String topicName, + String consumerGroup, + OffsetLocationEnum offsetLocation, + Boolean createIfAbsent) { + if (createIfAbsent) { + // 如果不存在, 则直接创建 + return isCreateIfAbsentOverflow(clusterDO, topicName); + } + if (!consumerService.checkConsumerGroupExist(offsetLocation, clusterDO.getId(), topicName, consumerGroup)) { + return ResultStatus.PARAM_ILLEGAL; + } + return ResultStatus.SUCCESS; + + } + + /** + * 限制单天单集群的重置次数不能超过20个 + * + */ + private static final Map createIfAbsentCountMap = new HashMap<>(); + + private synchronized ResultStatus isCreateIfAbsentOverflow(ClusterDO clusterDO, String topicName) { + String key = clusterDO.getId() + "_" + topicName; + Long timestampAndCount = createIfAbsentCountMap.get(key); + if (ValidateUtils.isNull(timestampAndCount) || + (System.currentTimeMillis() - (timestampAndCount / 100) >= (24 *60 * 60 * 1000))) { + // 24小时卫触发, 统计归0 + timestampAndCount = System.currentTimeMillis() * 100L + 1; + } else if (timestampAndCount % 100 > 20) { + return ResultStatus.OPERATION_FORBIDDEN; + } else { + timestampAndCount += 1; + } + createIfAbsentCountMap.put(key, timestampAndCount); + return ResultStatus.SUCCESS; + } +} \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java index 0e4eb6ab11398fcd26945284ee8795a4fb08e2d8..6a37edd2a8bfa07e7b7bb65ca3e9fdb4106a5cb6 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/metrics/delete/DeleteMetrics.java @@ -1,7 +1,6 @@ package com.xiaojukeji.kafka.manager.task.dispatch.metrics.delete; import com.xiaojukeji.kafka.manager.common.constant.LogConstant; -import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; import com.xiaojukeji.kafka.manager.dao.*; import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask; diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/MainApplication.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/MainApplication.java index 334daf277c9cef639253dbb8d532e7b16c659ba4..106d15f551654da89239e8d9bd2383fb7e08d3fb 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/MainApplication.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/MainApplication.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.web; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -18,10 +20,13 @@ import org.springframework.scheduling.annotation.EnableScheduling; @EnableAutoConfiguration @SpringBootApplication(scanBasePackages = {"com.xiaojukeji.kafka.manager"}) public class MainApplication { + private static final Logger LOGGER = LoggerFactory.getLogger(MainApplication.class); + public static void main(String[] args) { try { SpringApplication sa = new SpringApplication(MainApplication.class); sa.run(args); + LOGGER.info("MainApplication started"); } catch (Exception e) { e.printStackTrace(); } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java index d2f7fa9c02748bfe6e4387869a71989126a1bd8c..3008a9ea5678c7e6683f612e8394cfb008ba6dc8 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java @@ -3,7 +3,7 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.gateway; import com.alibaba.fastjson.JSON; import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel; import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent; -import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult; +import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicConnectionDTO; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService; @@ -34,19 +34,19 @@ public class GatewayHeartbeatController { @ApiOperation(value = "连接信息上报入口", notes = "Broker主动上报信息") @RequestMapping(value = "heartbeat/survive-user", method = RequestMethod.POST) @ResponseBody - public DeprecatedResponseResult receiveTopicConnections(@RequestParam("clusterId") String clusterId, - @RequestParam("brokerId") String brokerId, - @RequestBody List dtoList) { + public Result receiveTopicConnections(@RequestParam("clusterId") String clusterId, + @RequestParam("brokerId") String brokerId, + @RequestBody List dtoList) { try { if (ValidateUtils.isEmptyList(dtoList)) { - return DeprecatedResponseResult.success("success"); + return Result.buildSuc(); } topicConnectionService.batchAdd(dtoList); - return DeprecatedResponseResult.success("success"); + return Result.buildSuc(); } catch (Exception e) { LOGGER.error("receive topic connections failed, clusterId:{} brokerId:{} req:{}", clusterId, brokerId, JSON.toJSONString(dtoList), e); } - return DeprecatedResponseResult.failure("fail"); + return Result.buildFailure("fail"); } } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayReportController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayReportController.java index 3c5a19453d44d5f9cedb9d90d71d529265be1711..4a3947560c3fa69966ba0e36931117e7a5e4ab14 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayReportController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayReportController.java @@ -2,7 +2,7 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.gateway; import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel; import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent; -import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult; +import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.utils.ListUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicReportDO; @@ -35,15 +35,15 @@ public class GatewayReportController { @ApiOperation(value = "查询开启JMX采集的Topic", notes = "") @RequestMapping(value = "report/jmx/topics", method = RequestMethod.GET) @ResponseBody - public DeprecatedResponseResult getJmxReportTopics(@RequestParam("clusterId") Long clusterId) { + public Result getJmxReportTopics(@RequestParam("clusterId") Long clusterId) { List doList = topicReportService.getNeedReportTopic(clusterId); - if (ValidateUtils.isEmptyList(doList)) { - return DeprecatedResponseResult.success(); + if (ValidateUtils.isNull(doList)) { + doList = new ArrayList<>(); } List topicNameList = new ArrayList<>(); for (TopicReportDO elem: doList) { topicNameList.add(elem.getTopicName()); } - return DeprecatedResponseResult.success(ListUtils.strList2String(topicNameList)); + return Result.buildSuc(ListUtils.strList2String(topicNameList)); } } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewaySecurityController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewaySecurityController.java index 4f6633f31220a46365cd125369804e0e9372aad1..8d3c4740332fa0f5ffa2b950b72ac763f09dbf35 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewaySecurityController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewaySecurityController.java @@ -3,7 +3,8 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.gateway; import com.alibaba.fastjson.JSON; import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel; import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent; -import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.KafkaAclSearchDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.KafkaUserSearchDTO; import com.xiaojukeji.kafka.manager.common.entity.vo.gateway.KafkaSecurityVO; @@ -40,9 +41,9 @@ public class GatewaySecurityController { @ApiOperation(value = "Kafka用户查询", notes = "") @RequestMapping(value = "security/users", method = RequestMethod.POST) @ResponseBody - public DeprecatedResponseResult getKafkaUsers(@RequestBody KafkaUserSearchDTO dto) { + public Result getKafkaUsers(@RequestBody KafkaUserSearchDTO dto) { if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { - return DeprecatedResponseResult.failure("invalid request"); + return Result.buildFrom(ResultStatus.GATEWAY_INVALID_REQUEST); } try { @@ -50,16 +51,16 @@ public class GatewaySecurityController { dto.getStart(), dto.getEnd().equals(0L)? System.currentTimeMillis(): dto.getEnd() ); - if (ValidateUtils.isEmptyList(doList)) { - return DeprecatedResponseResult.success(); + if (ValidateUtils.isNull(doList)) { + doList = new ArrayList<>(); } KafkaSecurityVO vo = new KafkaSecurityVO(); vo.setRows(new ArrayList<>(GatewayModelConverter.convert2KafkaUserVOList(doList))); - return DeprecatedResponseResult.success(JSON.toJSONString(vo)); + return Result.buildSuc(JSON.toJSONString(vo)); } catch (Exception e) { LOGGER.error("get kafka users failed, req:{}.", dto, e); - return DeprecatedResponseResult.failure("get kafka users exception"); + return Result.buildFrom(ResultStatus.MYSQL_ERROR); } } @@ -67,9 +68,9 @@ public class GatewaySecurityController { @ApiOperation(value = "Kafka用户权限查询", notes = "") @RequestMapping(value = "security/acls", method = RequestMethod.POST) @ResponseBody - public DeprecatedResponseResult getKafkaAcls(@RequestBody KafkaAclSearchDTO dto) { + public Result getKafkaAcls(@RequestBody KafkaAclSearchDTO dto) { if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { - return DeprecatedResponseResult.failure("invalid request"); + return Result.buildFrom(ResultStatus.GATEWAY_INVALID_REQUEST); } try { @@ -78,16 +79,16 @@ public class GatewaySecurityController { dto.getStart(), dto.getEnd().equals(0L)? System.currentTimeMillis(): dto.getEnd() ); - if (ValidateUtils.isEmptyList(doList)) { - return DeprecatedResponseResult.success(); + if (ValidateUtils.isNull(doList)) { + doList = new ArrayList<>(); } KafkaSecurityVO vo = new KafkaSecurityVO(); vo.setRows(new ArrayList<>(GatewayModelConverter.convert2KafkaAclVOList(doList))); - return DeprecatedResponseResult.success(JSON.toJSONString(vo)); + return Result.buildSuc(JSON.toJSONString(vo)); } catch (Exception e) { LOGGER.error("get kafka acls failed, req:{}.", dto, e); - return DeprecatedResponseResult.failure("get kafka acls exception"); + return Result.buildFrom(ResultStatus.MYSQL_ERROR); } } } \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java index b30b1e6ea368815d43178e7b74d3c51ea667df9f..699b30f002e599ec4d7026b020e7a99d9de0a963 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayServiceDiscoveryController.java @@ -4,7 +4,7 @@ import com.alibaba.fastjson.JSON; import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel; import com.xiaojukeji.kafka.manager.common.bizenum.gateway.GatewayConfigKeyEnum; import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent; -import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult; +import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.*; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.GatewayConfigDO; import com.xiaojukeji.kafka.manager.common.entity.vo.gateway.GatewayConfigVO; @@ -54,33 +54,27 @@ public class GatewayServiceDiscoveryController { @ApiOperation(value = "获取集群服务地址", notes = "") @RequestMapping(value = "discovery/init", method = RequestMethod.GET) @ResponseBody - public DeprecatedResponseResult getAllKafkaBootstrapServers() { + public Result getAllKafkaBootstrapServers() { KafkaBootstrapServerConfig config = gatewayConfigService.getKafkaBootstrapServersConfig(Long.MIN_VALUE); if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getClusterIdBootstrapServersMap())) { - return DeprecatedResponseResult.failure("call init kafka bootstrap servers failed"); + return Result.buildFailure("call init kafka bootstrap servers failed"); } - if (config.getClusterIdBootstrapServersMap().isEmpty()) { - return DeprecatedResponseResult.success(); - } - return DeprecatedResponseResult.success(JSON.toJSONString(config.getClusterIdBootstrapServersMap())); + return Result.buildSuc(JSON.toJSONString(config.getClusterIdBootstrapServersMap())); } @ApiLevel(level = ApiLevelContent.LEVEL_IMPORTANT_2) @ApiOperation(value = "获取集群服务地址", notes = "") @RequestMapping(value = "discovery/update", method = RequestMethod.GET) @ResponseBody - public DeprecatedResponseResult getBootstrapServersIfNeeded(@RequestParam("versionNumber") long versionNumber) { + public Result getBootstrapServersIfNeeded(@RequestParam("versionNumber") long versionNumber) { KafkaBootstrapServerConfig config = gatewayConfigService.getKafkaBootstrapServersConfig(versionNumber); if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getClusterIdBootstrapServersMap())) { - return DeprecatedResponseResult.failure("call update kafka bootstrap servers failed"); + return Result.buildFailure("call update kafka bootstrap servers failed"); } - if (config.getClusterIdBootstrapServersMap().isEmpty()) { - return DeprecatedResponseResult.success(); - } - return DeprecatedResponseResult.success(JSON.toJSONString(new GatewayConfigVO( + return Result.buildSuc(JSON.toJSONString(new GatewayConfigVO( String.valueOf(config.getVersion()), JSON.toJSONString(config.getClusterIdBootstrapServersMap()) ))); @@ -90,15 +84,13 @@ public class GatewayServiceDiscoveryController { @ApiOperation(value = "最大并发请求数", notes = "") @RequestMapping(value = "discovery/max-request-num", method = RequestMethod.GET) @ResponseBody - public DeprecatedResponseResult getMaxRequestNum(@RequestParam("versionNumber") long versionNumber) { + public Result getMaxRequestNum(@RequestParam("versionNumber") long versionNumber) { RequestQueueConfig config = gatewayConfigService.getRequestQueueConfig(versionNumber); if (ValidateUtils.isNull(config)) { - return DeprecatedResponseResult.failure("call get request queue size config failed"); - } - if (ValidateUtils.isNull(config.getMaxRequestQueueSize())) { - return DeprecatedResponseResult.success(); + return Result.buildFailure("call get request queue size config failed"); } - return DeprecatedResponseResult.success(JSON.toJSONString( + + return Result.buildSuc(JSON.toJSONString( new GatewayConfigVO( String.valueOf(config.getVersion()), String.valueOf(config.getMaxRequestQueueSize()) @@ -110,15 +102,13 @@ public class GatewayServiceDiscoveryController { @ApiOperation(value = "最大APP请求速率", notes = "") @RequestMapping(value = "discovery/appId-rate", method = RequestMethod.GET) @ResponseBody - public DeprecatedResponseResult getAppIdRate(@RequestParam("versionNumber") long versionNumber) { + public Result getAppIdRate(@RequestParam("versionNumber") long versionNumber) { AppRateConfig config = gatewayConfigService.getAppRateConfig(versionNumber); if (ValidateUtils.isNull(config)) { - return DeprecatedResponseResult.failure("call get app rate config failed"); - } - if (ValidateUtils.isNull(config.getAppRateLimit())) { - return DeprecatedResponseResult.success(); + return Result.buildFailure("call get app rate config failed"); } - return DeprecatedResponseResult.success(JSON.toJSONString( + + return Result.buildSuc(JSON.toJSONString( new GatewayConfigVO( String.valueOf(config.getVersion()), String.valueOf(config.getAppRateLimit()) @@ -130,15 +120,12 @@ public class GatewayServiceDiscoveryController { @ApiOperation(value = "最大IP请求速率", notes = "") @RequestMapping(value = "discovery/ip-rate", method = RequestMethod.GET) @ResponseBody - public DeprecatedResponseResult getIpRate(@RequestParam("versionNumber") long versionNumber) { + public Result getIpRate(@RequestParam("versionNumber") long versionNumber) { IpRateConfig config = gatewayConfigService.getIpRateConfig(versionNumber); if (ValidateUtils.isNull(config)) { - return DeprecatedResponseResult.failure("call get ip rate config failed"); - } - if (ValidateUtils.isNull(config.getIpRateLimit())) { - return DeprecatedResponseResult.success(); + return Result.buildFailure("call get ip rate config failed"); } - return DeprecatedResponseResult.success(JSON.toJSONString( + return Result.buildSuc(JSON.toJSONString( new GatewayConfigVO( String.valueOf(config.getVersion()), String.valueOf(config.getIpRateLimit()) @@ -150,15 +137,11 @@ public class GatewayServiceDiscoveryController { @ApiOperation(value = "最大SP请求速率", notes = "") @RequestMapping(value = "discovery/sp-limit", method = RequestMethod.GET) @ResponseBody - public DeprecatedResponseResult getSpLimit(@RequestParam("versionNumber") long versionNumber) { + public Result getSpLimit(@RequestParam("versionNumber") long versionNumber) { SpRateConfig config = gatewayConfigService.getSpRateConfig(versionNumber); if (ValidateUtils.isNull(config) || ValidateUtils.isNull(config.getSpRateMap())) { - return DeprecatedResponseResult.failure("call update kafka bootstrap servers failed"); - } - - if (config.getSpRateMap().isEmpty()) { - return DeprecatedResponseResult.success(); + return Result.buildFailure("call update kafka bootstrap servers failed"); } List strList = new ArrayList<>(); @@ -166,7 +149,7 @@ public class GatewayServiceDiscoveryController { strList.add(entry.getKey() + "#" + String.valueOf(entry.getValue())); } - return DeprecatedResponseResult.success(JSON.toJSONString(new GatewayConfigVO( + return Result.buildSuc(JSON.toJSONString(new GatewayConfigVO( String.valueOf(config.getVersion()), ListUtils.strList2String(strList) ))); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartAppController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartAppController.java new file mode 100644 index 0000000000000000000000000000000000000000..c9969438591597ad1c6aa5bccc47520ad32241a7 --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartAppController.java @@ -0,0 +1,49 @@ +package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart; + +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.common.constant.SystemCodeConstant; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.vo.normal.app.AppVO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; +import com.xiaojukeji.kafka.manager.web.converters.AppConverter; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * @author zengqiao + * @date 20/9/23 + */ +@Api(tags = "开放接口-App相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX) +public class ThirdPartAppController { + private final static Logger LOGGER = LoggerFactory.getLogger(ThirdPartAppController.class); + + @Autowired + private AppService appService; + + @ApiOperation(value = "查询负责的应用", notes = "") + @RequestMapping(value = "principal-apps/{principal}/basic-info", method = RequestMethod.GET) + @ResponseBody + public Result> searchPrincipalApps(@PathVariable("principal") String principal, + @RequestParam("system-code") String systemCode) { + LOGGER.info("search principal-apps, principal:{} systemCode:{}.", principal, systemCode); + if (ValidateUtils.isBlank(principal) || ValidateUtils.isBlank(systemCode)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + if (!SystemCodeConstant.KAFKA_MANAGER.equals(systemCode)) { + return Result.buildFrom(ResultStatus.OPERATION_FORBIDDEN); + } + return new Result<>(AppConverter.convert2AppVOList( + appService.getByPrincipal(principal) + )); + } +} \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java new file mode 100644 index 0000000000000000000000000000000000000000..e324be016e6ac67ca32064aef2975e695d4dcb7f --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartBrokerController.java @@ -0,0 +1,109 @@ +package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart; + +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO; +import com.xiaojukeji.kafka.manager.openapi.common.vo.ThirdPartBrokerOverviewVO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; +import com.xiaojukeji.kafka.manager.openapi.common.vo.BrokerRegionVO; +import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.BrokerService; +import com.xiaojukeji.kafka.manager.service.service.ClusterService; +import com.xiaojukeji.kafka.manager.service.service.RegionService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + + +/** + * @author zengqiao + * @date 20/9/9 + */ +@Api(tags = "开放接口-Broker相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX) +public class ThirdPartBrokerController { + @Autowired + private BrokerService brokerService; + + @Autowired + private RegionService regionService; + + @Autowired + private ClusterService clusterService; + + @ApiOperation(value = "Broker信息概览", notes = "") + @RequestMapping(value = "{clusterId}/brokers/{brokerId}/overview", method = RequestMethod.GET) + @ResponseBody + public Result getBrokerOverview(@PathVariable Long clusterId, + @PathVariable Integer brokerId) { + BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId); + if (ValidateUtils.isNull(brokerMetadata)) { + return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST); + } + + BrokerMetrics brokerMetrics = brokerService.getBrokerMetricsFromJmx( + clusterId, + brokerId, + KafkaMetricsCollections.BROKER_STATUS_PAGE_METRICS + ); + if (ValidateUtils.isNull(brokerMetrics)) { + return Result.buildFrom(ResultStatus.OPERATION_FAILED); + } + Integer underReplicated = brokerMetrics.getSpecifiedMetrics("UnderReplicatedPartitionsValue", Integer.class); + if (ValidateUtils.isNull(underReplicated)) { + return Result.buildFrom(ResultStatus.OPERATION_FAILED); + } + + return new Result<>(new ThirdPartBrokerOverviewVO(clusterId, brokerId, underReplicated.equals(0))); + } + + @ApiOperation(value = "BrokerRegion信息", notes = "所有集群的") + @RequestMapping(value = "broker-regions", method = RequestMethod.GET) + @ResponseBody + public Result> getBrokerRegions() { + List clusterDOList = clusterService.list(); + if (ValidateUtils.isNull(clusterDOList)) { + clusterDOList = new ArrayList<>(); + } + + List regionDOList = regionService.listAll(); + if (ValidateUtils.isNull(regionDOList)) { + regionDOList = new ArrayList<>(); + } + + List voList = new ArrayList<>(); + for (ClusterDO clusterDO: clusterDOList) { + Map brokerIdRegionMap = regionService.convert2BrokerIdRegionMap( + regionDOList.stream().filter(elem -> clusterDO.getId().equals(elem.getClusterId())).collect(Collectors.toList()) + ); + for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterDO.getId())) { + BrokerRegionVO vo = new BrokerRegionVO(); + vo.setClusterId(clusterDO.getId()); + vo.setBrokerId(brokerId); + + BrokerMetadata metadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterDO.getId(), brokerId); + if (!ValidateUtils.isNull(metadata)) { + vo.setHostname(metadata.getHost()); + } + RegionDO regionDO = brokerIdRegionMap.get(brokerId); + if (!ValidateUtils.isNull(regionDO)) { + vo.setRegionName(regionDO.getName()); + } + voList.add(vo); + } + } + return new Result<>(voList); + } +} \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartConsumeController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartConsumeController.java new file mode 100644 index 0000000000000000000000000000000000000000..e14ed81d79ab0841457b3df879aec3dab8c832bc --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartConsumeController.java @@ -0,0 +1,171 @@ +package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart; + +import com.xiaojukeji.kafka.manager.common.bizenum.ConsumeHealthEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.OffsetLocationEnum; +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.common.constant.Constant; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumeDetailDTO; +import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupDTO; +import com.xiaojukeji.kafka.manager.openapi.common.dto.ConsumeHealthDTO; +import com.xiaojukeji.kafka.manager.openapi.common.dto.OffsetResetDTO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; +import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGroupDetailVO; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.openapi.ThirdPartService; +import com.xiaojukeji.kafka.manager.openapi.common.vo.ConsumeHealthVO; +import com.xiaojukeji.kafka.manager.service.service.ClusterService; +import com.xiaojukeji.kafka.manager.service.service.ConsumerService; +import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; +import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; +import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author zengqiao + * @date 20/10/12 + */ +@Api(tags = "开放接口-Consumer相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX) +public class ThirdPartConsumeController { + private final static Logger LOGGER = LoggerFactory.getLogger(ThirdPartConsumeController.class); + + @Autowired + private AppService appService; + + @Autowired + private ClusterService clusterService; + + @Autowired + private ConsumerService consumerService; + + @Autowired + private AuthorityService authorityService; + + @Autowired + private ThirdPartService thirdPartService; + + @ApiOperation(value = "消费组健康", notes = "消费组是否健康") + @RequestMapping(value = "clusters/consumer-health", method = RequestMethod.POST) + @ResponseBody + public Result checkConsumeHealth(@RequestBody ConsumeHealthDTO dto) { + LOGGER.info(""); + if (ValidateUtils.isNull(dto) || !dto.paramLegal()) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + + Result subEnumResult = null; + for (String topicName: dto.getTopicNameList()) { + subEnumResult = thirdPartService.checkConsumeHealth( + dto.getClusterId(), + topicName, + dto.getConsumerGroup(), + dto.getMaxDelayTime() + ); + if (!Constant.SUCCESS.equals(subEnumResult.getCode())) { + return new Result<>(subEnumResult.getCode(), subEnumResult.getMessage()); + } + } + if (ValidateUtils.isNull(subEnumResult)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + return new Result<>(new ConsumeHealthVO(subEnumResult.getData().getCode())); + } + + @ApiOperation(value = "重置消费组", notes = "") + @RequestMapping(value = "consumers/offsets", method = RequestMethod.PUT) + @ResponseBody + public Result> resetOffsets(@RequestBody OffsetResetDTO dto) { + LOGGER.info("rest offset, req:{}.", dto); + if (ValidateUtils.isNull(dto) || !dto.legal()) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + + ClusterDO clusterDO = clusterService.getById(dto.getClusterId()); + if (ValidateUtils.isNull(clusterDO)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + + // 检查AppID权限 + if (!appService.verifyAppIdByPassword(dto.getAppId(), dto.getPassword())) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + // 检查权限 + AuthorityDO authority = + authorityService.getAuthority(dto.getClusterId(), dto.getTopicName(), dto.getAppId()); + if (ValidateUtils.isNull(authority) || (authority.getAccess() & 1) <= 0) { + authority = authorityService.getAuthority(dto.getClusterId(), "*", dto.getAppId()); + } + if (authority == null || (authority.getAccess() & 1) <= 0) { + return Result.buildFrom(ResultStatus.USER_WITHOUT_AUTHORITY); + } + + List resultList = thirdPartService.resetOffsets(clusterDO, dto); + if (ValidateUtils.isNull(resultList)) { + return Result.buildFrom(ResultStatus.OPERATION_FAILED); + } + for (Result result: resultList) { + if (!Constant.SUCCESS.equals(result.getCode())) { + return Result.buildFrom(ResultStatus.OPERATION_FAILED); + } + } + return new Result<>(resultList); + } + + @ApiOperation(value = "查询消费组的消费详情", notes = "") + @RequestMapping(value = "{physicalClusterId}/consumers/{consumerGroup}/topics/{topicName}/consume-details", + method = RequestMethod.GET) + @ResponseBody + public Result> getConsumeDetail(@PathVariable Long physicalClusterId, + @PathVariable String consumerGroup, + @PathVariable String topicName, + @RequestParam("location") String location) { + if (ValidateUtils.isNull(location)) { + return Result.buildFrom(ResultStatus.PARAM_ILLEGAL); + } + + ClusterDO clusterDO = clusterService.getById(physicalClusterId); + if (ValidateUtils.isNull(clusterDO)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + + location = location.toLowerCase(); + OffsetLocationEnum offsetStoreLocation = OffsetLocationEnum.getOffsetStoreLocation(location); + if (ValidateUtils.isNull(offsetStoreLocation)) { + return Result.buildFrom(ResultStatus.CG_LOCATION_ILLEGAL); + } + + ConsumerGroupDTO consumeGroupDTO = new ConsumerGroupDTO( + clusterDO.getId(), + consumerGroup, + new ArrayList<>(), + offsetStoreLocation + ); + try { + List consumeDetailDTOList = + consumerService.getConsumeDetail(clusterDO, topicName, consumeGroupDTO); + return new Result<>( + ConsumerModelConverter.convert2ConsumerGroupDetailVO( + topicName, + consumerGroup, + location, + consumeDetailDTOList + ) + ); + } catch (Exception e) { + LOGGER.error("get consume detail failed, consumerGroup:{}.", consumeGroupDTO, e); + } + return Result.buildFrom(ResultStatus.OPERATION_FAILED); + } +} \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java new file mode 100644 index 0000000000000000000000000000000000000000..4d029fb6b8004a93b40e9e01cec86e1f835fc853 --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartTopicController.java @@ -0,0 +1,160 @@ +package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart; + +import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; +import com.xiaojukeji.kafka.manager.common.constant.Constant; +import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; +import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO; +import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO; +import com.xiaojukeji.kafka.manager.common.entity.vo.normal.consumer.ConsumerGroupVO; +import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorizedAppVO; +import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata; +import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO; +import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicStatisticMetricsVO; +import com.xiaojukeji.kafka.manager.common.utils.DateUtils; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.*; +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter; +import com.xiaojukeji.kafka.manager.web.converters.ConsumerModelConverter; +import com.xiaojukeji.kafka.manager.web.converters.TopicModelConverter; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import java.util.Date; +import java.util.List; + +/** + * @author zengqiao + * @date 20/7/24 + */ +@Api(tags = "开放接口-Topic相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX) +public class ThirdPartTopicController { + private final static Logger LOGGER = LoggerFactory.getLogger(ThirdPartTopicController.class); + + @Autowired + private TopicService topicService; + + @Autowired + private ClusterService clusterService; + + @Autowired + private ConsumerService consumerService; + + @Autowired + private TopicManagerService topicManagerService; + + @ApiOperation(value = "Topic元信息", notes = "LogX调用") + @RequestMapping(value = "clusters/{clusterId}/topics/{topicName}/metadata", method = RequestMethod.GET) + @ResponseBody + public Result getTopicMetadata(@PathVariable Long clusterId, @PathVariable String topicName) { + TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName); + if (ValidateUtils.isNull(topicMetadata)) { + return Result.buildFrom(ResultStatus.TOPIC_NOT_EXIST); + } + TopicMetadataVO vo = new TopicMetadataVO(); + vo.setTopicName(topicMetadata.getTopic()); + vo.setPartitionNum(topicMetadata.getPartitionNum()); + return new Result<>(vo); + } + + @ApiOperation(value = "Topic流量统计信息", notes = "") + @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/statistic-metrics", method = RequestMethod.GET) + @ResponseBody + public Result getTopicStatisticMetrics(@PathVariable Long physicalClusterId, + @PathVariable String topicName, + @RequestParam("latest-day") Integer latestDay) { + try { + return new Result<>(new TopicStatisticMetricsVO(topicManagerService.getTopicMaxAvgBytesIn( + physicalClusterId, + topicName, + new Date(DateUtils.getDayStarTime(-1 * latestDay)), + new Date(), + 1 + ))); + } catch (Exception e) { + LOGGER.error("get topic statistic metrics failed, clusterId:{} topicName:{} latestDay:{}." + , physicalClusterId, topicName, latestDay, e); + } + return Result.buildFrom(ResultStatus.MYSQL_ERROR); + } + + @ApiOperation(value = "Topic是否有流量", notes = "") + @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/offset-changed", method = RequestMethod.GET) + @ResponseBody + public Result checkTopicExpired(@PathVariable Long physicalClusterId, + @PathVariable String topicName, + @RequestParam("latest-time") Long latestTime) { + Result enumResult = + topicService.checkTopicOffsetChanged(physicalClusterId, topicName, latestTime); + if (!Constant.SUCCESS.equals(enumResult.getCode())) { + return new Result<>(enumResult.getCode(), enumResult.getMessage()); + } + return new Result<>(new TopicOffsetChangedVO(enumResult.getData().getCode())); + } + + @ApiOperation(value = "Topic实时流量信息", notes = "") + @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/metrics", method = RequestMethod.GET) + @ResponseBody + public Result getTopicMetrics(@PathVariable Long physicalClusterId, + @PathVariable String topicName) { + return new Result<>(CommonModelConverter.convert2RealTimeMetricsVO( + topicService.getTopicMetricsFromJMX( + physicalClusterId, + topicName, + KafkaMetricsCollections.COMMON_DETAIL_METRICS, + true + ) + )); + } + + @ApiOperation(value = "Topic实时请求耗时信息", notes = "") + @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/request-time", method = RequestMethod.GET) + @ResponseBody + public Result> getTopicRequestMetrics(@PathVariable Long physicalClusterId, + @PathVariable String topicName) { + BaseMetrics metrics = topicService.getTopicMetricsFromJMX( + physicalClusterId, + topicName, + KafkaMetricsCollections.TOPIC_REQUEST_TIME_DETAIL_PAGE_METRICS, + false + ); + return new Result<>(TopicModelConverter.convert2TopicRequestTimeDetailVOList(metrics)); + } + + @ApiOperation(value = "查询Topic的消费组列表", notes = "") + @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/consumer-groups", method = RequestMethod.GET) + @ResponseBody + public Result> getConsumeDetail(@PathVariable Long physicalClusterId, + @PathVariable String topicName) { + ClusterDO clusterDO = clusterService.getById(physicalClusterId); + if (ValidateUtils.isNull(clusterDO)) { + return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST); + } + + return new Result<>(ConsumerModelConverter.convert2ConsumerGroupVOList( + consumerService.getConsumerGroupList(physicalClusterId, topicName) + )); + } + + @ApiOperation(value = "Topic应用信息", notes = "") + @RequestMapping(value = "{physicalClusterId}/topics/{topicName}/apps", method = RequestMethod.GET) + @ResponseBody + public Result> getTopicAppIds(@PathVariable Long physicalClusterId, + @PathVariable String topicName) { + return new Result<>(TopicModelConverter.convert2TopicAuthorizedAppVOList( + topicManagerService.getTopicAuthorizedApps(physicalClusterId, topicName)) + ); + } +} \ No newline at end of file diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/WebMvcConfig.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/WebMvcConfig.java index f8aa607a4e5377ec33ff0ad4ade8d2c5144f764c..9f2aedf63d07f5cae275c108d2adbac26100da70 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/WebMvcConfig.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/config/WebMvcConfig.java @@ -1,5 +1,6 @@ package com.xiaojukeji.kafka.manager.web.config; +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.web.inteceptor.PermissionInterceptor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringBootConfiguration; @@ -31,7 +32,7 @@ public class WebMvcConfig implements WebMvcConfigurer { @Override public void addInterceptors(InterceptorRegistry registry) { - registry.addInterceptor(permissionInterceptor).addPathPatterns("/api/v1/**"); + registry.addInterceptor(permissionInterceptor).addPathPatterns(ApiPrefix.API_PREFIX + "**"); } @Override diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/inteceptor/WebMetricsInterceptor.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/inteceptor/WebMetricsInterceptor.java index 0cf83b663f768ce0a1d545f9cce525d2c1b839ca..bf8bc1e13171003bbae0feddd110705ee879a16c 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/inteceptor/WebMetricsInterceptor.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/inteceptor/WebMetricsInterceptor.java @@ -3,7 +3,7 @@ package com.xiaojukeji.kafka.manager.web.inteceptor; import com.codahale.metrics.Timer; import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel; import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent; -import com.xiaojukeji.kafka.manager.common.entity.DeprecatedResponseResult; +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.api.ApiCount; @@ -118,8 +118,8 @@ public class WebMetricsInterceptor { ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); String uri = attributes.getRequest().getRequestURI(); - if (uri.contains("gateway/api/v1")) { - return DeprecatedResponseResult.failure("api limited"); + if (uri.contains(ApiPrefix.GATEWAY_API_V1_PREFIX)) { + return Result.buildFailure("api limited"); } return new Result<>(ResultStatus.OPERATION_FORBIDDEN); } diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 580388378c74648bf625972f807e2d4e322c66c6..dbfe215a2d47025f636fc547ae0f13fade9f7a7a 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -30,8 +30,13 @@ logging: custom: idc: cn + jmx: + max-conn: 10 -agent: +account: + ldap: + +kcm: n9e: base-url: http://127.0.0.1/api username: admin @@ -42,11 +47,13 @@ agent: monitor: n9e: base-url: http://127.0.0.1/api + username: admin + user-token: admin + nid: 10 notify: + kafka: + cluster-id: 95 + topic-name: didi-kafka-notify order: detail-url: http://127.0.0.1 - -kafka: - cluster-id: 12 - topic-name: 123 \ No newline at end of file