未验证 提交 1a485044 编写于 作者: A aoiasd 提交者: GitHub

fix NumConsumers metric not work (#23167)

Signed-off-by: Naoiasd <zhicheng.yue@zilliz.com>
上级 424570b2
......@@ -54,6 +54,7 @@ func init() {
Registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
Registry.MustRegister(prometheus.NewGoCollector())
metrics.RegisterEtcdMetrics(Registry)
metrics.RegisterMq(Registry)
}
func stopRocksmq() {
......
......@@ -28,7 +28,6 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/datanode/allocator"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgdispatcher"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
......@@ -165,8 +164,6 @@ func (dsService *dataSyncService) close() {
zap.String("vChanName", dsService.vchannelName))
dsService.dispClient.Deregister(dsService.vchannelName)
dsService.fg.Close()
metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Sub(2) // timeTickChannel + deltaChannel
}
dsService.clearGlobalFlushingCache()
......
......@@ -60,7 +60,6 @@ func newDmInputNode(dispatcherClient msgdispatcher.Client, seekPos *msgpb.MsgPos
}
log.Info("datanode consume successfully when register to msgDispatcher")
}
metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
name := fmt.Sprintf("dmInputNode-data-%d-%s", dmNodeConfig.collectionID, dmNodeConfig.vChannelName)
node := flowgraph.NewInputNode(input, name, dmNodeConfig.maxQueueLength, dmNodeConfig.maxParallelism,
......
......@@ -56,16 +56,6 @@ var (
msgTypeLabelName,
})
DataNodeNumConsumers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "consumer_num",
Help: "number of consumers",
}, []string{
nodeIDLabelName,
})
DataNodeNumProducers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
......@@ -215,7 +205,6 @@ func RegisterDataNode(registry *prometheus.Registry) {
registry.MustRegister(DataNodeNumFlowGraphs)
registry.MustRegister(DataNodeConsumeMsgRowsCount)
registry.MustRegister(DataNodeFlushedSize)
registry.MustRegister(DataNodeNumConsumers)
registry.MustRegister(DataNodeNumProducers)
registry.MustRegister(DataNodeConsumeTimeTickLag)
registry.MustRegister(DataNodeEncodeBufferLatency)
......
......@@ -33,5 +33,6 @@ func TestRegisterMetrics(t *testing.T) {
RegisterQueryNode(r)
RegisterQueryCoord(r)
RegisterEtcdMetrics(r)
RegisterMq(r)
Register(r)
}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
)
var (
NumConsumers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: "msg_queue",
Name: "consumer_num",
Help: "number of consumers",
}, []string{
roleNameLabelName,
nodeIDLabelName,
})
)
func RegisterMq(registry *prometheus.Registry) {
registry.MustRegister(NumConsumers)
}
......@@ -102,16 +102,6 @@ var (
nodeIDLabelName,
})
QueryNodeNumConsumers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.QueryNodeRole,
Name: "consumer_num",
Help: "number of consumers",
}, []string{
nodeIDLabelName,
})
QueryNodeSQCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
......@@ -363,7 +353,6 @@ func RegisterQueryNode(registry *prometheus.Registry) {
registry.MustRegister(QueryNodeNumSegments)
registry.MustRegister(QueryNodeNumDmlChannels)
registry.MustRegister(QueryNodeNumDeltaChannels)
registry.MustRegister(QueryNodeNumConsumers)
registry.MustRegister(QueryNodeSQCount)
registry.MustRegister(QueryNodeSQReqLatency)
registry.MustRegister(QueryNodeSQLatencyInQueue)
......
......@@ -27,9 +27,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
......@@ -117,6 +119,8 @@ func NewDispatcher(factory msgstream.Factory,
targets: make(map[string]*target),
stream: stream,
}
metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Inc()
return d, nil
}
......@@ -178,6 +182,7 @@ func (d *Dispatcher) Handle(signal signal) {
d.cancel()
d.wg.Wait()
d.once.Do(func() {
metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Dec()
d.stream.Close()
})
}
......
......@@ -224,9 +224,6 @@ func (q *queryNodeFlowGraph) newDmInputNode(collectionID UniqueID, vchannel Chan
func (q *queryNodeFlowGraph) close() {
q.dispClient.Deregister(q.vchannel)
q.flowGraph.Close()
if q.consumerCnt > 0 {
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Sub(float64(q.consumerCnt))
}
log.Info("stop query node flow graph",
zap.Int64("collectionID", q.collectionID),
zap.String("vchannel", q.vchannel),
......
......@@ -718,7 +718,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
}
defer func() {
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Dec()
stream.Close()
}()
......@@ -748,7 +748,7 @@ func (loader *segmentLoader) FromDmlCPLoadDelete(ctx context.Context, collection
return nil
}
metrics.QueryNodeNumConsumers.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Inc()
metrics.NumConsumers.WithLabelValues(paramtable.GetRole(), fmt.Sprint(paramtable.GetNodeID())).Inc()
err = stream.Seek([]*msgpb.MsgPosition{position})
if err != nil {
return err
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册