diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicConnectionDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicConnectionDTO.java deleted file mode 100644 index 8e2703386f15dedd740ffa156c81560d3a20685d..0000000000000000000000000000000000000000 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicConnectionDTO.java +++ /dev/null @@ -1,127 +0,0 @@ -package com.xiaojukeji.kafka.manager.common.entity.dto.gateway; - -import java.util.Date; - -/** - * @author zengqiao - * @date 20/7/6 - */ -public class TopicConnectionDTO { - private Long id; - - private Long clusterId; - - private String topicName; - - // producer or consumer - private String type; - - // appId#ip#clientVersion - private String clientInfo; - - private String appId; - - private String ip; - - private String clientVersion; - - private Date gmtCreate; - - private Date gmtModify; - - public Long getId() { - return id; - } - - public void setId(Long id) { - this.id = id; - } - - public Long getClusterId() { - return clusterId; - } - - public void setClusterId(Long clusterId) { - this.clusterId = clusterId; - } - - public String getTopicName() { - return topicName; - } - - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getClientInfo() { - return clientInfo; - } - - public void setClientInfo(String clientInfo) { - this.clientInfo = clientInfo; - } - - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public String getClientVersion() { - return clientVersion; - } - - public void setClientVersion(String clientVersion) { - 
this.clientVersion = clientVersion; - } - - public Date getGmtCreate() { - return gmtCreate; - } - - public void setGmtCreate(Date gmtCreate) { - this.gmtCreate = gmtCreate; - } - - public Date getGmtModify() { - return gmtModify; - } - - public void setGmtModify(Date gmtModify) { - this.gmtModify = gmtModify; - } - - @Override - public String toString() { - return "TopicConnectionDTO{" + - "id=" + id + - ", clusterId=" + clusterId + - ", topicName='" + topicName + '\'' + - ", type='" + type + '\'' + - ", clientInfo='" + clientInfo + '\'' + - ", appId='" + appId + '\'' + - ", ip='" + ip + '\'' + - ", clientVersion='" + clientVersion + '\'' + - ", gmtCreate=" + gmtCreate + - ", gmtModify=" + gmtModify + - '}'; - } -} \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java index 5daaece0d50e4e953f26d0e8b4b99f330b1fcb3c..a8c4997ed953e71eafd44d6a622e61a920248457 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java @@ -1,8 +1,11 @@ package com.xiaojukeji.kafka.manager.common.utils; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializeConfig; import com.fasterxml.jackson.databind.ObjectMapper; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; import java.lang.reflect.Method; import java.util.ArrayList; @@ -48,4 +51,32 @@ public class JsonUtils { public static String toJSONString(Object obj) { return JSON.toJSONString(obj); } + + public static List parseTopicConnections(Long clusterId, JSONObject jsonObject) { + List connectionDOList = new ArrayList<>(); + for (String clientType: jsonObject.keySet()) { + JSONObject 
topicObject = jsonObject.getJSONObject(clientType); + + // 解析单个Topic的连接信息 + for (String topicName: topicObject.keySet()) { + JSONArray appIdArray = topicObject.getJSONArray(topicName); + for (Object appIdDetail : appIdArray.toArray()) { + TopicConnectionDO connectionDO = new TopicConnectionDO(); + + String[] appIdDetailArray = appIdDetail.toString().split("#"); + if (appIdDetailArray.length == 3) { + connectionDO.setAppId(appIdDetailArray[0]); + connectionDO.setIp(appIdDetailArray[1]); + connectionDO.setClientVersion(appIdDetailArray[2]); + } + + connectionDO.setClusterId(clusterId); + connectionDO.setTopicName(topicName); + connectionDO.setType(clientType); + connectionDOList.add(connectionDO); + } + } + } + return connectionDOList; + } } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java index a37b95e275fc794c7c73f625fb46e1b3ef23b91a..f1b7d1fa2fa8eaca4ddb94008aa7955b4199ccc2 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java @@ -311,6 +311,19 @@ public class PhysicalClusterMetadataManager { return metadataMap.get(brokerId); } + public static BrokerMetadata getBrokerMetadata(Long clusterId, String hostname) { + Map metadataMap = BROKER_METADATA_MAP.get(clusterId); + if (metadataMap == null) { + return null; + } + for (BrokerMetadata brokerMetadata: metadataMap.values()) { + if (brokerMetadata.getHost().equals(hostname)) { + return brokerMetadata; + } + } + return null; + } + public static Map> getBrokerHostKafkaRoleMap(Long clusterId) { Map> hostRoleMap = new HashMap<>(); ControllerData controllerData = CONTROLLER_DATA_MAP.get(clusterId); diff --git 
a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java index 0d47ef85b7ec8eedc257ecaef171850efe33f917..795446089c4052119b9290212c3ff61e80cebe40 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java @@ -1,7 +1,7 @@ package com.xiaojukeji.kafka.manager.service.service.gateway; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; -import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicConnectionDTO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; import java.util.Date; import java.util.List; @@ -11,7 +11,7 @@ import java.util.List; * @date 20/4/13 */ public interface TopicConnectionService { - int batchAdd(List dtoList); + int batchAdd(List doList); /** * 查询连接信息 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java index 3f0f0b0b304be5e1e329117b7ffee82bef23922c..b29db05c0f911ad98a0ba424c95adbffd51cdbf0 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java @@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; -import 
com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicConnectionDTO; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.dao.gateway.TopicConnectionDao; @@ -28,23 +27,16 @@ public class TopicConnectionServiceImpl implements TopicConnectionService { private TopicConnectionDao topicConnectionDao; @Override - public int batchAdd(List dtoList) { - if (ValidateUtils.isEmptyList(dtoList)) { + public int batchAdd(List doList) { + if (ValidateUtils.isEmptyList(doList)) { return 0; } int count = 0; - for (TopicConnectionDTO dto: dtoList) { + for (TopicConnectionDO connectionDO: doList) { try { - TopicConnectionDO topicConnectionDO = new TopicConnectionDO(); - topicConnectionDO.setClusterId(dto.getClusterId()); - topicConnectionDO.setTopicName(dto.getTopicName()); - topicConnectionDO.setType(dto.getType()); - topicConnectionDO.setAppId(dto.getAppId()); - topicConnectionDO.setIp(dto.getIp()); - topicConnectionDO.setClientVersion(dto.getClientVersion()); - count += topicConnectionDao.replace(topicConnectionDO); + count += topicConnectionDao.replace(connectionDO); } catch (Exception e) { - LOGGER.error("replace topic connections failed, data:{}.", dto); + LOGGER.error("replace topic connections failed, data:{}.", connectionDO); } } return count; diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java index e71175f7a74d338c5adfe3bf12901299da9e8ac4..90e347e01d6197cf6b94126952dec8b2dc894865 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java @@ -1,6 +1,8 @@ package 
com.xiaojukeji.kafka.manager.kcm.component.agent.n9e; import com.alibaba.fastjson.JSON; +import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum; +import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskTypeEnum; import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.CreationTaskData; import com.xiaojukeji.kafka.manager.common.utils.HttpUtils; import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; @@ -18,6 +20,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -31,19 +38,21 @@ public class N9e extends AbstractAgent { private static final Logger LOGGER = LoggerFactory.getLogger(N9e.class); @Value("${kcm.n9e.base-url}") - private String baseUrl; - - @Value("${kcm.n9e.username}") - private String username; + private String baseUrl; @Value("${kcm.n9e.user-token}") - private String userToken; + private String userToken; - @Value("${kcm.n9e.tpl-id}") - private Integer tplId; + @Value("${kcm.n9e.account}") + private String account; @Value("${kcm.n9e.timeout}") - private Integer timeout; + private Integer timeout; + + @Value("${kcm.n9e.script-file}") + private String scriptFile; + + private String script; /** * 并发度,顺序执行 @@ -67,21 +76,14 @@ public class N9e extends AbstractAgent { private static final String TASK_STD_LOG_URI = "/api/job-ce/task/{taskId}/stdout.json"; + @PostConstruct + public void init() { + this.script = readScriptInJarFile(scriptFile); + } + @Override - public Long createTask(CreationTaskData dto) { - StringBuilder sb = new StringBuilder(); - sb.append(dto.getKafkaPackageName()).append(",,").append(dto.getKafkaPackageMd5()).append(",,"); - sb.append(dto.getServerPropertiesName()).append(",,").append(dto.getServerPropertiesMd5()); - - Map 
param = new HashMap<>(); - param.put("tpl_id", tplId); - param.put("batch", BATCH); - param.put("tolerance", TOLERANCE); - param.put("timeout", timeout); - param.put("hosts", dto.getHostList()); - param.put("pause", ListUtils.strList2String(dto.getPauseList())); - param.put("action", "pause"); - param.put("args", sb.toString()); + public Long createTask(CreationTaskData creationTaskData) { + Map param = buildCreateTaskParam(creationTaskData); String response = null; try { @@ -96,7 +98,7 @@ public class N9e extends AbstractAgent { } return Long.valueOf(zr.getDat().toString()); } catch (Exception e) { - LOGGER.error("create task failed, dto:{}.", dto, e); + LOGGER.error("create task failed, req:{}.", creationTaskData, e); } return null; } @@ -126,7 +128,7 @@ public class N9e extends AbstractAgent { @Override public Boolean actionHostTask(Long taskId, String action, String hostname) { - Map param = new HashMap<>(3); + Map param = new HashMap<>(2); param.put("action", action); param.put("hostname", hostname); @@ -143,7 +145,7 @@ public class N9e extends AbstractAgent { } return false; } catch (Exception e) { - LOGGER.error("action task failed, taskId:{}, action:{}, hostname:{}.", taskId, action, hostname, e); + LOGGER.error("action task failed, taskId:{} action:{} hostname:{}.", taskId, action, hostname, e); } return false; } @@ -186,7 +188,7 @@ public class N9e extends AbstractAgent { JSON.parseObject(JSON.toJSONString(n9eResult.getDat()), N9eTaskResultDTO.class); return n9eTaskResultDTO.convert2HostnameStatusMap(); } catch (Exception e) { - LOGGER.error("get task status failed, agentTaskId:{}.", agentTaskId, e); + LOGGER.error("get task result failed, agentTaskId:{} response:{}.", agentTaskId, response, e); } return null; } @@ -217,9 +219,63 @@ public class N9e extends AbstractAgent { } private Map buildHeader() { - Map headers = new HashMap<>(1); + Map headers = new HashMap<>(2); headers.put("Content-Type", "application/json;charset=UTF-8"); 
headers.put("X-User-Token", userToken); return headers; } + + private Map buildCreateTaskParam(CreationTaskData creationTaskData) { + StringBuilder sb = new StringBuilder(); + sb.append(creationTaskData.getUuid()).append(",,"); + sb.append(creationTaskData.getClusterId()).append(",,"); + sb.append(ClusterTaskTypeEnum.getByName(creationTaskData.getTaskType()).getWay()).append(",,"); + sb.append(creationTaskData.getKafkaPackageName().replace(KafkaFileEnum.PACKAGE.getSuffix(), "")).append(",,"); + sb.append(creationTaskData.getKafkaPackageMd5()).append(",,"); + sb.append(creationTaskData.getKafkaPackageUrl()).append(",,"); + sb.append(creationTaskData.getServerPropertiesName().replace(KafkaFileEnum.SERVER_CONFIG.getSuffix(), "")).append(",,"); + sb.append(creationTaskData.getServerPropertiesMd5()).append(",,"); + sb.append(creationTaskData.getServerPropertiesUrl()); + + Map params = new HashMap<>(10); + params.put("title", String.format("集群ID=%d-升级部署", creationTaskData.getClusterId())); + params.put("batch", BATCH); + params.put("tolerance", TOLERANCE); + params.put("timeout", timeout); + params.put("pause", ListUtils.strList2String(creationTaskData.getPauseList())); + params.put("script", this.script); + params.put("args", sb.toString()); + params.put("account", account); + params.put("action", "pause"); + params.put("hosts", creationTaskData.getHostList()); + return params; + } + + private static String readScriptInJarFile(String fileName) { + InputStream inputStream = N9e.class.getClassLoader().getResourceAsStream(fileName); + if (inputStream == null) { + LOGGER.error("read kcm script failed, filename:{}", fileName); + return ""; + } + + try { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + String line = null; + StringBuilder stringBuilder = new StringBuilder(""); + + while ((line = bufferedReader.readLine()) != null) { + stringBuilder.append(line); + } + return stringBuilder.toString(); + } catch (IOException e) { + 
LOGGER.error("read kcm script failed, filename:{}", fileName, e); + return ""; + } finally { + try { + inputStream.close(); + } catch (IOException e) { + LOGGER.error("close reading kcm script failed, filename:{}", fileName, e); + } + } + } } \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java index 51f3828e2e4d15b0a6660de9122e9ff11be6521c..b4a84eb01be59774a681b17eb4561306b15542c4 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java @@ -6,6 +6,7 @@ package com.xiaojukeji.kafka.manager.kcm.component.storage.common; * @date 20/4/29 */ public enum StorageEnum { + GIFT(0, "gift"), GIT(1, "git"), S3(2, "S3"), ; diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh new file mode 100644 index 0000000000000000000000000000000000000000..e369bee8fcdbf8cbec337c61d9a839de81ec3df9 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh @@ -0,0 +1,370 @@ +#!/bin/sh +#集群任务脚本 + +set -x # 调试方式执行 + +#----------------------------------------日志格式------------------------------------------------------# +alias ECHO_LOG='echo `date +%F%n%T` hostname:`hostname` Line:${LINENO} ' + +#----------------------------------------参数列表------------------------------------------------------# +p_task_id=${1} #任务ID +p_cluster_id=${2} #集群ID +p_cluster_task_type=${3} #任务类型[0:升级, 1:新部署, 2:回滚] + +p_kafka_package_name=${4} #包名 +p_kafka_package_md5=${5} #包MD5 +p_kafka_package_url=${6} #包下载地址 
+p_kafka_server_properties_name=${7} #server配置名 +p_kafka_server_properties_md5=${8} #server配置MD5 +p_kafka_server_properties_url=${9} #server配置文件下载地址 + +#----------------------------------------配置信息------------------------------------------------------# +g_hostname=`hostname` +g_base_dir='/home/km' +g_cluster_task_dir=${g_base_dir}"/kafka_cluster_task/task_${p_task_id}" #部署升级路径 +g_rollback_version=${g_cluster_task_dir}"/rollback_version" #回滚版本 +g_new_kafka_package_name='' #最终的包名 +g_kafka_manager_addr='' #kafka-manager地址 + +#----------------------------------------操作函数------------------------------------------------------# + +# dhcat通知 +function dchat_alarm() { + alarm_msg=$1 + data=' +{ + "text": "'${g_hostname}' 升级失败,请及时处理", + "attachments": [ + { + "title": "'${alarm_msg}'", + "color": "#ffa500" + } + ] +}' +#curl -H 'Content-Type: application/json' -d "${data}" ${dchat_bot} +} + +# 检查并初始化环境 +function check_and_init_env() { + if [ -z "${p_task_id}" -o -z "${p_cluster_task_type}" -o -z "${p_kafka_package_url}" -o -z "${p_cluster_id}" -o -z "${p_kafka_package_name}" -o -z "${p_kafka_package_md5}" -o -z "${p_kafka_server_properties_name}" -o -z "${p_kafka_server_properties_md5}" ]; then + ECHO_LOG "存在为空的参数不合法, 退出集群任务" + dchat_alarm "存在为空的参数不合法, 退出集群任务" + exit 1 + fi + + cd ${g_base_dir} + if [ $? -ne 0 -o ! -x "${g_base_dir}" ];then + ECHO_LOG "${g_base_dir}目录不存在或无权限, 退出集群任务" + dchat_alarm "${g_base_dir}目录不存在或无权限, 退出集群任务" + exit 1 + fi + + ECHO_LOG "初始化集群任务所需的目录" + mkdir -p ${g_cluster_task_dir} + if [ $? 
-ne 0 ];then + ECHO_LOG "创建集群任务路径失败, 退出集群任务" + dchat_alarm "创建集群任务路径失败, 退出集群任务" + exit 1 + fi +} + + +# 检查并等待集群所有的副本处于同步的状态 +function check_and_wait_broker_stabled() { + under_replication_count=`curl -s -G -d "hostname=${g_hostname}" ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l` + while [ "$under_replication_count" -ne 1 ]; do + ECHO_LOG "存在${under_replication_count}个副本未同步, sleep 10s" + sleep 10 + under_replication_count=`curl -s ${g_kafka_manager_addr}/api/v1/${p_cluster_id}/overview | python -m json.tool | grep false |wc -l` + done + ECHO_LOG "集群副本都已经处于同步的状态, 可以进行集群升级" +} + + +# 拉包并检查其md5 +function pull_and_check_kafka_package() { + ECHO_LOG "开始下载${1}文件" + wget ${1} -P ${g_cluster_task_dir} + if [ $? -ne 0 ];then + ECHO_LOG "下载${1}失败, 退出集群任务" + dchat_alarm "下载${1}失败, 退出集群任务" + exit 1 + fi + + file_md5_sum=`md5sum "${g_cluster_task_dir}/${p_kafka_package_name}.tgz" | awk -F " " '{print $1}'` + if [ "$file_md5_sum" != "${2}" ];then + ECHO_LOG "下载${1}成功, 但是校验md5失败, 退出集群任务" + dchat_alarm "下载${1}成功, 但是校验md5失败, 退出集群任务" + exit 1 + fi +} + +# 拉配置文件并检查其md5 +function pull_and_check_kafka_properties() { + ECHO_LOG "开始下载${1}文件" + wget ${1} -P ${g_cluster_task_dir} + if [ $? -ne 0 ];then + ECHO_LOG "下载${1}失败, 退出集群任务" + dchat_alarm "下载${1}失败, 退出集群任务" + exit 1 + fi + + file_md5_sum=`md5sum "${g_cluster_task_dir}/${p_kafka_server_properties_name}.properties" | awk -F " " '{print $1}'` + if [ "$file_md5_sum" != "${2}" ];then + ECHO_LOG "下载${1}成功, 但是校验md5失败, 退出集群任务" + dchat_alarm "下载${1}成功, 但是校验md5失败, 退出集群任务" + exit 1 + fi +} + +# 准备集群任务的文件 +function prepare_cluster_task_files() { + pull_and_check_kafka_package ${p_kafka_package_url} ${p_kafka_package_md5} + ECHO_LOG "解压并拷贝kafka包文件" + tar -zxf "${g_cluster_task_dir}/${p_kafka_package_name}.tgz" -C "${g_cluster_task_dir}" + if [ $?
-ne 0 ];then + ECHO_LOG "解压${p_kafka_package_name}.tgz失败, 退出集群任务" + dchat_alarm "解压${p_kafka_package_name}.tgz失败, 退出集群任务" + exit 1 + fi + + pull_and_check_kafka_properties ${p_kafka_server_properties_url} ${p_kafka_server_properties_md5} + ECHO_LOG "拷贝kafka配置文件" + cp -f "${g_cluster_task_dir}/${p_kafka_server_properties_name}.properties" "${g_cluster_task_dir}/${p_kafka_package_name}/config/server.properties" + if [ $? -ne 0 ];then + ECHO_LOG "拷贝${p_kafka_server_properties_name}.properties失败, 退出集群任务" + dchat_alarm "拷贝${p_kafka_server_properties_name}.properties失败, 退出集群任务" + exit 1 + fi + + # 将MD5信息写到包中 + echo "package_md5:${p_kafka_package_md5} server_properties_md5:${p_kafka_server_properties_md5}" > "${g_cluster_task_dir}/${p_kafka_package_name}/package_and_properties.md5" +} + + +# 停kafka服务 +function stop_kafka_server() { + sh ${g_base_dir}"/kafka/bin/kafka-server-stop.sh" + + ECHO_LOG "检查并等待kafka服务下线" + kafka_pid=`jps | grep 'Kafka' | awk '{print $1}'` + while [ ! -z "${kafka_pid}" ];do + ECHO_LOG "kafka服务未下线, 继续sleep 5s" + sleep 5 + + kafka_pid=`jps | grep 'Kafka' | awk '{print $1}'` + done + ECHO_LOG "kafka服务已停掉" +} + + + +function cal_new_package_name() { + if [ ! -d "${g_base_dir}/${p_kafka_package_name}" ]; then + # 当前使用的包版本未部署过 + g_new_kafka_package_name=${p_kafka_package_name} + return + fi + + deploy_version=1 + while [ ${deploy_version} -le 1000 ];do + if [ ! -d "${g_base_dir}/${p_kafka_package_name}_v${deploy_version}" ]; then + g_new_kafka_package_name="${p_kafka_package_name}_v${deploy_version}" + return + fi + ECHO_LOG "包 ${p_kafka_package_name}_v${deploy_version} 已经存在" + deploy_version=`expr ${deploy_version} + 1` + done +} + + +# +function backup_and_init_new_kafka_server_soft_link() { + cd ${g_base_dir}/"kafka" + if [ $?
-ne 0 ];then + ECHO_LOG "kafka软链不存在, 退出集群任务" + dchat_alarm "kafka软链不存在, 退出集群任务" + exit 1 + fi + + # 纪录回滚版本 + kafka_absolute_path=`pwd -P` + rollback_version=`basename "${kafka_absolute_path}"` + echo ${rollback_version} > ${g_rollback_version} + ECHO_LOG "上一版本: ${rollback_version}" + + # 去除软链 + unlink ${g_base_dir}/"kafka" + if [ $? -ne 0 ];then + ECHO_LOG "移除软链失败, 退出集群任务" + dchat_alarm "移除软链失败, 退出集群任务" + exit 1 + fi + + # 计算新的包名及初始化环境 + init_new_kafka_server_soft_link +} + + +# 回滚之前的版本 +function rollback_kafka_server_soft_link() { + if [ ! -f "${g_rollback_version}" ]; then + ECHO_LOG "回滚文件不存在, 退出集群任务" + dchat_alarm "回滚文件不存在, 退出集群任务" + exit 1 + fi + + rollback_version=`cat ${g_rollback_version}` + if [ ! -n "${rollback_version}" ]; then + ECHO_LOG "回滚信息不存在, 退出集群任务" + dchat_alarm "回滚信息不存在, 退出集群任务" + exit 1 + fi + + # 去除软链 + unlink ${g_base_dir}/"kafka" + if [ $? -ne 0 ];then + ECHO_LOG "移除软链失败, 退出集群任务" + dchat_alarm "移除软链失败, 退出集群任务" + exit 1 + fi + + ln -s "${g_base_dir}/${rollback_version}" "${g_base_dir}/kafka" + if [ $? -ne 0 ];then + ECHO_LOG "创建软链失败, 退出集群任务" + dchat_alarm "创建软链失败, 退出集群任务" + exit 1 + fi + ECHO_LOG "修改软链成功" +} + + +function init_new_kafka_server_soft_link() { + if [ -L "${g_base_dir}/kafka" ];then + ECHO_LOG "kafka软链依旧存在, 退出集群任务" + dchat_alarm "kafka软链依旧存在, 退出集群任务" + exit 1 + fi + + # 计算新的包名 + cal_new_package_name + ECHO_LOG "集群任务新包的名字为${g_new_kafka_package_name}" + + # 拷贝新的包 + cp -rf "${g_cluster_task_dir}/${p_kafka_package_name}" "${g_base_dir}/${g_new_kafka_package_name}" + + ln -s "${g_base_dir}/${g_new_kafka_package_name}" "${g_base_dir}/kafka" + if [ $? -ne 0 ];then + ECHO_LOG "创建软链失败, 退出集群任务" + dchat_alarm "创建软链失败, 退出集群任务" + exit 1 + fi + ECHO_LOG "备份并修改软链成功" +} + + +function check_and_wait_kafka_process_started() { + sleep 1 + + # 等待并检查kafka进程是否正常启动 + ECHO_LOG "开始等待并检查进程是否正常启动" + if [ ! 
-L "${g_base_dir}/kafka" ];then + ECHO_LOG "kafka软链不存在, 退出集群任务" + dchat_alarm "kafka软链不存在, 退出集群任务" + exit 1 + fi + + log_started_count=0 + while [ "$log_started_count" == "0" ];do + # 检查进程是否存活 + kafka_pid=`jps | grep 'Kafka' | awk '{print $1}'` + if [ -z "${kafka_pid}" ];then + ECHO_LOG "安装失败, kafka进程不存在, 退出集群任务" + dchat_alarm "安装失败, kafka进程不存在, 退出集群任务" + exit 1 + fi + + sleep 2 + + #检查是否存在NotLeader的分区 + not_leader_error_count=`grep Exception ${g_base_dir}/kafka/logs/server.log* | grep NotLeaderForPartitionException | wc -l` + if [ ${not_leader_error_count} -gt 0 ];then + ECHO_LOG "安装失败, 存在无Leader的分区, 退出集群任务" + dchat_alarm "安装失败, 存在无Leader的分区, 退出集群任务" + exit 1 + fi + + #判断Started的日志是否正常打印出来了 + log_started_count=`grep Started ${g_base_dir}/kafka/logs/server.log | wc -l` + done + ECHO_LOG "进程已正常启动, 结束进程状态检查" +} + +start_new_kafka_server() { + nohup sh "${g_base_dir}/kafka/bin/kafka-server-start.sh" "${g_base_dir}/kafka/config/server.properties" > /dev/null 2>&1 & + if [ $? -ne 0 ];then + ECHO_LOG "启动kafka服务失败, 退出部署升级" + dchat_alarm "启动kafka服务失败, 退出部署升级" + exit 1 + fi +} + + +#----------------------------------部署流程---------------------------------------------------------# +ECHO_LOG "集群任务启动..." 
+ECHO_LOG "参数信息: " +ECHO_LOG " p_task_id=${p_task_id}" +ECHO_LOG " p_cluster_id=${p_cluster_id}" +ECHO_LOG " p_cluster_task_type=${p_cluster_task_type}" +ECHO_LOG " p_kafka_package_name=${p_kafka_package_name}" +ECHO_LOG " p_kafka_package_md5=${p_kafka_package_md5}" +ECHO_LOG " p_kafka_server_properties_name=${p_kafka_server_properties_name}" +ECHO_LOG " p_kafka_server_properties_md5=${p_kafka_server_properties_md5}" + + + +if [ "${p_cluster_task_type}" == "0" -o "${p_cluster_task_type}" == "1" ];then + ECHO_LOG "检查并初始化环境" + check_and_init_env + ECHO_LOG "准备集群任务所需的文件" + prepare_cluster_task_files +else + ECHO_LOG "升级回滚, 无需准备新文件" +fi + +#ECHO_LOG "检查并等待Broker处于稳定状态" +#check_and_wait_broker_stabled + +ECHO_LOG "停kafka服务" +stop_kafka_server + +ECHO_LOG "停5秒, 确保" +sleep 5 + +if [ "${p_cluster_task_type}" == "0" ];then + ECHO_LOG "备份并初始化升级所需的环境(包/软链等)" + backup_and_init_new_kafka_server_soft_link +elif [ "${p_cluster_task_type}" == "1" ];then + ECHO_LOG "初始化部署所需的环境(包/软链等)" + init_new_kafka_server_soft_link +else + ECHO_LOG "回滚旧的环境(包/软链等)" + rollback_kafka_server_soft_link +fi + +ECHO_LOG "启动kafka服务" +start_new_kafka_server + + +ECHO_LOG "检查并等待kafka服务正常运行..." 
+check_and_wait_kafka_process_started + + +ECHO_LOG "检查并等待Broker处于稳定状态" +check_and_wait_broker_stabled + +ECHO_LOG "清理临时文件" +#rm -r ${g_cluster_task_dir} + + +ECHO_LOG "升级成功, 结束升级" \ No newline at end of file diff --git a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java index df2e430c675c8b3269a4581a8cf82940004cc773..2b963c090fddda845b277ef27057f7e1d98fc7f4 100644 --- a/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java +++ b/kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java @@ -72,7 +72,7 @@ public class N9eService extends AbstractMonitorService { /** * 告警组 */ - private static final String ALL_NOTIFY_GROUP_URL = "/api/mon/teams/all"; + private static final String ALL_NOTIFY_GROUP_URL = "/api/rdb/teams/all"; /** * 监控策略的增删改查 diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/SyncClusterTaskState.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/SyncClusterTaskState.java index 8a4fe9faef5dd10e122d560d770a591d9f538d43..9edddb282ba2fd20d1d6638b07ce6f53bc7b6973 100644 --- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/SyncClusterTaskState.java +++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/SyncClusterTaskState.java @@ -11,6 +11,7 @@ import com.xiaojukeji.kafka.manager.task.component.EmptyEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import java.util.Arrays; import java.util.List; @@ -20,6 +21,7 @@ import 
java.util.List; * @date 20/9/7 */ @CustomScheduled(name = "syncClusterTaskState", cron = "0 0/1 * * * ?", threadNum = 1) +@ConditionalOnProperty(prefix = "kcm", name = "enabled", havingValue = "true", matchIfMissing = true) public class SyncClusterTaskState extends AbstractScheduledTask { private final static Logger LOGGER = LoggerFactory.getLogger(LogConstant.SCHEDULED_TASK_LOGGER); diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java index 3008a9ea5678c7e6683f612e8394cfb008ba6dc8..78f5ac4a8854e87de8bf8bb7be36ae6182487bc8 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java @@ -1,10 +1,10 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.gateway; -import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import com.xiaojukeji.kafka.manager.common.annotations.ApiLevel; import com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent; import com.xiaojukeji.kafka.manager.common.entity.Result; -import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicConnectionDTO; +import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService; import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; @@ -15,8 +15,6 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; -import java.util.List; - /** * @author zengqiao * @date 20/7/6 @@ -34,18 +32,17 @@ public class GatewayHeartbeatController { 
@ApiOperation(value = "连接信息上报入口", notes = "Broker主动上报信息") @RequestMapping(value = "heartbeat/survive-user", method = RequestMethod.POST) @ResponseBody - public Result receiveTopicConnections(@RequestParam("clusterId") String clusterId, - @RequestParam("brokerId") String brokerId, - @RequestBody List dtoList) { + public Result receiveTopicConnections(@RequestParam("clusterId") Long clusterId, + @RequestParam("brokerId") Integer brokerId, + @RequestBody JSONObject jsonObject) { try { - if (ValidateUtils.isEmptyList(dtoList)) { + if (ValidateUtils.isNull(jsonObject) || jsonObject.isEmpty()) { return Result.buildSuc(); } - topicConnectionService.batchAdd(dtoList); + topicConnectionService.batchAdd(JsonUtils.parseTopicConnections(clusterId, jsonObject)); return Result.buildSuc(); } catch (Exception e) { - LOGGER.error("receive topic connections failed, clusterId:{} brokerId:{} req:{}", - clusterId, brokerId, JSON.toJSONString(dtoList), e); + LOGGER.error("receive topic connections failed, clusterId:{} brokerId:{} req:{}", clusterId, brokerId, jsonObject, e); } return Result.buildFailure("fail"); } diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartClusterController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartClusterController.java new file mode 100644 index 0000000000000000000000000000000000000000..e379256fc89232264331bb18614e5fac36c32ff0 --- /dev/null +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartClusterController.java @@ -0,0 +1,55 @@ +package com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart; + +import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix; +import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import 
com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata; +import com.xiaojukeji.kafka.manager.openapi.common.vo.ThirdPartBrokerOverviewVO; +import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.BrokerService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +/** + * @author zengqiao + * @date 20/11/9 + */ +@Api(tags = "开放接口-Cluster相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V1_THIRD_PART_PREFIX) +public class ThirdPartClusterController { + + @Autowired + private BrokerService brokerService; + + @ApiOperation(value = "Broker信息概览", notes = "") + @RequestMapping(value = "{clusterId}/broker-stabled", method = RequestMethod.GET) + @ResponseBody + public Result checkBrokerStabled(@PathVariable Long clusterId, + @RequestParam("hostname") String hostname) { + BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, hostname); + if (ValidateUtils.isNull(brokerMetadata)) { + return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST); + } + + BrokerMetrics brokerMetrics = brokerService.getBrokerMetricsFromJmx( + clusterId, + brokerMetadata.getBrokerId(), + KafkaMetricsCollections.BROKER_STATUS_PAGE_METRICS + ); + if (ValidateUtils.isNull(brokerMetrics)) { + return Result.buildFrom(ResultStatus.OPERATION_FAILED); + } + Integer underReplicated = brokerMetrics.getSpecifiedMetrics("UnderReplicatedPartitionsValue", Integer.class); + if (ValidateUtils.isNull(underReplicated)) { + return Result.buildFrom(ResultStatus.OPERATION_FAILED); + } + + return new Result<>(underReplicated.equals(0)); + } +} \ No newline at end of file diff --git 
a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml index 54f28a256597e1ab978dadc0920cbbcb1e6da184..28bc976299e03aadcc2d22e43691401aa3d29c9e 100644 --- a/kafka-manager-web/src/main/resources/application.yml +++ b/kafka-manager-web/src/main/resources/application.yml @@ -37,12 +37,13 @@ account: ldap: kcm: + enabled: false n9e: - base-url: http://127.0.0.1/api - username: admin - user-token: admin - tpl-id: 123456 - timeout: 30 + base-url: http://127.0.0.1:8080 + user-token: 12345678 + timeout: 300 + account: km + script-file: kcm_script.sh monitor: enabled: false