From 688a587f4f47ccb4b49f40403e5bcbff5dbbfa24 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 21 Jun 2021 18:08:15 +0800 Subject: [PATCH] Add metrics in datanode (#5926) Signed-off-by: yangxuan --- internal/datanode/data_node.go | 15 +++++++++++++-- internal/metrics/metrics.go | 24 +++++++++++++++++++++++- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 4aa1e901f..e7d4e7b8a 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -28,6 +28,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/msgstream" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -42,6 +43,12 @@ import ( const ( RPCConnectionTimeout = 30 * time.Second + + // MetricRequestsTotal used to count the num of total requests + MetricRequestsTotal = "total" + + // MetricRequestsSuccess used to count the num of successful requests + MetricRequestsSuccess = "success" ) // DataNode struct communicates with outside services and unioun all @@ -246,6 +253,7 @@ func (node *DataNode) UpdateStateCode(code internalpb.StateCode) { // WatchDmChannels create a new dataSyncService for every unique dmlVchannel name, ignore if dmlVchannel existed. func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) { + metrics.DataNodeWatchDmChannelsCounter.WithLabelValues(MetricRequestsTotal).Inc() status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, } @@ -253,11 +261,11 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha switch { case node.State.Load() != internalpb.StateCode_Healthy: status.Reason = fmt.Sprintf("DataNode %d not healthy, please re-send message", node.NodeID) - return status, errors.New(status.GetReason()) + return status, nil case len(in.GetVchannels()) == 0: status.Reason = "Illegal request" - return status, errors.New(status.GetReason()) + return status, nil default: for _, chanInfo := range in.GetVchannels() { @@ -272,6 +280,7 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha status.ErrorCode = commonpb.ErrorCode_Success log.Debug("DataNode WatchDmChannels Done") + metrics.DataNodeWatchDmChannelsCounter.WithLabelValues(MetricRequestsSuccess).Inc() return status, nil } } @@ -345,6 +354,7 @@ func (node *DataNode) ReadyToFlush() error { // // There are 1 precondition: The segmentID in req is in ascending order. func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) { + metrics.DataNodeFlushSegmentsCounter.WithLabelValues(MetricRequestsTotal).Inc() status := &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, } @@ -411,6 +421,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen log.Debug("FlushSegments Done") status.ErrorCode = commonpb.ErrorCode_Success + metrics.DataNodeFlushSegmentsCounter.WithLabelValues(MetricRequestsSuccess).Inc() return status, nil } diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 862ac6dec..0fd431b64 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -14,6 +14,7 @@ const ( milvusNamespace = "milvus" subSystemRootCoord = "rootcoord" subSystemDataCoord = "dataCoord" + subSystemDataNode = "dataNode" ) /* @@ -247,9 +248,30 @@ func RegisterDataCoord() { prometheus.Register(DataCoordDataNodeList) } +var ( + // DataNodeFlushSegmentsCounter used to count the num of calls of FlushSegments + DataNodeFlushSegmentsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: subSystemDataNode, + Name: "flush_segments_total", + Help: "Counter of flush segments", + }, []string{"type"}) + + // DataNodeWatchDmChannelCounter used to count the num of calls of WatchDmChannels + DataNodeWatchDmChannelsCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: milvusNamespace, + Subsystem: subSystemDataNode, + Name: "watch_dm_channels_total", + Help: "Counter of watch dm channel", + }, []string{"type"}) +) + //RegisterDataNode register DataNode metrics func RegisterDataNode() { - + prometheus.Register(DataNodeFlushSegmentsCounter) + prometheus.Register(DataNodeWatchDmChannelsCounter) } //RegisterIndexCoord register IndexCoord metrics -- GitLab