Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
DiDi
kafka-manager
提交
f84d2501
K
kafka-manager
项目概览
DiDi
/
kafka-manager
大约 1 年 前同步成功
通知
60
Star
6372
Fork
1229
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
K
kafka-manager
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f84d2501
编写于
11月 15, 2020
作者:
Z
zengqiao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
kcm修复&连接信息接口修复
上级
3ffb4b89
变更
13
显示空白变更内容
内联
并排
Showing
13 changed file
with
577 addition
and
186 deletion
+577
-186
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicConnectionDTO.java
...manager/common/entity/dto/gateway/TopicConnectionDTO.java
+0
-127
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java
.../com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java
+31
-0
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
...manager/service/cache/PhysicalClusterMetadataManager.java
+13
-0
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java
...nager/service/service/gateway/TopicConnectionService.java
+2
-2
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java
...vice/service/gateway/impl/TopicConnectionServiceImpl.java
+5
-13
kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java
...xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java
+83
-27
kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java
...fka/manager/kcm/component/storage/common/StorageEnum.java
+1
-0
kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh
...xtends/kafka-manager-kcm/src/main/resources/kcm_script.sh
+370
-0
kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java
...ukeji/kafka/manager/monitor/component/n9e/N9eService.java
+1
-1
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/SyncClusterTaskState.java
...kafka/manager/task/dispatch/biz/SyncClusterTaskState.java
+2
-0
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java
...eb/api/versionone/gateway/GatewayHeartbeatController.java
+8
-11
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartClusterController.java
.../api/versionone/thirdpart/ThirdPartClusterController.java
+55
-0
kafka-manager-web/src/main/resources/application.yml
kafka-manager-web/src/main/resources/application.yml
+6
-5
未找到文件。
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/gateway/TopicConnectionDTO.java
已删除
100644 → 0
浏览文件 @
3ffb4b89
package
com.xiaojukeji.kafka.manager.common.entity.dto.gateway
;
import
java.util.Date
;
/**
* @author zengqiao
* @date 20/7/6
*/
public
class
TopicConnectionDTO
{
private
Long
id
;
private
Long
clusterId
;
private
String
topicName
;
// producer or consumer
private
String
type
;
// appId#ip#clientVersion
private
String
clientInfo
;
private
String
appId
;
private
String
ip
;
private
String
clientVersion
;
private
Date
gmtCreate
;
private
Date
gmtModify
;
public
Long
getId
()
{
return
id
;
}
public
void
setId
(
Long
id
)
{
this
.
id
=
id
;
}
public
Long
getClusterId
()
{
return
clusterId
;
}
public
void
setClusterId
(
Long
clusterId
)
{
this
.
clusterId
=
clusterId
;
}
public
String
getTopicName
()
{
return
topicName
;
}
public
void
setTopicName
(
String
topicName
)
{
this
.
topicName
=
topicName
;
}
public
String
getType
()
{
return
type
;
}
public
void
setType
(
String
type
)
{
this
.
type
=
type
;
}
public
String
getClientInfo
()
{
return
clientInfo
;
}
public
void
setClientInfo
(
String
clientInfo
)
{
this
.
clientInfo
=
clientInfo
;
}
public
String
getAppId
()
{
return
appId
;
}
public
void
setAppId
(
String
appId
)
{
this
.
appId
=
appId
;
}
public
String
getIp
()
{
return
ip
;
}
public
void
setIp
(
String
ip
)
{
this
.
ip
=
ip
;
}
public
String
getClientVersion
()
{
return
clientVersion
;
}
public
void
setClientVersion
(
String
clientVersion
)
{
this
.
clientVersion
=
clientVersion
;
}
public
Date
getGmtCreate
()
{
return
gmtCreate
;
}
public
void
setGmtCreate
(
Date
gmtCreate
)
{
this
.
gmtCreate
=
gmtCreate
;
}
public
Date
getGmtModify
()
{
return
gmtModify
;
}
public
void
setGmtModify
(
Date
gmtModify
)
{
this
.
gmtModify
=
gmtModify
;
}
@Override
public
String
toString
()
{
return
"TopicConnectionDTO{"
+
"id="
+
id
+
", clusterId="
+
clusterId
+
", topicName='"
+
topicName
+
'\''
+
", type='"
+
type
+
'\''
+
", clientInfo='"
+
clientInfo
+
'\''
+
", appId='"
+
appId
+
'\''
+
", ip='"
+
ip
+
'\''
+
", clientVersion='"
+
clientVersion
+
'\''
+
", gmtCreate="
+
gmtCreate
+
", gmtModify="
+
gmtModify
+
'}'
;
}
}
\ No newline at end of file
kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java
浏览文件 @
f84d2501
package
com.xiaojukeji.kafka.manager.common.utils
;
package
com.xiaojukeji.kafka.manager.common.utils
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONArray
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.serializer.SerializeConfig
;
import
com.alibaba.fastjson.serializer.SerializeConfig
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.fasterxml.jackson.databind.ObjectMapper
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO
;
import
java.lang.reflect.Method
;
import
java.lang.reflect.Method
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
...
@@ -48,4 +51,32 @@ public class JsonUtils {
...
@@ -48,4 +51,32 @@ public class JsonUtils {
public
static
String
toJSONString
(
Object
obj
)
{
public
static
String
toJSONString
(
Object
obj
)
{
return
JSON
.
toJSONString
(
obj
);
return
JSON
.
toJSONString
(
obj
);
}
}
public
static
List
<
TopicConnectionDO
>
parseTopicConnections
(
Long
clusterId
,
JSONObject
jsonObject
)
{
List
<
TopicConnectionDO
>
connectionDOList
=
new
ArrayList
<>();
for
(
String
clientType:
jsonObject
.
keySet
())
{
JSONObject
topicObject
=
jsonObject
.
getJSONObject
(
clientType
);
// 解析单个Topic的连接信息
for
(
String
topicName:
topicObject
.
keySet
())
{
JSONArray
appIdArray
=
topicObject
.
getJSONArray
(
topicName
);
for
(
Object
appIdDetail
:
appIdArray
.
toArray
())
{
TopicConnectionDO
connectionDO
=
new
TopicConnectionDO
();
String
[]
appIdDetailArray
=
appIdDetail
.
toString
().
split
(
"#"
);
if
(
appIdDetailArray
.
length
==
3
)
{
connectionDO
.
setAppId
(
appIdDetailArray
[
0
]);
connectionDO
.
setIp
(
appIdDetailArray
[
1
]);
connectionDO
.
setClientVersion
(
appIdDetailArray
[
2
]);
}
connectionDO
.
setClusterId
(
clusterId
);
connectionDO
.
setTopicName
(
topicName
);
connectionDO
.
setType
(
clientType
);
connectionDOList
.
add
(
connectionDO
);
}
}
}
return
connectionDOList
;
}
}
}
\ No newline at end of file
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/cache/PhysicalClusterMetadataManager.java
浏览文件 @
f84d2501
...
@@ -311,6 +311,19 @@ public class PhysicalClusterMetadataManager {
...
@@ -311,6 +311,19 @@ public class PhysicalClusterMetadataManager {
return
metadataMap
.
get
(
brokerId
);
return
metadataMap
.
get
(
brokerId
);
}
}
public
static
BrokerMetadata
getBrokerMetadata
(
Long
clusterId
,
String
hostname
)
{
Map
<
Integer
,
BrokerMetadata
>
metadataMap
=
BROKER_METADATA_MAP
.
get
(
clusterId
);
if
(
metadataMap
==
null
)
{
return
null
;
}
for
(
BrokerMetadata
brokerMetadata:
metadataMap
.
values
())
{
if
(
brokerMetadata
.
getHost
().
equals
(
hostname
))
{
return
brokerMetadata
;
}
}
return
null
;
}
public
static
Map
<
String
,
List
<
String
>>
getBrokerHostKafkaRoleMap
(
Long
clusterId
)
{
public
static
Map
<
String
,
List
<
String
>>
getBrokerHostKafkaRoleMap
(
Long
clusterId
)
{
Map
<
String
,
List
<
String
>>
hostRoleMap
=
new
HashMap
<>();
Map
<
String
,
List
<
String
>>
hostRoleMap
=
new
HashMap
<>();
ControllerData
controllerData
=
CONTROLLER_DATA_MAP
.
get
(
clusterId
);
ControllerData
controllerData
=
CONTROLLER_DATA_MAP
.
get
(
clusterId
);
...
...
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java
浏览文件 @
f84d2501
package
com.xiaojukeji.kafka.manager.service.service.gateway
;
package
com.xiaojukeji.kafka.manager.service.service.gateway
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection
;
import
com.xiaojukeji.kafka.manager.common.entity.
dto.gateway.TopicConnectionDT
O
;
import
com.xiaojukeji.kafka.manager.common.entity.
pojo.gateway.TopicConnectionD
O
;
import
java.util.Date
;
import
java.util.Date
;
import
java.util.List
;
import
java.util.List
;
...
@@ -11,7 +11,7 @@ import java.util.List;
...
@@ -11,7 +11,7 @@ import java.util.List;
* @date 20/4/13
* @date 20/4/13
*/
*/
public
interface
TopicConnectionService
{
public
interface
TopicConnectionService
{
int
batchAdd
(
List
<
TopicConnectionD
TO
>
dt
oList
);
int
batchAdd
(
List
<
TopicConnectionD
O
>
d
oList
);
/**
/**
* 查询连接信息
* 查询连接信息
...
...
kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java
浏览文件 @
f84d2501
...
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
...
@@ -2,7 +2,6 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO
;
import
com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection
;
import
com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection
;
import
com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicConnectionDTO
;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaConstant
;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaConstant
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.dao.gateway.TopicConnectionDao
;
import
com.xiaojukeji.kafka.manager.dao.gateway.TopicConnectionDao
;
...
@@ -28,23 +27,16 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
...
@@ -28,23 +27,16 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
private
TopicConnectionDao
topicConnectionDao
;
private
TopicConnectionDao
topicConnectionDao
;
@Override
@Override
public
int
batchAdd
(
List
<
TopicConnectionD
TO
>
dt
oList
)
{
public
int
batchAdd
(
List
<
TopicConnectionD
O
>
d
oList
)
{
if
(
ValidateUtils
.
isEmptyList
(
d
t
oList
))
{
if
(
ValidateUtils
.
isEmptyList
(
doList
))
{
return
0
;
return
0
;
}
}
int
count
=
0
;
int
count
=
0
;
for
(
TopicConnectionD
TO
dto:
dt
oList
)
{
for
(
TopicConnectionD
O
connectionDO:
d
oList
)
{
try
{
try
{
TopicConnectionDO
topicConnectionDO
=
new
TopicConnectionDO
();
count
+=
topicConnectionDao
.
replace
(
connectionDO
);
topicConnectionDO
.
setClusterId
(
dto
.
getClusterId
());
topicConnectionDO
.
setTopicName
(
dto
.
getTopicName
());
topicConnectionDO
.
setType
(
dto
.
getType
());
topicConnectionDO
.
setAppId
(
dto
.
getAppId
());
topicConnectionDO
.
setIp
(
dto
.
getIp
());
topicConnectionDO
.
setClientVersion
(
dto
.
getClientVersion
());
count
+=
topicConnectionDao
.
replace
(
topicConnectionDO
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"replace topic connections failed, data:{}."
,
dto
);
LOGGER
.
error
(
"replace topic connections failed, data:{}."
,
connectionDO
);
}
}
}
}
return
count
;
return
count
;
...
...
kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/agent/n9e/N9e.java
浏览文件 @
f84d2501
package
com.xiaojukeji.kafka.manager.kcm.component.agent.n9e
;
package
com.xiaojukeji.kafka.manager.kcm.component.agent.n9e
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSON
;
import
com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum
;
import
com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskTypeEnum
;
import
com.xiaojukeji.kafka.manager.kcm.common.entry.ao.CreationTaskData
;
import
com.xiaojukeji.kafka.manager.kcm.common.entry.ao.CreationTaskData
;
import
com.xiaojukeji.kafka.manager.common.utils.HttpUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.HttpUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.JsonUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.JsonUtils
;
...
@@ -18,6 +20,11 @@ import org.slf4j.Logger;
...
@@ -18,6 +20,11 @@ import org.slf4j.Logger;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.stereotype.Service
;
import
org.springframework.stereotype.Service
;
import
javax.annotation.PostConstruct
;
import
java.io.BufferedReader
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.io.InputStreamReader
;
import
java.util.HashMap
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
...
@@ -33,18 +40,20 @@ public class N9e extends AbstractAgent {
...
@@ -33,18 +40,20 @@ public class N9e extends AbstractAgent {
@Value
(
"${kcm.n9e.base-url}"
)
@Value
(
"${kcm.n9e.base-url}"
)
private
String
baseUrl
;
private
String
baseUrl
;
@Value
(
"${kcm.n9e.username}"
)
private
String
username
;
@Value
(
"${kcm.n9e.user-token}"
)
@Value
(
"${kcm.n9e.user-token}"
)
private
String
userToken
;
private
String
userToken
;
@Value
(
"${kcm.n9e.
tpl-id
}"
)
@Value
(
"${kcm.n9e.
account
}"
)
private
Integer
tplId
;
private
String
account
;
@Value
(
"${kcm.n9e.timeout}"
)
@Value
(
"${kcm.n9e.timeout}"
)
private
Integer
timeout
;
private
Integer
timeout
;
@Value
(
"${kcm.n9e.script-file}"
)
private
String
scriptFile
;
private
String
script
;
/**
/**
* 并发度,顺序执行
* 并发度,顺序执行
*/
*/
...
@@ -67,21 +76,14 @@ public class N9e extends AbstractAgent {
...
@@ -67,21 +76,14 @@ public class N9e extends AbstractAgent {
private
static
final
String
TASK_STD_LOG_URI
=
"/api/job-ce/task/{taskId}/stdout.json"
;
private
static
final
String
TASK_STD_LOG_URI
=
"/api/job-ce/task/{taskId}/stdout.json"
;
@PostConstruct
public
void
init
()
{
this
.
script
=
readScriptInJarFile
(
scriptFile
);
}
@Override
@Override
public
Long
createTask
(
CreationTaskData
dto
)
{
public
Long
createTask
(
CreationTaskData
creationTaskData
)
{
StringBuilder
sb
=
new
StringBuilder
();
Map
<
String
,
Object
>
param
=
buildCreateTaskParam
(
creationTaskData
);
sb
.
append
(
dto
.
getKafkaPackageName
()).
append
(
",,"
).
append
(
dto
.
getKafkaPackageMd5
()).
append
(
",,"
);
sb
.
append
(
dto
.
getServerPropertiesName
()).
append
(
",,"
).
append
(
dto
.
getServerPropertiesMd5
());
Map
<
String
,
Object
>
param
=
new
HashMap
<>();
param
.
put
(
"tpl_id"
,
tplId
);
param
.
put
(
"batch"
,
BATCH
);
param
.
put
(
"tolerance"
,
TOLERANCE
);
param
.
put
(
"timeout"
,
timeout
);
param
.
put
(
"hosts"
,
dto
.
getHostList
());
param
.
put
(
"pause"
,
ListUtils
.
strList2String
(
dto
.
getPauseList
()));
param
.
put
(
"action"
,
"pause"
);
param
.
put
(
"args"
,
sb
.
toString
());
String
response
=
null
;
String
response
=
null
;
try
{
try
{
...
@@ -96,7 +98,7 @@ public class N9e extends AbstractAgent {
...
@@ -96,7 +98,7 @@ public class N9e extends AbstractAgent {
}
}
return
Long
.
valueOf
(
zr
.
getDat
().
toString
());
return
Long
.
valueOf
(
zr
.
getDat
().
toString
());
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"create task failed,
dto:{}."
,
dto
,
e
);
LOGGER
.
error
(
"create task failed,
req:{}."
,
creationTaskData
,
e
);
}
}
return
null
;
return
null
;
}
}
...
@@ -126,7 +128,7 @@ public class N9e extends AbstractAgent {
...
@@ -126,7 +128,7 @@ public class N9e extends AbstractAgent {
@Override
@Override
public
Boolean
actionHostTask
(
Long
taskId
,
String
action
,
String
hostname
)
{
public
Boolean
actionHostTask
(
Long
taskId
,
String
action
,
String
hostname
)
{
Map
<
String
,
Object
>
param
=
new
HashMap
<>(
3
);
Map
<
String
,
Object
>
param
=
new
HashMap
<>(
2
);
param
.
put
(
"action"
,
action
);
param
.
put
(
"action"
,
action
);
param
.
put
(
"hostname"
,
hostname
);
param
.
put
(
"hostname"
,
hostname
);
...
@@ -143,7 +145,7 @@ public class N9e extends AbstractAgent {
...
@@ -143,7 +145,7 @@ public class N9e extends AbstractAgent {
}
}
return
false
;
return
false
;
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"action task failed, taskId:{}
, action:{},
hostname:{}."
,
taskId
,
action
,
hostname
,
e
);
LOGGER
.
error
(
"action task failed, taskId:{}
action:{}
hostname:{}."
,
taskId
,
action
,
hostname
,
e
);
}
}
return
false
;
return
false
;
}
}
...
@@ -186,7 +188,7 @@ public class N9e extends AbstractAgent {
...
@@ -186,7 +188,7 @@ public class N9e extends AbstractAgent {
JSON
.
parseObject
(
JSON
.
toJSONString
(
n9eResult
.
getDat
()),
N9eTaskResultDTO
.
class
);
JSON
.
parseObject
(
JSON
.
toJSONString
(
n9eResult
.
getDat
()),
N9eTaskResultDTO
.
class
);
return
n9eTaskResultDTO
.
convert2HostnameStatusMap
();
return
n9eTaskResultDTO
.
convert2HostnameStatusMap
();
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"get task
status failed, agentTaskId:{}."
,
agentTaskId
,
e
);
LOGGER
.
error
(
"get task
result failed, agentTaskId:{} response:{}."
,
agentTaskId
,
response
,
e
);
}
}
return
null
;
return
null
;
}
}
...
@@ -217,9 +219,63 @@ public class N9e extends AbstractAgent {
...
@@ -217,9 +219,63 @@ public class N9e extends AbstractAgent {
}
}
private
Map
<
String
,
String
>
buildHeader
()
{
private
Map
<
String
,
String
>
buildHeader
()
{
Map
<
String
,
String
>
headers
=
new
HashMap
<>(
1
);
Map
<
String
,
String
>
headers
=
new
HashMap
<>(
2
);
headers
.
put
(
"Content-Type"
,
"application/json;charset=UTF-8"
);
headers
.
put
(
"Content-Type"
,
"application/json;charset=UTF-8"
);
headers
.
put
(
"X-User-Token"
,
userToken
);
headers
.
put
(
"X-User-Token"
,
userToken
);
return
headers
;
return
headers
;
}
}
private
Map
<
String
,
Object
>
buildCreateTaskParam
(
CreationTaskData
creationTaskData
)
{
StringBuilder
sb
=
new
StringBuilder
();
sb
.
append
(
creationTaskData
.
getUuid
()).
append
(
",,"
);
sb
.
append
(
creationTaskData
.
getClusterId
()).
append
(
",,"
);
sb
.
append
(
ClusterTaskTypeEnum
.
getByName
(
creationTaskData
.
getTaskType
()).
getWay
()).
append
(
",,"
);
sb
.
append
(
creationTaskData
.
getKafkaPackageName
().
replace
(
KafkaFileEnum
.
PACKAGE
.
getSuffix
(),
""
)).
append
(
",,"
);
sb
.
append
(
creationTaskData
.
getKafkaPackageMd5
()).
append
(
",,"
);
sb
.
append
(
creationTaskData
.
getKafkaPackageUrl
()).
append
(
",,"
);
sb
.
append
(
creationTaskData
.
getServerPropertiesName
().
replace
(
KafkaFileEnum
.
SERVER_CONFIG
.
getSuffix
(),
""
)).
append
(
",,"
);
sb
.
append
(
creationTaskData
.
getServerPropertiesMd5
()).
append
(
",,"
);
sb
.
append
(
creationTaskData
.
getServerPropertiesUrl
());
Map
<
String
,
Object
>
params
=
new
HashMap
<>(
10
);
params
.
put
(
"title"
,
String
.
format
(
"集群ID=%d-升级部署"
,
creationTaskData
.
getClusterId
()));
params
.
put
(
"batch"
,
BATCH
);
params
.
put
(
"tolerance"
,
TOLERANCE
);
params
.
put
(
"timeout"
,
timeout
);
params
.
put
(
"pause"
,
ListUtils
.
strList2String
(
creationTaskData
.
getPauseList
()));
params
.
put
(
"script"
,
this
.
script
);
params
.
put
(
"args"
,
sb
.
toString
());
params
.
put
(
"account"
,
account
);
params
.
put
(
"action"
,
"pause"
);
params
.
put
(
"hosts"
,
creationTaskData
.
getHostList
());
return
params
;
}
private
static
String
readScriptInJarFile
(
String
fileName
)
{
InputStream
inputStream
=
N9e
.
class
.
getClassLoader
().
getResourceAsStream
(
fileName
);
if
(
inputStream
==
null
)
{
LOGGER
.
error
(
"read kcm script failed, filename:{}"
,
fileName
);
return
""
;
}
try
{
BufferedReader
bufferedReader
=
new
BufferedReader
(
new
InputStreamReader
(
inputStream
));
String
line
=
null
;
StringBuilder
stringBuilder
=
new
StringBuilder
(
""
);
while
((
line
=
bufferedReader
.
readLine
())
!=
null
)
{
stringBuilder
.
append
(
line
);
}
return
stringBuilder
.
toString
();
}
catch
(
IOException
e
)
{
LOGGER
.
error
(
"read kcm script failed, filename:{}"
,
fileName
,
e
);
return
""
;
}
finally
{
try
{
inputStream
.
close
();
}
catch
(
IOException
e
)
{
LOGGER
.
error
(
"close reading kcm script failed, filename:{}"
,
fileName
,
e
);
}
}
}
}
}
\ No newline at end of file
kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/common/StorageEnum.java
浏览文件 @
f84d2501
...
@@ -6,6 +6,7 @@ package com.xiaojukeji.kafka.manager.kcm.component.storage.common;
...
@@ -6,6 +6,7 @@ package com.xiaojukeji.kafka.manager.kcm.component.storage.common;
* @date 20/4/29
* @date 20/4/29
*/
*/
public
enum
StorageEnum
{
public
enum
StorageEnum
{
GIFT
(
0
,
"gift"
),
GIT
(
1
,
"git"
),
GIT
(
1
,
"git"
),
S3
(
2
,
"S3"
),
S3
(
2
,
"S3"
),
;
;
...
...
kafka-manager-extends/kafka-manager-kcm/src/main/resources/kcm_script.sh
0 → 100644
浏览文件 @
f84d2501
#!/bin/sh
#集群任务脚本
set
-x
# 调试方式执行
#----------------------------------------日志格式------------------------------------------------------#
alias
ECHO_LOG
=
'echo `date +%F%n%T` hostname:`hostname` Line:${LINENO} '
#----------------------------------------参数列表------------------------------------------------------#
p_task_id
=
${
1
}
#任务ID
p_cluster_id
=
${
2
}
#集群ID
p_cluster_task_type
=
${
3
}
#任务类型[0:升级, 1:新部署, 2:回滚]
p_kafka_package_name
=
${
4
}
#包名
p_kafka_package_md5
=
${
5
}
#包MD5
p_kafka_package_url
=
${
6
}
#包下载地址
p_kafka_server_properties_name
=
${
7
}
#server配置名
p_kafka_server_properties_md5
=
${
8
}
#server配置MD5
p_kafka_server_properties_url
=
${
9
}
#server配置文件下载地址
#----------------------------------------配置信息------------------------------------------------------#
g_hostname
=
`
hostname
`
g_base_dir
=
'/home/km'
g_cluster_task_dir
=
${
g_base_dir
}
"/kafka_cluster_task/task_
${
p_task_id
}
"
#部署升级路径
g_rollback_version
=
${
g_cluster_task_dir
}
"/rollback_version"
#回滚版本
g_new_kafka_package_name
=
''
#最终的包名
g_kafka_manager_addr
=
''
#kafka-manager地址
#----------------------------------------操作函数------------------------------------------------------#
# dhcat通知
function
dchat_alarm
()
{
alarm_msg
=
$1
data
=
'
{
"text": "'
${
g_hostname
}
' 升级失败,请及时处理",
"attachments": [
{
"title": "'
${
alarm_msg
}
'",
"color": "#ffa500"
}
]
}'
#curl -H 'Content-Type: application/json' -d "${data}" ${dchat_bot}
}
# 检查并初始化环境
function
check_and_init_env
()
{
if
[
-z
"
${
p_task_id
}
"
-o
-z
"
${
p_cluster_task_type
}
"
-o
-z
"
${
p_kafka_package_url
}
"
-o
-z
"
${
p_cluster_id
}
"
-o
-z
"
${
p_kafka_package_name
}
"
-o
-z
"
${
p_kafka_package_md5
}
"
-o
-z
"
${
p_kafka_server_properties_name
}
"
-o
-z
"
${
p_kafka_server_properties_md5
}
"
]
;
then
ECHO_LOG
"存在为空的参数不合法, 退出集群任务"
dchat_alarm
"存在为空的参数不合法, 退出集群任务"
exit
1
fi
cd
${
g_base_dir
}
if
[
$?
-ne
0
-o
!
-x
"
${
g_base_dir
}
"
]
;
then
ECHO_LOG
"
${
g_base_dir
}
目录不存在或无权限, 退出集群任务"
dchat_alarm
"
${
g_base_dir
}
目录不存在或无权限, 退出集群任务"
exit
1
fi
ECHO_LOG
"初始化集群任务所需的目录"
mkdir
-p
${
g_cluster_task_dir
}
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"创建集群任务路径失败, 退出集群任务"
dchat_alarm
"创建集群任务路径失败, 退出集群任务"
exit
1
fi
}
# 检查并等待集群所有的副本处于同步的状态
function
check_and_wait_broker_stabled
()
{
under_replication_count
=
`
curl
-s
-G
-d
"hostname="
#{g_hostname} ${g_kafka_manager_addr}/api/v1/third-part/${p_cluster_id}/broker-stabled | python -m json.tool | grep true |wc -l`
while
[
"
$under_replication_count
"
-ne
1
]
;
do
ECHO_LOG
"存在
${
under_replication_count
}
个副本未同步, sleep 10s"
sleep
10
under_replication_count
=
`
curl
-s
${
g_kafka_manager_addr
}
/api/v1/
${
p_cluster_id
}
/overview | python
-m
json.tool |
grep false
|wc
-l
`
done
ECHO_LOG
"集群副本都已经处于同步的状态, 可以进行集群升级"
}
# 拉包并检查其md5
function
pull_and_check_kafka_package
()
{
ECHO_LOG
"开始下载
${
1
}
文件"
wget
${
1
}
-P
${
g_cluster_task_dir
}
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"下载
${
1
}
失败, 退出集群任务"
dchat_alarm
"下载
${
1
}
失败, 退出集群任务"
exit
1
fi
file_md5_sum
=
`
md5sum
"
${
g_cluster_task_dir
}
/
${
p_kafka_package_name
}
.tgz"
|
awk
-F
" "
'{print $1}'
`
if
[
"
$file_md5_sum
"
!=
"
${
2
}
"
]
;
then
ECHO_LOG
"下载
${
1
}
成功, 但是校验md5失败, 退出集群任务"
dchat_alarm
"下载
${
1
}
成功, 但是校验md5失败, 退出集群任务"
exit
1
fi
}
# 拉配置文件并检查其md5
function
pull_and_check_kafka_properties
()
{
ECHO_LOG
"开始下载
${
1
}
文件"
wget
${
1
}
-P
${
g_cluster_task_dir
}
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"下载
${
1
}
失败, 退出集群任务"
dchat_alarm
"下载
${
1
}
失败, 退出集群任务"
exit
1
fi
file_md5_sum
=
`
md5sum
"
${
g_cluster_task_dir
}
/
${
p_kafka_server_properties_name
}
.properties"
|
awk
-F
" "
'{print $1}'
`
if
[
"
$file_md5_sum
"
!=
"
${
2
}
"
]
;
then
ECHO_LOG
"下载
${
1
}
成功, 但是校验md5失败, 退出集群任务"
dchat_alarm
"下载
${
1
}
成功, 但是校验md5失败, 退出集群任务"
exit
1
fi
}
# 准备集群任务的文件
function
prepare_cluster_task_files
()
{
pull_and_check_kafka_package
${
p_kafka_package_url
}
${
p_kafka_package_md5
}
ECHO_LOG
"解压并拷贝kafka包文件"
tar
-zxf
"
${
g_cluster_task_dir
}
/
${
p_kafka_package_name
}
.tgz"
-C
"
${
g_cluster_task_dir
}
"
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"解压
${
p_kafka_package_name
}
.tgz失败, 退出集群任务"
dchat_alarm
"解压
${
p_kafka_package_name
}
.tgz失败, 退出集群任务"
exit
1
fi
pull_and_check_kafka_properties
${
p_kafka_server_properties_url
}
${
p_kafka_server_properties_md5
}
ECHO_LOG
"拷贝kafka配置文件"
cp
-f
"
${
g_cluster_task_dir
}
/
${
p_kafka_server_properties_name
}
.properties"
"
${
g_cluster_task_dir
}
/
${
p_kafka_package_name
}
/config/server.properties"
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"拷贝
${
p_kafka_server_properties_name
}
.properties失败, 退出集群任务"
dchat_alarm
"拷贝
${
p_kafka_server_properties_name
}
.properties失败, 退出集群任务"
exit
1
fi
# 将MD5信息写到包中
echo
"package_md5:
${
p_kafka_package_md5
}
server_properties_md5:
${
p_kafka_package_md5
}
"
>
"
${
g_cluster_task_dir
}
/
${
p_kafka_package_name
}
/package_and_properties.md5"
}
# 停kafka服务
function
stop_kafka_server
()
{
sh
${
g_base_dir
}
"/kafka/bin/kafka-server-stop.sh"
ECHO_LOG
"检查并等待kafka服务下线"
kafka_pid
=
`
jps |
grep
'Kafka'
|
awk
'{print $1}'
`
while
[
!
-z
"
${
kafka_pid
}
"
]
;
do
ECHO_LOG
"kafka服务未下线, 继续sleep 5s"
sleep
5
kafka_pid
=
`
jps |
grep
'Kafka'
|
awk
'{print $1}'
`
done
ECHO_LOG
"kafka服务已停掉"
}
function
cal_new_package_name
()
{
if
[
!
-d
"
${
g_base_dir
}
/
${
p_kafka_package_name
}
"
]
;
then
# 当前使用的包版本未部署过
g_new_kafka_package_name
=
${
p_kafka_package_name
}
return
fi
deploy_version
=
1
while
[
${
deploy_version
}
-le
1000
]
;
do
if
[
!
-d
"
${
g_base_dir
}
/
${
p_kafka_package_name
}
_v
${
deploy_version
}
"
]
;
then
g_new_kafka_package_name
=
"
${
p_kafka_package_name
}
_v
${
deploy_version
}
"
return
fi
ECHO_LOG
"包
${
p_kafka_package_name
}
_v
${
deploy_version
}
已经存在"
deploy_version
=
`
expr
${
deploy_version
}
+ 1
`
done
}
#
function
backup_and_init_new_kafka_server_soft_link
()
{
cd
${
g_base_dir
}
/
"kafka"
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"kafka软链不存在, 退出集群任务"
dchat_alarm
"kafka软链不存在, 退出集群任务"
exit
1
fi
# 纪录回滚版本
kafka_absolute_path
=
`
pwd
-P
`
rollback_version
=
`
basename
"
${
kafka_absolute_path
}
"
`
echo
${
rollback_version
}
>
${
g_rollback_version
}
ECHO_LOG
"上一版本:
${
rollback_version
}
"
# 去除软链
unlink
${
g_base_dir
}
/
"kafka"
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"移除软链失败, 退出集群任务"
dchat_alarm
"移除软链失败, 退出集群任务"
exit
1
fi
# 计算新的包名及初始化环境
init_new_kafka_server_soft_link
}
# 回滚之前的版本
function
rollback_kafka_server_soft_link
()
{
if
[
!
-f
"
${
g_rollback_version
}
"
]
;
then
ECHO_LOG
"回滚文件不存在, 退出集群任务"
dchat_alarm
"回滚文件不存在, 退出集群任务"
exit
1
fi
rollback_version
=
`
cat
${
g_rollback_version
}
`
if
[
!
-n
"
${
rollback_version
}
"
]
;
then
ECHO_LOG
"回滚信息不存在, 退出集群任务"
dchat_alarm
"回滚信息不存在, 退出集群任务"
exit
1
fi
# 去除软链
unlink
${
g_base_dir
}
/
"kafka"
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"移除软链失败, 退出集群任务"
dchat_alarm
"移除软链失败, 退出集群任务"
exit
1
fi
ln
-s
"
${
g_base_dir
}
/
${
rollback_version
}
"
"
${
g_base_dir
}
/kafka"
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"创建软链失败, 退出集群任务"
dchat_alarm
"创建软链失败, 退出集群任务"
exit
1
fi
ECHO_LOG
"修改软链成功"
}
function
init_new_kafka_server_soft_link
()
{
if
[
-L
"
${
g_base_dir
}
/kafka"
]
;
then
ECHO_LOG
"kafka软链依旧存在, 退出集群任务"
dchat_alarm
"kafka软链依旧存在, 退出集群任务"
exit
1
fi
# 计算新的包名
cal_new_package_name
ECHO_LOG
"集群任务新包的名字为
${
g_new_kafka_package_name
}
"
# 拷贝新的包
cp
-rf
"
${
g_cluster_task_dir
}
/
${
p_kafka_package_name
}
"
"
${
g_base_dir
}
/
${
g_new_kafka_package_name
}
"
ln
-s
"
${
g_base_dir
}
/
${
g_new_kafka_package_name
}
"
"
${
g_base_dir
}
/kafka"
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"创建软链失败, 退出集群任务"
dchat_alarm
"创建软链失败, 退出集群任务"
exit
1
fi
ECHO_LOG
"备份并修改软链成功"
}
function
check_and_wait_kafka_process_started
()
{
sleep
1
# 等待并检查kafka进程是否正常启动
ECHO_LOG
"开始等待并检查进程是否正常启动"
if
[
!
-L
"
${
g_base_dir
}
/kafka"
]
;
then
ECHO_LOG
"kafka软链不存在, 退出集群任务"
dchat_alarm
"kafka软链不存在, 退出集群任务"
exit
1
fi
log_started_count
=
0
while
[
"
$log_started_count
"
==
"0"
]
;
do
# 检查进程是否存活
kafka_pid
=
`
jps |
grep
'Kafka'
|
awk
'{print $1}'
`
if
[
-z
"
${
kafka_pid
}
"
]
;
then
ECHO_LOG
"安装失败, kafka进程不存在, 退出集群任务"
dchat_alarm
"安装失败, kafka进程不存在, 退出集群任务"
exit
1
fi
sleep
2
#检查是否存在NotLeader的分区
not_leader_error_count
=
`
grep
Exception
${
g_base_dir
}
/kafka/logs/server.log
*
|
grep
NotLeaderForPartitionException |
wc
-l
`
if
[
${
not_leader_error_count
}
-gt
0
]
;
then
ECHO_LOG
"安装失败, 存在无Leader的分区, 退出集群任务"
dchat_alarm
"安装失败, 存在无Leader的分区, 退出集群任务"
exit
1
fi
#判断Started的日志是否正常打印出来了
log_started_count
=
`
grep
Started
${
g_base_dir
}
/kafka/logs/server.log |
wc
-l
`
done
ECHO_LOG
"进程已正常启动, 结束进程状态检查"
}
start_new_kafka_server
()
{
nohup
sh
"
${
g_base_dir
}
/kafka/bin/kafka-server-start.sh"
"
${
g_base_dir
}
/kafka/config/server.properties"
>
/dev/null 2>&1 &
if
[
$?
-ne
0
]
;
then
ECHO_LOG
"启动kafka服务失败, 退出部署升级"
dchat_alarm
"启动kafka服务失败, 退出部署升级"
exit
1
fi
}
#----------------------------------部署流程---------------------------------------------------------#
ECHO_LOG
"集群任务启动..."
ECHO_LOG
"参数信息: "
ECHO_LOG
" p_task_id=
${
p_task_id
}
"
ECHO_LOG
" p_cluster_id=
${
p_cluster_id
}
"
ECHO_LOG
" p_cluster_task_type=
${
p_cluster_task_type
}
"
ECHO_LOG
" p_kafka_package_name=
${
p_kafka_package_name
}
"
ECHO_LOG
" p_kafka_package_md5=
${
p_kafka_package_md5
}
"
ECHO_LOG
" p_kafka_server_properties_name=
${
p_kafka_server_properties_name
}
"
ECHO_LOG
" p_kafka_server_properties_md5=
${
p_kafka_server_properties_md5
}
"
if
[
"
${
p_cluster_task_type
}
"
==
"0"
-o
"
${
p_cluster_task_type
}
"
==
"1"
]
;
then
ECHO_LOG
"检查并初始化环境"
check_and_init_env
ECHO_LOG
"准备集群任务所需的文件"
prepare_cluster_task_files
else
ECHO_LOG
"升级回滚, 无需准备新文件"
fi
#ECHO_LOG "检查并等待Broker处于稳定状态"
#check_and_wait_broker_stabled
ECHO_LOG
"停kafka服务"
stop_kafka_server
ECHO_LOG
"停5秒, 确保"
sleep
5
if
[
"
${
p_cluster_task_type
}
"
==
"0"
]
;
then
ECHO_LOG
"备份并初始化升级所需的环境(包/软链等)"
backup_and_init_new_kafka_server_soft_link
elif
[
"
${
p_cluster_task_type
}
"
==
"1"
]
;
then
ECHO_LOG
"初始化部署所需的环境(包/软链等)"
init_new_kafka_server_soft_link
else
ECHO_LOG
"回滚旧的环境(包/软链等)"
rollback_kafka_server_soft_link
fi
ECHO_LOG
"启动kafka服务"
start_new_kafka_server
ECHO_LOG
"检查并等待kafka服务正常运行..."
check_and_wait_kafka_process_started
ECHO_LOG
"检查并等待Broker处于稳定状态"
check_and_wait_broker_stabled
ECHO_LOG
"清理临时文件"
#rm -r ${g_cluster_task_dir}
ECHO_LOG
"升级成功, 结束升级"
\ No newline at end of file
kafka-manager-extends/kafka-manager-monitor/src/main/java/com/xiaojukeji/kafka/manager/monitor/component/n9e/N9eService.java
浏览文件 @
f84d2501
...
@@ -72,7 +72,7 @@ public class N9eService extends AbstractMonitorService {
...
@@ -72,7 +72,7 @@ public class N9eService extends AbstractMonitorService {
/**
/**
* 告警组
* 告警组
*/
*/
private
static
final
String
ALL_NOTIFY_GROUP_URL
=
"/api/
mon
/teams/all"
;
private
static
final
String
ALL_NOTIFY_GROUP_URL
=
"/api/
rdb
/teams/all"
;
/**
/**
* 监控策略的增删改查
* 监控策略的增删改查
...
...
kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/biz/SyncClusterTaskState.java
浏览文件 @
f84d2501
...
@@ -11,6 +11,7 @@ import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
...
@@ -11,6 +11,7 @@ import com.xiaojukeji.kafka.manager.task.component.EmptyEntry;
import
org.slf4j.Logger
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
;
import
java.util.Arrays
;
import
java.util.Arrays
;
import
java.util.List
;
import
java.util.List
;
...
@@ -20,6 +21,7 @@ import java.util.List;
...
@@ -20,6 +21,7 @@ import java.util.List;
* @date 20/9/7
* @date 20/9/7
*/
*/
@CustomScheduled
(
name
=
"syncClusterTaskState"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
)
@CustomScheduled
(
name
=
"syncClusterTaskState"
,
cron
=
"0 0/1 * * * ?"
,
threadNum
=
1
)
@ConditionalOnProperty
(
prefix
=
"kcm"
,
name
=
"enabled"
,
havingValue
=
"true"
,
matchIfMissing
=
true
)
public
class
SyncClusterTaskState
extends
AbstractScheduledTask
<
EmptyEntry
>
{
public
class
SyncClusterTaskState
extends
AbstractScheduledTask
<
EmptyEntry
>
{
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
private
final
static
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
LogConstant
.
SCHEDULED_TASK_LOGGER
);
...
...
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/gateway/GatewayHeartbeatController.java
浏览文件 @
f84d2501
package
com.xiaojukeji.kafka.manager.web.api.versionone.gateway
;
package
com.xiaojukeji.kafka.manager.web.api.versionone.gateway
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSON
Object
;
import
com.xiaojukeji.kafka.manager.common.annotations.ApiLevel
;
import
com.xiaojukeji.kafka.manager.common.annotations.ApiLevel
;
import
com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent
;
import
com.xiaojukeji.kafka.manager.common.constant.ApiLevelContent
;
import
com.xiaojukeji.kafka.manager.common.entity.Result
;
import
com.xiaojukeji.kafka.manager.common.entity.Result
;
import
com.xiaojukeji.kafka.manager.common.
entity.dto.gateway.TopicConnectionDTO
;
import
com.xiaojukeji.kafka.manager.common.
utils.JsonUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService
;
import
com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService
;
import
com.xiaojukeji.kafka.manager.common.constant.ApiPrefix
;
import
com.xiaojukeji.kafka.manager.common.constant.ApiPrefix
;
...
@@ -15,8 +15,6 @@ import org.slf4j.LoggerFactory;
...
@@ -15,8 +15,6 @@ import org.slf4j.LoggerFactory;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.*
;
import
org.springframework.web.bind.annotation.*
;
import
java.util.List
;
/**
/**
* @author zengqiao
* @author zengqiao
* @date 20/7/6
* @date 20/7/6
...
@@ -34,18 +32,17 @@ public class GatewayHeartbeatController {
...
@@ -34,18 +32,17 @@ public class GatewayHeartbeatController {
@ApiOperation
(
value
=
"连接信息上报入口"
,
notes
=
"Broker主动上报信息"
)
@ApiOperation
(
value
=
"连接信息上报入口"
,
notes
=
"Broker主动上报信息"
)
@RequestMapping
(
value
=
"heartbeat/survive-user"
,
method
=
RequestMethod
.
POST
)
@RequestMapping
(
value
=
"heartbeat/survive-user"
,
method
=
RequestMethod
.
POST
)
@ResponseBody
@ResponseBody
public
Result
receiveTopicConnections
(
@RequestParam
(
"clusterId"
)
Stri
ng
clusterId
,
public
Result
receiveTopicConnections
(
@RequestParam
(
"clusterId"
)
Lo
ng
clusterId
,
@RequestParam
(
"brokerId"
)
String
brokerId
,
@RequestParam
(
"brokerId"
)
Integer
brokerId
,
@RequestBody
List
<
TopicConnectionDTO
>
dtoLis
t
)
{
@RequestBody
JSONObject
jsonObjec
t
)
{
try
{
try
{
if
(
ValidateUtils
.
is
EmptyList
(
dtoList
))
{
if
(
ValidateUtils
.
is
Null
(
jsonObject
)
||
jsonObject
.
isEmpty
(
))
{
return
Result
.
buildSuc
();
return
Result
.
buildSuc
();
}
}
topicConnectionService
.
batchAdd
(
dtoList
);
topicConnectionService
.
batchAdd
(
JsonUtils
.
parseTopicConnections
(
clusterId
,
jsonObject
)
);
return
Result
.
buildSuc
();
return
Result
.
buildSuc
();
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
LOGGER
.
error
(
"receive topic connections failed, clusterId:{} brokerId:{} req:{}"
,
LOGGER
.
error
(
"receive topic connections failed, clusterId:{} brokerId:{} req:{}"
,
clusterId
,
brokerId
,
jsonObject
,
e
);
clusterId
,
brokerId
,
JSON
.
toJSONString
(
dtoList
),
e
);
}
}
return
Result
.
buildFailure
(
"fail"
);
return
Result
.
buildFailure
(
"fail"
);
}
}
...
...
kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/thirdpart/ThirdPartClusterController.java
0 → 100644
浏览文件 @
f84d2501
package
com.xiaojukeji.kafka.manager.web.api.versionone.thirdpart
;
import
com.xiaojukeji.kafka.manager.common.constant.ApiPrefix
;
import
com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections
;
import
com.xiaojukeji.kafka.manager.common.entity.Result
;
import
com.xiaojukeji.kafka.manager.common.entity.ResultStatus
;
import
com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics
;
import
com.xiaojukeji.kafka.manager.common.utils.ValidateUtils
;
import
com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata
;
import
com.xiaojukeji.kafka.manager.openapi.common.vo.ThirdPartBrokerOverviewVO
;
import
com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager
;
import
com.xiaojukeji.kafka.manager.service.service.BrokerService
;
import
io.swagger.annotations.Api
;
import
io.swagger.annotations.ApiOperation
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.*
;
/**
* @author zengqiao
* @date 20/11/9
*/
@Api
(
tags
=
"开放接口-Cluster相关接口(REST)"
)
@RestController
@RequestMapping
(
ApiPrefix
.
API_V1_THIRD_PART_PREFIX
)
public
class
ThirdPartClusterController
{
@Autowired
private
BrokerService
brokerService
;
@ApiOperation
(
value
=
"Broker信息概览"
,
notes
=
""
)
@RequestMapping
(
value
=
"{clusterId}/broker-stabled"
,
method
=
RequestMethod
.
GET
)
@ResponseBody
public
Result
<
Boolean
>
checkBrokerStabled
(
@PathVariable
Long
clusterId
,
@RequestParam
(
"hostname"
)
String
hostname
)
{
BrokerMetadata
brokerMetadata
=
PhysicalClusterMetadataManager
.
getBrokerMetadata
(
clusterId
,
hostname
);
if
(
ValidateUtils
.
isNull
(
brokerMetadata
))
{
return
Result
.
buildFrom
(
ResultStatus
.
BROKER_NOT_EXIST
);
}
BrokerMetrics
brokerMetrics
=
brokerService
.
getBrokerMetricsFromJmx
(
clusterId
,
brokerMetadata
.
getBrokerId
(),
KafkaMetricsCollections
.
BROKER_STATUS_PAGE_METRICS
);
if
(
ValidateUtils
.
isNull
(
brokerMetrics
))
{
return
Result
.
buildFrom
(
ResultStatus
.
OPERATION_FAILED
);
}
Integer
underReplicated
=
brokerMetrics
.
getSpecifiedMetrics
(
"UnderReplicatedPartitionsValue"
,
Integer
.
class
);
if
(
ValidateUtils
.
isNull
(
underReplicated
))
{
return
Result
.
buildFrom
(
ResultStatus
.
OPERATION_FAILED
);
}
return
new
Result
<>(
underReplicated
.
equals
(
0
));
}
}
\ No newline at end of file
kafka-manager-web/src/main/resources/application.yml
浏览文件 @
f84d2501
...
@@ -37,12 +37,13 @@ account:
...
@@ -37,12 +37,13 @@ account:
ldap
:
ldap
:
kcm
:
kcm
:
enabled
:
false
n9e
:
n9e
:
base-url
:
http://127.0.0.1
/api
base-url
:
http://127.0.0.1
:8080
user
name
:
admin
user
-token
:
12345678
user-token
:
admin
timeout
:
300
tpl-id
:
123456
account
:
km
timeout
:
30
script-file
:
kcm_script.sh
monitor
:
monitor
:
enabled
:
false
enabled
:
false
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录