未验证 提交 1e6185ad 编写于 作者: 2 23931017wu 提交者: GitHub

[IOTDB-4038] Add the leader metrics to the cluster (#6923)

上级 2aefcb31
......@@ -56,11 +56,6 @@ import org.apache.iotdb.confignode.manager.load.heartbeat.DataNodeHeartbeatCache
import org.apache.iotdb.confignode.manager.load.heartbeat.INodeCache;
import org.apache.iotdb.confignode.manager.load.heartbeat.IRegionGroupCache;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionRouteReq;
......@@ -106,6 +101,7 @@ public class LoadManager {
private final PartitionBalancer partitionBalancer;
private final RouteBalancer routeBalancer;
private final LoadManagerMetrics loadManagerMetrics;
/** Heartbeat executor service */
private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
......@@ -131,6 +127,7 @@ public class LoadManager {
this.regionBalancer = new RegionBalancer(configManager);
this.partitionBalancer = new PartitionBalancer(configManager);
this.routeBalancer = new RouteBalancer(configManager);
this.loadManagerMetrics = new LoadManagerMetrics(configManager);
}
/**
......@@ -270,12 +267,13 @@ public class LoadManager {
TimeUnit.MILLISECONDS);
LOGGER.info("LoadBalancing service is started successfully.");
}
loadManagerMetrics.addMetrics();
}
}
/** Stop the heartbeat service and the load balancing service */
public void stop() {
removeMetrics();
loadManagerMetrics.removeMetrics();
LOGGER.debug("Stop Heartbeat Service and LoadBalancing Service of LoadManager");
synchronized (scheduleMonitor) {
if (currentHeartbeatFuture != null) {
......@@ -344,9 +342,6 @@ public class LoadManager {
if (isNeedBroadcast) {
broadcastLatestRegionRouteMap();
}
if (nodeCacheMap.size() == getNodeManager().getRegisteredNodeCount()) {
addMetrics();
}
}
public void broadcastLatestRegionRouteMap() {
......@@ -464,7 +459,7 @@ public class LoadManager {
registeredConfigNode -> {
int configNodeId = registeredConfigNode.getConfigNodeId();
return nodeCacheMap.containsKey(configNodeId)
&& nodeCacheMap.get(configNodeId).getNodeStatus().equals(NodeStatus.Running);
&& NodeStatus.Running.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
})
.collect(Collectors.toList());
}
......@@ -475,7 +470,7 @@ public class LoadManager {
registeredDataNode -> {
int id = registeredDataNode.getLocation().getDataNodeId();
return nodeCacheMap.containsKey(id)
&& nodeCacheMap.get(id).getNodeStatus().equals(NodeStatus.Running);
&& NodeStatus.Running.equals(nodeCacheMap.get(id).getNodeStatus());
})
.collect(Collectors.toList());
}
......@@ -486,7 +481,7 @@ public class LoadManager {
registeredConfigNode -> {
int configNodeId = registeredConfigNode.getConfigNodeId();
return nodeCacheMap.containsKey(configNodeId)
&& nodeCacheMap.get(configNodeId).getNodeStatus().equals(NodeStatus.Unknown);
&& NodeStatus.Unknown.equals(nodeCacheMap.get(configNodeId).getNodeStatus());
})
.collect(Collectors.toList());
}
......@@ -497,197 +492,11 @@ public class LoadManager {
registeredDataNode -> {
int id = registeredDataNode.getLocation().getDataNodeId();
return nodeCacheMap.containsKey(id)
&& nodeCacheMap.get(id).getNodeStatus().equals(NodeStatus.Unknown);
&& NodeStatus.Unknown.equals(nodeCacheMap.get(id).getNodeStatus());
})
.collect(Collectors.toList());
}
public int getRunningConfigNodesNum() {
List<TConfigNodeLocation> allConfigNodes = getOnlineConfigNodes();
if (allConfigNodes == null) {
return 0;
}
for (TConfigNodeLocation configNodeLocation : allConfigNodes) {
String name =
"EndPoint("
+ configNodeLocation.getInternalEndPoint().ip
+ ":"
+ configNodeLocation.getInternalEndPoint().port
+ ")";
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CLUSTER_NODE_STATUS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
name,
Tag.TYPE.toString(),
"ConfigNode")
.set(1);
}
return allConfigNodes.size();
}
public int getRunningDataNodesNum() {
List<TDataNodeConfiguration> allDataNodes = getOnlineDataNodes();
if (allDataNodes == null) {
return 0;
}
for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
String name =
"EndPoint("
+ dataNodeLocation.getClientRpcEndPoint().ip
+ ":"
+ dataNodeLocation.getClientRpcEndPoint().port
+ ")";
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CLUSTER_NODE_STATUS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
name,
Tag.TYPE.toString(),
"DataNode")
.set(1);
}
return allDataNodes.size();
}
public int getUnknownConfigNodesNum() {
List<TConfigNodeLocation> allConfigNodes = getUnknownConfigNodes();
if (allConfigNodes == null) {
return 0;
}
for (TConfigNodeLocation configNodeLocation : allConfigNodes) {
String name =
"EndPoint("
+ configNodeLocation.getInternalEndPoint().ip
+ ":"
+ configNodeLocation.getInternalEndPoint().port
+ ")";
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CLUSTER_NODE_STATUS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
name,
Tag.TYPE.toString(),
"ConfigNode")
.set(0);
}
return allConfigNodes.size();
}
public int getUnknownDataNodesNum() {
List<TDataNodeConfiguration> allDataNodes = getUnknownDataNodes();
if (allDataNodes == null) {
return 0;
}
for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
String name =
"EndPoint("
+ dataNodeLocation.getClientRpcEndPoint().ip
+ ":"
+ dataNodeLocation.getClientRpcEndPoint().port
+ ")";
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CLUSTER_NODE_STATUS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
name,
Tag.TYPE.toString(),
"DataNode")
.set(0);
}
return allDataNodes.size();
}
public void addMetrics() {
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CONFIG_NODE.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Online.toString())
.set(getRunningConfigNodesNum());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.DATA_NODE.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Online.toString())
.set(getRunningDataNodesNum());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CONFIG_NODE.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Unknown.toString())
.set(getUnknownConfigNodesNum());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.DATA_NODE.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Unknown.toString())
.set(getUnknownDataNodesNum());
}
}
public void removeMetrics() {
MetricsService.getInstance()
.getMetricManager()
.removeGauge(
Metric.CONFIG_NODE.toString(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Online.toString());
MetricsService.getInstance()
.getMetricManager()
.removeGauge(
Metric.DATA_NODE.toString(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Online.toString());
MetricsService.getInstance()
.getMetricManager()
.removeGauge(
Metric.CONFIG_NODE.toString(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Unknown.toString());
MetricsService.getInstance()
.getMetricManager()
.removeGauge(
Metric.DATA_NODE.toString(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Unknown.toString());
}
public static void printRegionRouteMap(
long timestamp, Map<TConsensusGroupId, TRegionReplicaSet> regionRouteMap) {
LOGGER.info("[latestRegionRouteMap] timestamp:{}", timestamp);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.confignode.manager.load;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.NodeManager;
import org.apache.iotdb.db.service.metrics.MetricsService;
import org.apache.iotdb.db.service.metrics.enums.Metric;
import org.apache.iotdb.db.service.metrics.enums.Tag;
import org.apache.iotdb.metrics.config.MetricConfigDescriptor;
import org.apache.iotdb.metrics.utils.MetricLevel;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** This class collates metrics about loadManager */
public class LoadManagerMetrics {
private final IManager configManager;
public LoadManagerMetrics(IManager configManager) {
this.configManager = configManager;
}
public void addMetrics() {
addNodeMetrics();
addLeaderCount();
}
private int getRunningConfigNodesNum() {
List<TConfigNodeLocation> allConfigNodes =
configManager.getLoadManager().getOnlineConfigNodes();
if (allConfigNodes == null) {
return 0;
}
for (TConfigNodeLocation configNodeLocation : allConfigNodes) {
String name = NodeUrlUtils.convertTEndPointUrl(configNodeLocation.getInternalEndPoint());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CLUSTER_NODE_STATUS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
name,
Tag.TYPE.toString(),
"ConfigNode")
.set(1);
}
return allConfigNodes.size();
}
private int getRunningDataNodesNum() {
List<TDataNodeConfiguration> allDataNodes = configManager.getLoadManager().getOnlineDataNodes();
if (allDataNodes == null) {
return 0;
}
for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CLUSTER_NODE_STATUS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
name,
Tag.TYPE.toString(),
"DataNode")
.set(1);
}
return allDataNodes.size();
}
private int getUnknownConfigNodesNum() {
List<TConfigNodeLocation> allConfigNodes =
configManager.getLoadManager().getUnknownConfigNodes();
if (allConfigNodes == null) {
return 0;
}
for (TConfigNodeLocation configNodeLocation : allConfigNodes) {
String name = NodeUrlUtils.convertTEndPointUrl(configNodeLocation.getInternalEndPoint());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CLUSTER_NODE_STATUS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
name,
Tag.TYPE.toString(),
"ConfigNode")
.set(0);
}
return allConfigNodes.size();
}
private int getUnknownDataNodesNum() {
List<TDataNodeConfiguration> allDataNodes =
configManager.getLoadManager().getUnknownDataNodes();
if (allDataNodes == null) {
return 0;
}
for (TDataNodeConfiguration dataNodeInfo : allDataNodes) {
TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
String name = NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateGauge(
Metric.CLUSTER_NODE_STATUS.toString(),
MetricLevel.IMPORTANT,
Tag.NAME.toString(),
name,
Tag.TYPE.toString(),
"DataNode")
.set(0);
}
return allDataNodes.size();
}
public void addNodeMetrics() {
if (MetricConfigDescriptor.getInstance().getMetricConfig().getEnableMetric()) {
MetricsService.getInstance()
.getMetricManager()
.getOrCreateAutoGauge(
Metric.CONFIG_NODE.toString(),
MetricLevel.CORE,
this,
o -> getRunningConfigNodesNum(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Online.toString());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateAutoGauge(
Metric.DATA_NODE.toString(),
MetricLevel.CORE,
this,
o -> getRunningDataNodesNum(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Online.toString());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateAutoGauge(
Metric.CONFIG_NODE.toString(),
MetricLevel.CORE,
this,
o -> getUnknownConfigNodesNum(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Unknown.toString());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateAutoGauge(
Metric.DATA_NODE.toString(),
MetricLevel.CORE,
this,
o -> getUnknownDataNodesNum(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Unknown.toString());
}
}
/**
* Get the LeaderCount of Specific DataNodeId
*
* @return Integer
*/
public Integer getLeadershipCountByDatanode(int dataNodeId) {
Map<Integer, Integer> idToCountMap = new ConcurrentHashMap<>();
configManager
.getLoadManager()
.getAllLeadership()
.forEach((consensusGroupId, nodeId) -> idToCountMap.merge(nodeId, 1, Integer::sum));
return idToCountMap.get(dataNodeId);
}
public void addLeaderCount() {
getNodeManager()
.getRegisteredDataNodes()
.forEach(
dataNodeInfo -> {
TDataNodeLocation dataNodeLocation = dataNodeInfo.getLocation();
int dataNodeId = dataNodeLocation.getDataNodeId();
String name =
NodeUrlUtils.convertTEndPointUrl(dataNodeLocation.getClientRpcEndPoint());
MetricsService.getInstance()
.getMetricManager()
.getOrCreateAutoGauge(
Metric.CLUSTER_NODE_LEADER_COUNT.toString(),
MetricLevel.IMPORTANT,
this,
o -> getLeadershipCountByDatanode(dataNodeId),
Tag.NAME.toString(),
name);
});
}
public void removeMetrics() {
MetricsService.getInstance()
.getMetricManager()
.removeGauge(
Metric.CONFIG_NODE.toString(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Online.toString());
MetricsService.getInstance()
.getMetricManager()
.removeGauge(
Metric.DATA_NODE.toString(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Online.toString());
MetricsService.getInstance()
.getMetricManager()
.removeGauge(
Metric.CONFIG_NODE.toString(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Unknown.toString());
MetricsService.getInstance()
.getMetricManager()
.removeGauge(
Metric.DATA_NODE.toString(),
Tag.NAME.toString(),
"total",
Tag.STATUS.toString(),
NodeStatus.Unknown.toString());
}
private NodeManager getNodeManager() {
return configManager.getNodeManager();
}
}
......@@ -50,4 +50,4 @@ ioTDBReporterConfig:
password: root
maxConnectionNumber: 3
database: _metric
pushPeriodInSecond: 15
\ No newline at end of file
pushPeriodInSecond: 15
......@@ -50,4 +50,4 @@ ioTDBReporterConfig:
password: root
maxConnectionNumber: 3
database: _metric
pushPeriodInSecond: 15
\ No newline at end of file
pushPeriodInSecond: 15
......@@ -28,4 +28,4 @@ schema_region_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisCon
system_dir=target/confignode3/system
data_dirs=target/confignode3/data
consensus_dir=target/confignode3/consensus
proc_wal_dir=target/confignode3/proc
\ No newline at end of file
proc_wal_dir=target/confignode3/proc
......@@ -50,4 +50,4 @@ ioTDBReporterConfig:
password: root
maxConnectionNumber: 3
database: _metric
pushPeriodInSecond: 15
\ No newline at end of file
pushPeriodInSecond: 15
......@@ -122,9 +122,9 @@ Next, we will choose Prometheus format data as samples to describe each kind of
| Metric | Tag | level | Description | Sample |
| ------------------------- | ------------------------------------------------------------------ | --------- | -------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------- |
| cluster_node_leader_count | name="{{ip}}" | important | The count of ```dataGroupLeader``` on each node, which reflects the distribution of leaders | cluster_node_leader_count{name="127.0.0.1",} 2.0 |
| cluster_node_leader_count | name="{{ip}}:{{port}}" | important | The count of ```dataGroupLeader``` on each node, which reflects the distribution of leaders | cluster_node_leader_count{name="127.0.0.1",} 2.0 |
| cluster_uncommitted_log | name="{{ip_datagroupHeader}}" | important | The count of ```uncommitted_log``` on each node in data groups it belongs to | cluster_uncommitted_log{name="127.0.0.1_Data-127.0.0.1-40010-raftId-0",} 0.0 |
| cluster_node_status | name="{{ip}}:{{port}}",type="ConfigNode/DataNode" | important | The current node status, 0=Unkonwn 1=online | cluster_node_status{name="EndPoint(0.0.0.0:22277)",type="ConfigNode",} 1.0 |
| cluster_node_status | name="{{ip}}:{{port}}",type="ConfigNode/DataNode" | important | The current node status, 0=Unkonwn 1=online | cluster_node_status{name="0.0.0.0:22277",type="ConfigNode",} 1.0 |
| cluster_elect_total | name="{{ip}}",status="fail/win" | important | The count and result (won or failed) of elections the node participated in. | cluster_elect_total{name="127.0.0.1",status="win",} 1.0 |
| config_node | name="total",status="Registered/Online/Unknown" | core | The number of registered/online/offline confignodes | config_node{name="total",status="Online",} 3.0 |
| data_node | name="total",status="Registered/Online/Unknown" | core | The number of registered/online/offline datanodes | data_node{name="total",status="Registered",} 3.0 |
......
......@@ -120,9 +120,9 @@ IoTDB对外提供JMX和Prometheus格式的监控指标,对于JMX,可以通
| Metric | Tag | level | 说明 | 示例 |
| ------------------------- | ------------------------------------------------------------------ | --------- | ------------------------------------------------------------- | ---------------------------------------------------------------------------- |
| cluster_node_leader_count | name="{{ip}}" | important | 节点上```dataGroupLeader```的数量,用来观察leader是否分布均匀 | cluster_node_leader_count{name="127.0.0.1",} 2.0 |
| cluster_node_leader_count | name="{{ip}}:{{port}}" | important | 节点上```dataGroupLeader```的数量,用来观察leader是否分布均匀 | cluster_node_leader_count{name="127.0.0.1",} 2.0 |
| cluster_uncommitted_log | name="{{ip_datagroupHeader}}" | important | 节点```uncommitted_log```的数量 | cluster_uncommitted_log{name="127.0.0.1_Data-127.0.0.1-40010-raftId-0",} 0.0 |
| cluster_node_status | name="{{ip}}:{{port}}",type="ConfigNode/DataNode" | important | 节点状态,0=Unkonwn 1=online | cluster_node_status{name="EndPoint(0.0.0.0:22277)",type="ConfigNode",} 1.0 |
| cluster_node_status | name="{{ip}}:{{port}}",type="ConfigNode/DataNode" | important | 节点状态,0=Unkonwn 1=online | cluster_node_status{name="0.0.0.0:22277",type="ConfigNode",} 1.0 |
| cluster_elect_total | name="{{ip}}",status="fail/win" | important | 节点参与选举的次数及结果 | cluster_elect_total{name="127.0.0.1",status="win",} 1.0 |
| config_node | name="total",status="Registered/Online/Unknown" | core | 已注册/在线/离线 confignode 的节点数量 | config_node{name="total",status="Online",} 2.0 |
| data_node | name="total",status="Registered/Online/Unknown" | core | 已注册/在线/离线 datanode 的节点数量 | data_node{name="total",status="Registered",} 3.0 |
......
......@@ -36,4 +36,4 @@ udf_root_dir=target/datanode1/ext
tracing_dir=target/datanode1/data/tracing
consensus_dir=target/datanode1/consensus
sync_dir=target/datanode1/sync
timestamp_precision=ms
\ No newline at end of file
timestamp_precision=ms
......@@ -50,4 +50,4 @@ ioTDBReporterConfig:
password: root
maxConnectionNumber: 3
database: _metric
pushPeriodInSecond: 15
\ No newline at end of file
pushPeriodInSecond: 15
......@@ -35,5 +35,6 @@ index_root_dir=target/datanode2/data/index
udf_root_dir=target/datanode2/ext
tracing_dir=target/datanode2/data/tracing
consensus_dir=target/datanode2/consensus
timestamp_precision=ms
sync_dir=target/datanode2/sync
timestamp_precision=ms
\ No newline at end of file
timestamp_precision=ms
......@@ -50,4 +50,4 @@ ioTDBReporterConfig:
password: root
maxConnectionNumber: 3
database: _metric
pushPeriodInSecond: 15
\ No newline at end of file
pushPeriodInSecond: 15
......@@ -50,4 +50,4 @@ ioTDBReporterConfig:
password: root
maxConnectionNumber: 3
database: _metric
pushPeriodInSecond: 15
\ No newline at end of file
pushPeriodInSecond: 15
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册