diff --git a/cmd/roles/roles.go b/cmd/roles/roles.go index 9d12cf0040187f31cdeb73f9eb2ea6d86ba768ab..90e1b671c231bda40c243387cb8cf0a0f4daf3cb 100644 --- a/cmd/roles/roles.go +++ b/cmd/roles/roles.go @@ -54,6 +54,7 @@ func init() { Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{})) Registry.MustRegister(prometheus.NewGoCollector()) metrics.RegisterEtcdMetrics(Registry) + metrics.RegisterMq(Registry) } func stopRocksmq() { diff --git a/internal/datanode/data_sync_service.go b/internal/datanode/data_sync_service.go index 5d0c420088403d8b22926bf0fe957f1f853b166d..9ed55f73485fe4fdc98216683763ba9a4901ad65 100644 --- a/internal/datanode/data_sync_service.go +++ b/internal/datanode/data_sync_service.go @@ -28,7 +28,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgdispatcher" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" @@ -165,8 +164,6 @@ func (dsService *dataSyncService) close() { zap.String("vChanName", dsService.vchannelName)) dsService.dispClient.Deregister(dsService.vchannelName) dsService.fg.Close() - metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() - metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Sub(2) // timeTickChannel + deltaChannel } dsService.clearGlobalFlushingCache() diff --git a/internal/datanode/flow_graph_dmstream_input_node.go b/internal/datanode/flow_graph_dmstream_input_node.go index 75149c5f0564a94c03e2c78055e9a3190de1f77f..0473d45a993efbf952287d0afe236cc60e8f063c 100644 --- a/internal/datanode/flow_graph_dmstream_input_node.go +++ b/internal/datanode/flow_graph_dmstream_input_node.go @@ -60,7 +60,6 @@ func newDmInputNode(dispatcherClient msgdispatcher.Client, seekPos *msgpb.MsgPos } log.Info("datanode consume successfully when register to msgDispatcher") } - metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName) node := flowgraph.NewInputNode(input, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism, diff --git a/internal/metrics/datanode_metrics.go b/internal/metrics/datanode_metrics.go index 0e351d2ed757b39cffcbe5b1175c5c13aa9de1f6..82b531f2b326e00cb9cea9055b5738f4bbfb3f9a 100644 --- a/internal/metrics/datanode_metrics.go +++ b/internal/metrics/datanode_metrics.go @@ -56,16 +56,6 @@ var ( msgTypeLabelName, }) - DataNodeNumConsumers = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.DataNodeRole, - Name: "consumer_num", - Help: "number of consumers", - }, []string{ - nodeIDLabelName, - }) - DataNodeNumProducers = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: milvusNamespace, @@ -215,7 +205,6 @@ func RegisterDataNode(registry *prometheus.Registry) { registry.MustRegister(DataNodeNumFlowGraphs) registry.MustRegister(DataNodeConsumeMsgRowsCount) registry.MustRegister(DataNodeFlushedSize) - registry.MustRegister(DataNodeNumConsumers) registry.MustRegister(DataNodeNumProducers) registry.MustRegister(DataNodeConsumeTimeTickLag) registry.MustRegister(DataNodeEncodeBufferLatency) diff --git a/internal/metrics/metrics_test.go b/internal/metrics/metrics_test.go index da9de6858780734d92aa7e8c4c8ec24af54e74ca..d321c925864ee5b3eabc82c778cce3fd4d93d473 100644 --- a/internal/metrics/metrics_test.go +++ b/internal/metrics/metrics_test.go @@ -33,5 +33,6 @@ func TestRegisterMetrics(t *testing.T) { RegisterQueryNode(r) RegisterQueryCoord(r) RegisterEtcdMetrics(r) + RegisterMq(r) Register(r) } diff --git a/internal/metrics/mq_metrics.go b/internal/metrics/mq_metrics.go new file mode 100644 index 0000000000000000000000000000000000000000..b63f78333d8068f85529b6042631d701dda95959 --- /dev/null +++ b/internal/metrics/mq_metrics.go @@ -0,0 +1,38 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + NumConsumers = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: "msg_queue", + Name: "consumer_num", + Help: "number of consumers", + }, []string{ + roleNameLabelName, + nodeIDLabelName, + }) +) + +func RegisterMq(registry *prometheus.Registry) { + registry.MustRegister(NumConsumers) +} diff --git a/internal/metrics/querynode_metrics.go b/internal/metrics/querynode_metrics.go index e8b403cc7fc88479e91455568178a6b4e246ca29..d6cc60f9a6cdde632c12dd20bd2149d10be7552e 100644 --- a/internal/metrics/querynode_metrics.go +++ b/internal/metrics/querynode_metrics.go @@ -102,16 +102,6 @@ var ( nodeIDLabelName, }) - QueryNodeNumConsumers = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.QueryNodeRole, - Name: "consumer_num", - Help: "number of consumers", - }, []string{ - nodeIDLabelName, - }) - QueryNodeSQCount = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: milvusNamespace, @@ -363,7 +353,6 @@ func RegisterQueryNode(registry *prometheus.Registry) { registry.MustRegister(QueryNodeNumSegments) registry.MustRegister(QueryNodeNumDmlChannels) registry.MustRegister(QueryNodeNumDeltaChannels) - registry.MustRegister(QueryNodeNumConsumers) registry.MustRegister(QueryNodeSQCount) registry.MustRegister(QueryNodeSQReqLatency) registry.MustRegister(QueryNodeSQLatencyInQueue) diff --git a/internal/mq/msgdispatcher/dispatcher.go b/internal/mq/msgdispatcher/dispatcher.go index d6be51b404fb4a699404d68afe19096d7da62de0..4ce5598fc3ab7d526e0587e643b9f9c7614cfba9 100644 --- a/internal/mq/msgdispatcher/dispatcher.go +++ b/internal/mq/msgdispatcher/dispatcher.go @@ -27,9 +27,11 @@ import ( "github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/util/funcutil" + "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -117,6 +119,8 @@ func NewDispatcher(factory msgstream.Factory, targets: make(map[string]*target), stream: stream, } + + metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Inc() return d, nil } @@ -178,6 +182,7 @@ func (d *Dispatcher) Handle(signal signal) { d.cancel() d.wg.Wait() d.once.Do(func() { + metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Dec() d.stream.Close() }) } diff --git a/internal/querynode/flow_graph_query_node.go b/internal/querynode/flow_graph_query_node.go index bae8a7e22de88f00d8b446cf618f09fbb5d0b53b..534dc9b13c4b729dcd52f3e2f2b6715561345518 100644 --- a/internal/querynode/flow_graph_query_node.go +++ b/internal/querynode/flow_graph_query_node.go @@ -224,9 +224,6 @@ func (q *queryNodeFlowGraph) newDmInputNode(collectionID UniqueID, vchannel Chan func (q *queryNodeFlowGraph) close() { q.dispClient.Deregister(q.vchannel) q.flowGraph.Close() - if q.consumerCnt > 0 { - metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Sub(float64(q.consumerCnt)) - } log.Info("stop query node flow graph", zap.Int64("collectionID", q.collectionID), zap.String("vchannel", q.vchannel), diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index e56dc74ef0e41b5873ff7b058865946feaef6459..95dc24bbbe6745ad64e76970c43e1c1fec29632e 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -718,7 +718,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection } defer func() { - metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() + metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Dec() stream.Close() }() @@ -748,7 +748,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection return nil } - metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc() + metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Inc() err = stream.Seek([]*msgpb.MsgPosition{position}) if err != nil { return err