Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
hertzbeat
Hertzbeat
提交
21bc7fcc
Hertzbeat
项目概览
hertzbeat
/
Hertzbeat
9 个月 前同步成功
通知
1
Star
2
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
Hertzbeat
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
21bc7fcc
编写于
6月 28, 2023
作者:
C
Ceilzcx
提交者:
GitHub
6月 28, 2023
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
support monitoring apache rocketmq metrics (#1046)
上级
694d6b26
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
716 addition
and
0 deletion
+716
-0
collector/pom.xml
collector/pom.xml
+7
-0
collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketmqCollectData.java
...a/hertzbeat/collector/collect/mq/RocketmqCollectData.java
+144
-0
collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketmqSingleCollectImpl.java
...zbeat/collector/collect/mq/RocketmqSingleCollectImpl.java
+368
-0
collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java
...omara/hertzbeat/collector/dispatch/DispatchConstants.java
+4
-0
collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect
...s/org.dromara.hertzbeat.collector.collect.AbstractCollect
+1
-0
common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java
...java/org/dromara/hertzbeat/common/entity/job/Metrics.java
+4
-0
common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/RocketmqProtocol.java
...ertzbeat/common/entity/job/protocol/RocketmqProtocol.java
+44
-0
manager/src/main/resources/define/app-rocketmq.yml
manager/src/main/resources/define/app-rocketmq.yml
+144
-0
未找到文件。
collector/pom.xml
浏览文件 @
21bc7fcc
...
...
@@ -163,6 +163,13 @@
<artifactId>
snmp4j
</artifactId>
<version>
3.6.7
</version>
</dependency>
<!-- rocketmq -->
<dependency>
<groupId>
org.apache.rocketmq
</groupId>
<artifactId>
rocketmq-tools
</artifactId>
<version>
4.9.4
</version>
</dependency>
</dependencies>
</project>
collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketmqCollectData.java
0 → 100644
浏览文件 @
21bc7fcc
package
org.dromara.hertzbeat.collector.collect.mq
;
import
lombok.Data
;
import
java.util.List
;
import
java.util.Map
;
/**
* rocketmq采集数据实体类
*
* @author ceilzcx
* @since 5/6/2023
*/
@Data
public
class
RocketmqCollectData
{
/**
* cluster broker info
*/
private
List
<
ClusterBrokerData
>
clusterBrokerDataList
;
/**
* consumer info
*/
private
List
<
ConsumerInfo
>
consumerInfoList
;
/**
* topic info
* Map[key: TopicName, value: Topic Queue info List]
*/
private
List
<
Map
<
String
,
List
<
TopicQueueInfo
>>>
topicInfoList
;
@Data
public
static
class
ClusterBrokerData
{
/**
* broker id
*/
private
Long
brokerId
;
/**
* broker address
*/
private
String
address
;
/**
* mq version
*/
private
String
version
;
/**
* producer send message tps
*/
private
double
producerMessageTps
;
/**
* consumer receive message tps
*/
private
double
consumerMessageTps
;
/**
* yesterday producer send message count
*/
private
long
yesterdayProduceCount
;
/**
* today producer send message count
*/
private
long
todayProduceCount
;
/**
* yesterday consumer receive message count
*/
private
long
yesterdayConsumeCount
;
/**
* today consumer receive message count
*/
private
long
todayConsumeCount
;
}
@Data
public
static
class
ConsumerInfo
{
/**
* consumer group
*/
private
String
consumerGroup
;
/**
* client num
*/
private
int
clientQuantity
;
/**
* message model
*/
private
String
messageModel
;
/**
* consume type
*/
private
String
consumeType
;
/**
* consume tps
*/
private
double
consumeTps
;
/**
* message delay
*/
private
long
diffTotal
;
}
@Data
public
static
class
TopicQueueInfo
{
/**
* broker name
*/
private
String
brokerName
;
/**
* queue id
*/
private
int
queueId
;
/**
* message queue min offset
*/
private
long
minOffset
;
/**
* message queue max offset
*/
private
long
maxOffset
;
/**
* last update time(ms)
*/
private
long
lastUpdateTimestamp
;
}
}
collector/src/main/java/org/dromara/hertzbeat/collector/collect/mq/RocketmqSingleCollectImpl.java
0 → 100644
浏览文件 @
21bc7fcc
package
org.dromara.hertzbeat.collector.collect.mq
;
import
com.alibaba.fastjson.JSONObject
;
import
com.google.common.collect.Lists
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.rocketmq.acl.common.AclClientRPCHook
;
import
org.apache.rocketmq.acl.common.SessionCredentials
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.admin.ConsumeStats
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
import
org.apache.rocketmq.common.protocol.body.ConsumerConnection
;
import
org.apache.rocketmq.common.protocol.body.KVTable
;
import
org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper
;
import
org.apache.rocketmq.common.protocol.body.TopicList
;
import
org.apache.rocketmq.common.protocol.route.BrokerData
;
import
org.apache.rocketmq.common.utils.ThreadUtils
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.tools.admin.DefaultMQAdminExt
;
import
org.dromara.hertzbeat.collector.collect.AbstractCollect
;
import
org.dromara.hertzbeat.collector.dispatch.DispatchConstants
;
import
org.dromara.hertzbeat.collector.util.JsonPathParser
;
import
org.dromara.hertzbeat.common.constants.CommonConstants
;
import
org.dromara.hertzbeat.common.entity.job.Metrics
;
import
org.dromara.hertzbeat.common.entity.job.protocol.RocketmqProtocol
;
import
org.dromara.hertzbeat.common.entity.message.CollectRep
;
import
org.dromara.hertzbeat.common.util.CommonUtil
;
import
org.jetbrains.annotations.NotNull
;
import
org.springframework.beans.factory.DisposableBean
;
import
org.springframework.util.Assert
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
import
java.util.concurrent.CountDownLatch
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.LinkedBlockingQueue
;
import
java.util.concurrent.ThreadFactory
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.atomic.AtomicLong
;
import
java.util.stream.Collectors
;
/**
* rocketmq采集实现类
*
* @author ceilzcx
* @since 5/6/2023
*/
@Slf4j
public
class
RocketmqSingleCollectImpl
extends
AbstractCollect
implements
DisposableBean
{
private
static
final
int
WAIT_TIMEOUT
=
10
;
private
static
final
Set
<
String
>
SYSTEM_GROUP_SET
=
new
HashSet
<>();
private
final
ExecutorService
executorService
;
static
{
// system consumer group
SYSTEM_GROUP_SET
.
add
(
MixAll
.
TOOLS_CONSUMER_GROUP
);
SYSTEM_GROUP_SET
.
add
(
MixAll
.
FILTERSRV_CONSUMER_GROUP
);
SYSTEM_GROUP_SET
.
add
(
MixAll
.
SELF_TEST_CONSUMER_GROUP
);
SYSTEM_GROUP_SET
.
add
(
MixAll
.
ONS_HTTP_PROXY_GROUP
);
SYSTEM_GROUP_SET
.
add
(
MixAll
.
CID_ONSAPI_PULL_GROUP
);
SYSTEM_GROUP_SET
.
add
(
MixAll
.
CID_ONSAPI_PERMISSION_GROUP
);
SYSTEM_GROUP_SET
.
add
(
MixAll
.
CID_ONSAPI_OWNER_GROUP
);
SYSTEM_GROUP_SET
.
add
(
MixAll
.
CID_SYS_RMQ_TRANS
);
}
public
RocketmqSingleCollectImpl
()
{
Runtime
runtime
=
Runtime
.
getRuntime
();
int
corePoolSize
=
Math
.
max
(
8
,
runtime
.
availableProcessors
());
int
maximumPoolSize
=
Math
.
max
(
16
,
runtime
.
availableProcessors
());
ThreadFactory
threadFactory
=
new
ThreadFactory
()
{
private
final
AtomicLong
threadIndex
=
new
AtomicLong
(
0
);
@Override
public
Thread
newThread
(
@NotNull
Runnable
r
)
{
return
new
Thread
(
r
,
"RocketMQCollectGroup_"
+
this
.
threadIndex
.
incrementAndGet
());
}
};
this
.
executorService
=
new
ThreadPoolExecutor
(
corePoolSize
,
maximumPoolSize
,
60L
,
TimeUnit
.
SECONDS
,
new
LinkedBlockingQueue
<>(
5000
),
threadFactory
,
new
ThreadPoolExecutor
.
DiscardOldestPolicy
());
}
@Override
public
void
destroy
()
{
ThreadUtils
.
shutdownGracefully
(
this
.
executorService
,
10L
,
TimeUnit
.
SECONDS
);
}
@Override
public
void
collect
(
CollectRep
.
MetricsData
.
Builder
builder
,
long
appId
,
String
app
,
Metrics
metrics
)
{
try
{
preCheck
(
metrics
);
}
catch
(
Exception
e
)
{
builder
.
setCode
(
CollectRep
.
Code
.
FAIL
);
builder
.
setMsg
(
e
.
getMessage
());
return
;
}
DefaultMQAdminExt
mqAdminExt
=
null
;
try
{
mqAdminExt
=
this
.
createMqAdminExt
(
metrics
);
mqAdminExt
.
start
();
RocketmqCollectData
rocketmqCollectData
=
new
RocketmqCollectData
();
this
.
collectData
(
mqAdminExt
,
rocketmqCollectData
);
this
.
fillBuilder
(
rocketmqCollectData
,
builder
,
metrics
.
getAliasFields
(),
metrics
.
getRocketmq
().
getParseScript
());
}
catch
(
Exception
e
)
{
builder
.
setCode
(
CollectRep
.
Code
.
FAIL
);
String
message
=
CommonUtil
.
getMessageFromThrowable
(
e
);
builder
.
setMsg
(
message
);
}
finally
{
if
(
mqAdminExt
!=
null
)
{
mqAdminExt
.
shutdown
();
}
}
}
@Override
public
String
supportProtocol
()
{
return
DispatchConstants
.
PROTOCOL_ROCKETMQ
;
}
/**
* 采集前置条件, 入参判断
* @param metrics 数据指标
*/
private
void
preCheck
(
Metrics
metrics
)
{
if
(
metrics
==
null
||
metrics
.
getRocketmq
()
==
null
)
{
throw
new
IllegalArgumentException
(
"Mongodb collect must has rocketmq params"
);
}
RocketmqProtocol
rocketmq
=
metrics
.
getRocketmq
();
Assert
.
hasText
(
rocketmq
.
getNamesrvHost
(),
"Rocketmq Protocol namesrvHost is required."
);
Assert
.
hasText
(
rocketmq
.
getNamesrvPort
(),
"Rocketmq Protocol namesrvPort is required."
);
}
/**
* 创建DefaultMQAdminExt实体类; 这里有个小问题, 是否需要每次都重新创建
* @param metrics 数据指标
* @return DefaultMQAdminExt
*/
private
DefaultMQAdminExt
createMqAdminExt
(
Metrics
metrics
)
{
RocketmqProtocol
rocketmqProtocol
=
metrics
.
getRocketmq
();
assert
rocketmqProtocol
!=
null
;
RPCHook
rpcHook
=
null
;
if
(
StringUtils
.
isNotBlank
(
rocketmqProtocol
.
getAccessKey
())
&&
StringUtils
.
isNotBlank
(
rocketmqProtocol
.
getSecretKey
()))
{
rpcHook
=
new
AclClientRPCHook
(
new
SessionCredentials
(
rocketmqProtocol
.
getAccessKey
(),
rocketmqProtocol
.
getSecretKey
()));
}
DefaultMQAdminExt
mqAdminExt
=
new
DefaultMQAdminExt
(
rpcHook
,
5000L
);
mqAdminExt
.
setNamesrvAddr
(
rocketmqProtocol
.
getNamesrvHost
()
+
":"
+
rocketmqProtocol
.
getNamesrvPort
());
mqAdminExt
.
setInstanceName
(
"admin-"
+
System
.
currentTimeMillis
());
return
mqAdminExt
;
}
/**
* 采集rocketmq数据
* @param mqAdminExt rocketmq提供的远程调用类
* @param rocketmqCollectData rocketmq数据采集类
* @throws Exception 远程调用异常
*/
private
void
collectData
(
DefaultMQAdminExt
mqAdminExt
,
RocketmqCollectData
rocketmqCollectData
)
throws
Exception
{
this
.
collectClusterData
(
mqAdminExt
,
rocketmqCollectData
);
this
.
collectConsumerData
(
mqAdminExt
,
rocketmqCollectData
);
this
.
collectTopicData
(
mqAdminExt
,
rocketmqCollectData
);
}
/**
* 采集rocketmq的集群数据
* @param mqAdminExt rocketmq提供的远程调用类
* @param rocketmqCollectData rocketmq数据采集类
* @throws Exception 远程调用异常
*/
private
void
collectClusterData
(
DefaultMQAdminExt
mqAdminExt
,
RocketmqCollectData
rocketmqCollectData
)
throws
Exception
{
try
{
List
<
RocketmqCollectData
.
ClusterBrokerData
>
clusterBrokerDataList
=
new
ArrayList
<>();
rocketmqCollectData
.
setClusterBrokerDataList
(
clusterBrokerDataList
);
ClusterInfo
clusterInfo
=
mqAdminExt
.
examineBrokerClusterInfo
();
for
(
BrokerData
brokerData
:
clusterInfo
.
getBrokerAddrTable
().
values
())
{
for
(
Map
.
Entry
<
Long
,
String
>
entry
:
brokerData
.
getBrokerAddrs
().
entrySet
())
{
RocketmqCollectData
.
ClusterBrokerData
clusterBrokerData
=
new
RocketmqCollectData
.
ClusterBrokerData
();
clusterBrokerDataList
.
add
(
clusterBrokerData
);
clusterBrokerData
.
setBrokerId
(
entry
.
getKey
());
clusterBrokerData
.
setAddress
(
entry
.
getValue
());
KVTable
kvTable
=
mqAdminExt
.
fetchBrokerRuntimeStats
(
entry
.
getValue
());
clusterBrokerData
.
setVersion
(
kvTable
.
getTable
().
get
(
"brokerVersionDesc"
));
String
putTps
=
kvTable
.
getTable
().
get
(
"putTps"
);
if
(
StringUtils
.
isNotEmpty
(
putTps
))
{
String
[]
putTpsArr
=
putTps
.
split
(
" "
);
clusterBrokerData
.
setProducerMessageTps
(
Double
.
parseDouble
(
putTpsArr
[
0
]));
}
String
getTransferredTps
=
kvTable
.
getTable
().
get
(
"getTransferedTps"
);
if
(
StringUtils
.
isNotEmpty
(
getTransferredTps
))
{
String
[]
getTransferredTpsArr
=
getTransferredTps
.
split
(
" "
);
clusterBrokerData
.
setConsumerMessageTps
(
Double
.
parseDouble
(
getTransferredTpsArr
[
0
]));
}
String
msgPutTotalTodayMorning
=
kvTable
.
getTable
().
get
(
"msgPutTotalTodayMorning"
);
String
msgPutTotalYesterdayMorning
=
kvTable
.
getTable
().
get
(
"msgPutTotalYesterdayMorning"
);
if
(
StringUtils
.
isNotEmpty
(
msgPutTotalTodayMorning
)
&&
StringUtils
.
isNotEmpty
(
msgPutTotalYesterdayMorning
))
{
long
yesterdayProduceCount
=
Long
.
parseLong
(
msgPutTotalTodayMorning
)
-
Long
.
parseLong
(
msgPutTotalYesterdayMorning
);
clusterBrokerData
.
setYesterdayProduceCount
(
yesterdayProduceCount
);
}
String
msgGetTotalTodayMorning
=
kvTable
.
getTable
().
get
(
"msgGetTotalTodayMorning"
);
String
msgGetTotalYesterdayMorning
=
kvTable
.
getTable
().
get
(
"msgGetTotalYesterdayMorning"
);
if
(
StringUtils
.
isNotEmpty
(
msgGetTotalTodayMorning
)
&&
StringUtils
.
isNotEmpty
(
msgGetTotalYesterdayMorning
))
{
long
yesterdayConsumerCount
=
Long
.
parseLong
(
msgGetTotalTodayMorning
)
-
Long
.
parseLong
(
msgGetTotalYesterdayMorning
);
clusterBrokerData
.
setYesterdayConsumeCount
(
yesterdayConsumerCount
);
}
String
msgPutTotalTodayNow
=
kvTable
.
getTable
().
get
(
"msgPutTotalTodayNow"
);
if
(
StringUtils
.
isNotEmpty
(
msgPutTotalTodayNow
)
&&
StringUtils
.
isNotEmpty
(
msgPutTotalTodayMorning
))
{
long
todayProduceCount
=
Long
.
parseLong
(
msgPutTotalTodayNow
)
-
Long
.
parseLong
(
msgPutTotalTodayMorning
);
clusterBrokerData
.
setTodayProduceCount
(
todayProduceCount
);
}
String
msgGetTotalTodayNow
=
kvTable
.
getTable
().
get
(
"msgGetTotalTodayNow"
);
if
(
StringUtils
.
isNotEmpty
(
msgGetTotalTodayNow
)
&&
StringUtils
.
isNotEmpty
(
msgGetTotalTodayMorning
))
{
long
todayConsumerCount
=
Long
.
parseLong
(
msgGetTotalTodayNow
)
-
Long
.
parseLong
(
msgGetTotalTodayMorning
);
clusterBrokerData
.
setTodayConsumeCount
(
todayConsumerCount
);
}
}
}
}
catch
(
Exception
e
)
{
log
.
warn
(
"collect rocketmq cluster data error"
,
e
);
throw
e
;
}
}
/**
* 采集rocketmq的消费者数据
* @param mqAdminExt rocketmq提供的远程调用类
* @param rocketmqCollectData rocketmq数据采集类
* @throws Exception 远程调用异常
*/
private
void
collectConsumerData
(
DefaultMQAdminExt
mqAdminExt
,
RocketmqCollectData
rocketmqCollectData
)
throws
Exception
{
Set
<
String
>
consumerGroupSet
=
new
HashSet
<>();
try
{
// 获取consumerGroup集合
ClusterInfo
clusterInfo
=
mqAdminExt
.
examineBrokerClusterInfo
();
for
(
BrokerData
brokerData
:
clusterInfo
.
getBrokerAddrTable
().
values
())
{
SubscriptionGroupWrapper
subscriptionGroupWrapper
=
mqAdminExt
.
getAllSubscriptionGroup
(
brokerData
.
selectBrokerAddr
(),
3000L
);
consumerGroupSet
.
addAll
(
subscriptionGroupWrapper
.
getSubscriptionGroupTable
().
keySet
());
}
List
<
RocketmqCollectData
.
ConsumerInfo
>
consumerInfoList
=
Collections
.
synchronizedList
(
Lists
.
newArrayList
());
rocketmqCollectData
.
setConsumerInfoList
(
consumerInfoList
);
CountDownLatch
countDownLatch
=
new
CountDownLatch
(
consumerGroupSet
.
size
());
for
(
String
consumerGroup
:
consumerGroupSet
)
{
if
(
SYSTEM_GROUP_SET
.
contains
(
consumerGroup
))
{
continue
;
}
executorService
.
submit
(()
->
{
RocketmqCollectData
.
ConsumerInfo
consumerInfo
=
new
RocketmqCollectData
.
ConsumerInfo
();
consumerInfoList
.
add
(
consumerInfo
);
consumerInfo
.
setConsumerGroup
(
consumerGroup
);
try
{
ConsumeStats
consumeStats
=
null
;
try
{
consumeStats
=
mqAdminExt
.
examineConsumeStats
(
consumerGroup
);
}
catch
(
Exception
e
)
{
log
.
warn
(
"examineConsumeStats exception to consumerGroup {}, response [{}]"
,
consumerGroup
,
e
.
getMessage
());
}
if
(
consumeStats
!=
null
)
{
consumerInfo
.
setConsumeTps
(
consumeStats
.
getConsumeTps
());
consumerInfo
.
setDiffTotal
(
consumeStats
.
computeTotalDiff
());
}
ConsumerConnection
consumerConnection
=
null
;
try
{
consumerConnection
=
mqAdminExt
.
examineConsumerConnectionInfo
(
consumerGroup
);
}
catch
(
Exception
e
)
{
log
.
warn
(
"examineConsumeStats exception to consumerGroup {}, response [{}]"
,
consumerGroup
,
e
.
getMessage
());
}
if
(
consumerConnection
!=
null
)
{
consumerInfo
.
setClientQuantity
(
consumerConnection
.
getConnectionSet
().
size
());
consumerInfo
.
setMessageModel
(
consumerConnection
.
getMessageModel
().
getModeCN
());
consumerInfo
.
setConsumeType
(
consumerConnection
.
getConsumeType
().
getTypeCN
());
}
}
catch
(
Exception
e
)
{
log
.
warn
(
"examineConsumeStats or examineConsumerConnectionInfo error, {}"
,
consumerGroup
,
e
);
}
finally
{
countDownLatch
.
countDown
();
}
});
}
if
(!
countDownLatch
.
await
(
WAIT_TIMEOUT
,
TimeUnit
.
SECONDS
))
{
log
.
warn
(
"examineConsumeStats or examineConsumerConnectionInfo timeout"
);
}
}
catch
(
Exception
e
)
{
log
.
warn
(
"collect rocketmq consume data error"
,
e
);
throw
e
;
}
}
/**
*
* @param mqAdminExt rocketmq提供的远程调用类
* @param rocketmqCollectData rocketmq数据采集类
* @throws Exception 远程调用异常
*/
private
void
collectTopicData
(
DefaultMQAdminExt
mqAdminExt
,
RocketmqCollectData
rocketmqCollectData
)
throws
Exception
{
try
{
TopicList
topicList
=
mqAdminExt
.
fetchAllTopicList
();
Set
<
String
>
topics
=
topicList
.
getTopicList
()
.
stream
()
.
filter
(
topic
->
!(
topic
.
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
)
||
topic
.
startsWith
(
MixAll
.
DLQ_GROUP_TOPIC_PREFIX
)))
.
collect
(
Collectors
.
toSet
());
List
<
Map
<
String
/* topic */
,
List
<
RocketmqCollectData
.
TopicQueueInfo
>>>
topicInfoList
=
new
ArrayList
<>();
for
(
String
topic
:
topics
)
{
Map
<
String
,
List
<
RocketmqCollectData
.
TopicQueueInfo
>>
topicQueueInfoTable
=
new
HashMap
<>(
32
);
List
<
RocketmqCollectData
.
TopicQueueInfo
>
topicQueueInfoList
=
new
ArrayList
<>();
// todo 查询topic的queue信息需要for循环调用 mqAdminExt.examineTopicStats(), topic数量很大的情况, 调用次数也会很多
topicQueueInfoTable
.
put
(
topic
,
topicQueueInfoList
);
topicInfoList
.
add
(
topicQueueInfoTable
);
rocketmqCollectData
.
setTopicInfoList
(
topicInfoList
);
}
}
catch
(
Exception
e
)
{
log
.
warn
(
"collect rocketmq topic data error"
,
e
);
throw
e
;
}
}
/**
* 采集数据填充到builder
* @param rocketmqCollectData rocketmq数据采集类
* @param builder metrics data builder
* @param aliasFields 字段别名
* @param parseScript JSON的base path
*/
private
void
fillBuilder
(
RocketmqCollectData
rocketmqCollectData
,
CollectRep
.
MetricsData
.
Builder
builder
,
List
<
String
>
aliasFields
,
String
parseScript
)
{
String
dataJson
=
JSONObject
.
toJSONString
(
rocketmqCollectData
);
List
<
Object
>
results
=
JsonPathParser
.
parseContentWithJsonPath
(
dataJson
,
parseScript
);
for
(
int
i
=
0
;
i
<
results
.
size
();
i
++)
{
CollectRep
.
ValueRow
.
Builder
valueRowBuilder
=
CollectRep
.
ValueRow
.
newBuilder
();
for
(
String
aliasField
:
aliasFields
)
{
List
<
Object
>
valueList
=
JsonPathParser
.
parseContentWithJsonPath
(
dataJson
,
parseScript
+
aliasField
);
if
(
CollectionUtils
.
isNotEmpty
(
valueList
)
&&
valueList
.
size
()
>
i
)
{
Object
value
=
valueList
.
get
(
i
);
valueRowBuilder
.
addColumns
(
value
==
null
?
CommonConstants
.
NULL_VALUE
:
String
.
valueOf
(
value
));
}
else
{
valueRowBuilder
.
addColumns
(
CommonConstants
.
NULL_VALUE
);
}
}
builder
.
addValues
(
valueRowBuilder
.
build
());
}
}
}
collector/src/main/java/org/dromara/hertzbeat/collector/dispatch/DispatchConstants.java
浏览文件 @
21bc7fcc
...
...
@@ -75,6 +75,10 @@ public interface DispatchConstants {
* protocol ssl Certificate - custom
*/
String
PROTOCOL_SSL_CERT
=
"ssl_cert"
;
/**
* protocol rocketmq
*/
String
PROTOCOL_ROCKETMQ
=
"rocketmq"
;
// Protocol type related - end
// 协议类型相关 - end //
...
...
collector/src/main/resources/META-INF/services/org.dromara.hertzbeat.collector.collect.AbstractCollect
浏览文件 @
21bc7fcc
...
...
@@ -9,3 +9,4 @@ org.dromara.hertzbeat.collector.collect.snmp.SnmpCollectImpl
org.dromara.hertzbeat.collector.collect.ssh.SshCollectImpl
org.dromara.hertzbeat.collector.collect.telnet.TelnetCollectImpl
org.dromara.hertzbeat.collector.collect.ftp.FtpCollectImpl
org.dromara.hertzbeat.collector.collect.mq.RocketmqSingleCollectImpl
\ No newline at end of file
common/src/main/java/org/dromara/hertzbeat/common/entity/job/Metrics.java
浏览文件 @
21bc7fcc
...
...
@@ -151,6 +151,10 @@ public class Metrics {
* 使用公共的ftp协议的监控配置信息
*/
private
FtpProtocol
ftp
;
/**
* Monitoring configuration information using the public rocketmq protocol 使用公共的rocketmq协议的监控配置信息
*/
private
RocketmqProtocol
rocketmq
;
/**
* collector use - Temporarily store subTask indicator group response data
...
...
common/src/main/java/org/dromara/hertzbeat/common/entity/job/protocol/RocketmqProtocol.java
0 → 100644
浏览文件 @
21bc7fcc
package
org.dromara.hertzbeat.common.entity.job.protocol
;
import
lombok.AllArgsConstructor
;
import
lombok.Builder
;
import
lombok.Data
;
import
lombok.NoArgsConstructor
;
/**
* rocketmq protocol
*
* @author ceilzcx
* @since 5/6/2023
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public
class
RocketmqProtocol
{
/**
* rocketmq namesrv host
*/
private
String
namesrvHost
;
/**
* rocketmq namesrv port
*/
private
String
namesrvPort
;
/**
* accessKey
*/
private
String
accessKey
;
/**
* secretKey
*/
private
String
secretKey
;
/**
* jsonpath解析脚本
*/
private
String
parseScript
;
}
manager/src/main/resources/define/app-rocketmq.yml
0 → 100644
浏览文件 @
21bc7fcc
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
category
:
mid
app
:
rocketmq
name
:
zh-CN
:
RocketMQ
en-US
:
RocketMQ
params
:
# field-param field key
# field-字段名称标识符
-
field
:
host
# name-param field display i18n name
# name-参数字段显示名称
name
:
zh-CN
:
注册中心Host
en-US
:
Namesrv Host
# type-param field type(most mapping the html input type)
# type-字段类型,样式(大部分映射input标签type属性)
type
:
host
# required-true or false
# 是否是必输项 true-必填 false-可选
required
:
true
-
field
:
port
name
:
zh-CN
:
端口
en-US
:
Port
type
:
number
# when type is number, range is required
# 当type为number时,用range表示范围
range
:
'
[0,65535]'
required
:
true
defaultValue
:
9876
-
field
:
accessKey
name
:
zh-CN
:
accessKey
en-US
:
accessKey
type
:
text
-
field
:
secretKey
name
:
zh-CN
:
secretKey
en-US
:
secretKey
type
:
text
metrics
:
-
name
:
cluster
priority
:
0
fields
:
-
field
:
BrokerId
type
:
1
-
field
:
Address
type
:
1
instance
:
true
-
field
:
Version
type
:
1
-
field
:
Producer_Message_TPS
type
:
0
-
field
:
Consumer_Message_TPS
type
:
0
-
field
:
Yesterday_Produce_Count
type
:
0
-
field
:
Yesterday_Consume_Count
type
:
0
-
field
:
Today_Produce_Count
type
:
0
-
field
:
Today_Consume_Count
type
:
0
aliasFields
:
-
brokerId
-
address
-
version
-
producerMessageTps
-
consumerMessageTps
-
yesterdayProduceCount
-
yesterdayConsumeCount
-
todayProduceCount
-
todayConsumeCount
calculates
:
-
BrokerId=brokerId
-
Address=address
-
Version=version
-
Producer_Message_TPS=producerMessageTps
-
Consumer_Message_TPS=consumerMessageTps
-
Yesterday_Produce_Count=yesterdayProduceCount
-
Yesterday_Consume_Count=yesterdayConsumeCount
-
Today_Produce_Count=todayProduceCount
-
Today_Consume_Count=todayConsumeCount
# 监控采集使用协议 eg: sql, ssh, http, telnet, wmi, snmp, sdk, rocketmq
protocol
:
rocketmq
# 当protocol为http协议时具体的采集配置
rocketmq
:
namesrvHost
:
^_^host^_^
namesrvPort
:
^_^port^_^
accessKey
:
^_^accessKey^_^
secretKey
:
^_^secretKey^_^
parseScript
:
$.clusterBrokerDataList.*
-
name
:
consumer
priority
:
1
fields
:
-
field
:
Consumer_group
type
:
1
instance
:
true
-
field
:
Client_quantity
type
:
0
-
field
:
Message_model
type
:
1
-
field
:
Consume_type
type
:
1
-
field
:
Consume_tps
type
:
0
-
field
:
Delay
type
:
0
aliasFields
:
-
consumerGroup
-
clientQuantity
-
messageModel
-
consumeType
-
consumeTps
-
diffTotal
calculates
:
-
Consumer_group=consumerGroup
-
Client_quantity=clientQuantity
-
Message_model=messageModel
-
Consume_type=consumeType
-
Consume_tps=consumeTps
-
Delay=diffTotal
# 监控采集使用协议 eg: sql, ssh, http, telnet, wmi, snmp, sdk, rocketmq
protocol
:
rocketmq
# 当protocol为http协议时具体的采集配置
rocketmq
:
namesrvHost
:
^_^host^_^
namesrvPort
:
^_^port^_^
parseScript
:
$.consumerInfoList.*
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录