未验证 提交 13177a90 编写于 作者: B bigsheeper 提交者: GitHub

Add prometheus metrics for DataNode (#15650)

Signed-off-by: Nbigsheeper <yihao.dai@zilliz.com>
Co-authored-by: NCai Yudong <yudong.cai@zilliz.com>
上级 d6857f4b
......@@ -20,8 +20,9 @@ import (
"context"
"sync"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
)
const (
......
......@@ -26,6 +26,7 @@ import (
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
......@@ -33,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
......@@ -78,6 +80,7 @@ type compactionTask struct {
cancel context.CancelFunc
wg sync.WaitGroup
tr *timerecord.TimeRecorder
}
// check if compactionTask implements compactor
......@@ -105,6 +108,7 @@ func newCompactionTask(
allocatorInterface: alloc,
dc: dc,
plan: plan,
tr: timerecord.NewTimeRecorder("compactionTask"),
}
}
......@@ -538,6 +542,8 @@ func (t *compactionTask) compact() error {
)
log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(time.Since(compactStart))))
metrics.DataNodeCompactionLatency.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Observe(float64(t.tr.ElapseSpan().Milliseconds()))
return nil
}
......
......@@ -542,6 +542,7 @@ func (node *DataNode) ReadyToFlush() error {
// One precondition: The segmentID in req is in ascending order.
func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
metrics.DataNodeFlushSegmentsCounter.WithLabelValues(MetricRequestsTotal).Inc()
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
......@@ -602,6 +603,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
status.ErrorCode = commonpb.ErrorCode_Success
metrics.DataNodeFlushSegmentsCounter.WithLabelValues(MetricRequestsSuccess).Inc()
return status, nil
}
......
......@@ -19,9 +19,11 @@ package datanode
import (
"context"
"errors"
"fmt"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
......@@ -128,6 +130,8 @@ func (dsService *dataSyncService) close() {
log.Debug("dataSyncService closing flowgraph", zap.Int64("collectionID", dsService.collectionID),
zap.String("vChanName", dsService.vchannelName))
dsService.fg.Close()
metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(dsService.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(dsService.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Sub(2) // timeTickChannel + deltaChannel
}
dsService.cancelFn()
......@@ -215,6 +219,7 @@ func (dsService *dataSyncService) initNodes(vchanInfo *datapb.VchannelInfo) erro
var insertBufferNode Node
insertBufferNode, err = newInsertBufferNode(
dsService.ctx,
dsService.collectionID,
dsService.flushCh,
dsService.flushManager,
dsService.flushingSegCache,
......
......@@ -23,6 +23,7 @@ import (
"sync/atomic"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
......@@ -292,6 +293,7 @@ func newDDNode(ctx context.Context, collID UniqueID, vchanInfo *datapb.VchannelI
deltaStream.SetRepackFunc(msgstream.DefaultRepackFunc)
deltaStream.AsProducer([]string{deltaChannelName})
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
log.Debug("datanode AsProducer", zap.String("DeltaChannelName", deltaChannelName))
var deltaMsgStream msgstream.MsgStream = deltaStream
deltaMsgStream.Start()
......
......@@ -18,6 +18,7 @@ package datanode
import (
"context"
"fmt"
"math"
"sync"
......@@ -25,6 +26,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
......@@ -167,6 +169,7 @@ func (dn *deleteNode) bufferDeleteMsg(msg *msgstream.DeleteMsg, tr TimeRange) er
// store
delDataBuf.updateSize(int64(rows))
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(metrics.DataNodeMsgTypeDelete, fmt.Sprint(Params.DataNodeCfg.NodeID)).Add(float64(rows))
delDataBuf.updateTimeRange(tr)
dn.delBuf.Store(segID, delDataBuf)
}
......
......@@ -22,6 +22,7 @@ import (
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/flowgraph"
......@@ -44,6 +45,7 @@ func newDmInputNode(ctx context.Context, seekPos *internalpb.MsgPosition, dmNode
// is virtual channel name, so we need to convert vchannel name into pchannel neme here.
pchannelName := rootcoord.ToPhysicalChannel(dmNodeConfig.vChannelName)
insertStream.AsConsumer([]string{pchannelName}, consumeSubName)
metrics.DataNodeNumConsumers.WithLabelValues(fmt.Sprint(dmNodeConfig.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
log.Debug("datanode AsConsumer", zap.String("physical channel", pchannelName), zap.String("subName", consumeSubName), zap.Int64("collection ID", dmNodeConfig.collectionID))
if seekPos != nil {
......
......@@ -21,6 +21,7 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"strconv"
"sync"
......@@ -30,17 +31,17 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/msgstream"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/tsoutil"
)
type (
......@@ -299,6 +300,8 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
flushed: false,
dropped: false,
})
metrics.DataNodeAutoFlushSegmentCount.WithLabelValues(ibNode.channelName, fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
}
}
......@@ -343,10 +346,13 @@ func (ibNode *insertBufferNode) Operate(in []Msg) []Msg {
err := ibNode.flushManager.flushBufferData(task.buffer, task.segmentID, task.flushed, task.dropped, endPositions[0])
if err != nil {
log.Warn("failed to invoke flushBufferData", zap.Error(err))
metrics.DataNodeFlushSegmentCount.WithLabelValues(metrics.DataNodeMetricLabelFail, fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
} else {
segmentsToFlush = append(segmentsToFlush, task.segmentID)
ibNode.insertBuffer.Delete(task.segmentID)
metrics.DataNodeFlushSegmentCount.WithLabelValues(metrics.DataNodeMetricLabelSuccess, fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
}
metrics.DataNodeFlushSegmentCount.WithLabelValues(metrics.DataNodeMetricLabelTotal, fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
}
if err := ibNode.writeHardTimeTick(fgMsg.timeRange.timestampMax, seg2Upload); err != nil {
......@@ -673,6 +679,7 @@ func (ibNode *insertBufferNode) bufferInsertMsg(msg *msgstream.InsertMsg, endPos
// update buffer size
buffer.updateSize(int64(len(msg.RowData)))
metrics.DataNodeConsumeMsgRowsCount.WithLabelValues(metrics.DataNodeMsgTypeInsert, fmt.Sprint(Params.DataNodeCfg.NodeID)).Add(float64(len(msg.RowData)))
// store in buffer
ibNode.insertBuffer.Store(currentSegID, buffer)
......@@ -703,7 +710,7 @@ func (ibNode *insertBufferNode) getCollectionandPartitionIDbySegID(segmentID Uni
return ibNode.replica.getCollectionAndPartitionID(segmentID)
}
func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushManager,
func newInsertBufferNode(ctx context.Context, collID UniqueID, flushCh <-chan flushMsg, fm flushManager,
flushingSegCache *Cache, config *nodeConfig) (*insertBufferNode, error) {
baseNode := BaseNode{}
......@@ -716,6 +723,7 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
return nil, err
}
wTt.AsProducer([]string{Params.MsgChannelCfg.DataCoordTimeTick})
metrics.DataNodeNumProducers.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
log.Debug("datanode AsProducer", zap.String("TimeTickChannelName", Params.MsgChannelCfg.DataCoordTimeTick))
var wTtMsgStream msgstream.MsgStream = wTt
wTtMsgStream.Start()
......@@ -729,6 +737,7 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
continue
}
stats = append(stats, stat)
metrics.DataNodeSegmentRowsCount.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Add(float64(stat.NumRows))
}
msgPack := msgstream.MsgPack{}
timeTickMsg := msgstream.DataNodeTtMsg{
......@@ -749,6 +758,8 @@ func newInsertBufferNode(ctx context.Context, flushCh <-chan flushMsg, fm flushM
},
}
msgPack.Msgs = append(msgPack.Msgs, &timeTickMsg)
pt, _ := tsoutil.ParseHybridTs(ts)
metrics.DataNodeTimeSync.WithLabelValues(config.vChannelName, fmt.Sprint(Params.DataNodeCfg.NodeID)).Set(float64(pt))
return wTtMsgStream.Produce(&msgPack)
})
......
......@@ -94,7 +94,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, fm, newCache(), c)
assert.NotNil(t, iBNode)
require.NoError(t, err)
......@@ -108,7 +108,7 @@ func TestFlowGraphInsertBufferNodeCreate(t *testing.T) {
cd: 0,
}
_, err = newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
_, err = newInsertBufferNode(ctx, collMeta.ID, flushChan, fm, newCache(), c)
assert.Error(t, err)
}
......@@ -183,7 +183,7 @@ func TestFlowGraphInsertBufferNode_Operate(t *testing.T) {
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, fm, newCache(), c)
require.NoError(t, err)
// trigger log ts
......@@ -401,7 +401,7 @@ func TestFlowGraphInsertBufferNode_AutoFlush(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, fm, newCache(), c)
require.NoError(t, err)
// Auto flush number of rows set to 2
......@@ -662,7 +662,7 @@ func TestInsertBufferNode_bufferInsertMsg(t *testing.T) {
allocator: NewAllocatorFactory(),
vChannelName: "string",
}
iBNode, err := newInsertBufferNode(ctx, flushChan, fm, newCache(), c)
iBNode, err := newInsertBufferNode(ctx, collMeta.ID, flushChan, fm, newCache(), c)
require.NoError(t, err)
inMsg := genFlowGraphInsertMsg(insertChannelName)
......
......@@ -21,6 +21,7 @@ import (
"sync"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
......@@ -65,6 +66,10 @@ func (fm *flowgraphManager) addAndStart(dn *DataNode, vchan *datapb.VchannelInfo
log.Info("successfully started dataSyncService", zap.String("vChannelName", vchan.GetChannelName()))
fm.flowgraphs.Store(vchan.GetChannelName(), dataSyncService)
metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
metrics.DataNodeNumDmlChannels.WithLabelValues(fmt.Sprint(vchan.GetCollectionID()), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
metrics.DataNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(vchan.GetCollectionID()), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
return nil
}
......@@ -72,7 +77,11 @@ func (fm *flowgraphManager) release(vchanName string) {
log.Debug("release flowgraph resources begin", zap.String("vChannelName", vchanName))
if fg, loaded := fm.flowgraphs.LoadAndDelete(vchanName); loaded {
collectionID := fg.(*dataSyncService).collectionID
fg.(*dataSyncService).close()
metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
metrics.DataNodeNumDmlChannels.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
metrics.DataNodeNumDeltaChannels.WithLabelValues(fmt.Sprint(collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
}
log.Debug("release flowgraph resources end", zap.String("Vchannel", vchanName))
}
......
......@@ -25,12 +25,14 @@ import (
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/timerecord"
"go.uber.org/atomic"
"go.uber.org/zap"
)
......@@ -324,6 +326,8 @@ func (m *rendezvousFlushManager) handleDeleteTask(segmentID UniqueID, task flush
func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID UniqueID, flushed bool,
dropped bool, pos *internalpb.MsgPosition) error {
tr := timerecord.NewTimeRecorder("flushDuration")
// empty flush
if data == nil || data.buffer == nil {
//m.getFlushQueue(segmentID).enqueueInsertFlush(&flushBufferInsertTask{},
......@@ -408,6 +412,8 @@ func (m *rendezvousFlushManager) flushBufferData(data *BufferData, segmentID Uni
BaseKV: m.BaseKV,
data: kvs,
}, field2Insert, field2Stats, flushed, dropped, pos)
metrics.DataNodeFlushSegmentLatency.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
return nil
}
......@@ -548,7 +554,13 @@ type flushBufferInsertTask struct {
// flushInsertData implements flushInsertTask
func (t *flushBufferInsertTask) flushInsertData() error {
if t.BaseKV != nil && len(t.data) > 0 {
return t.MultiSave(t.data)
for _, d := range t.data {
metrics.DataNodeFlushedSize.WithLabelValues(metrics.DataNodeMsgTypeInsert, fmt.Sprint(Params.DataNodeCfg.NodeID)).Add(float64(len(d)))
}
tr := timerecord.NewTimeRecorder("insertData")
err := t.MultiSave(t.data)
metrics.DataNodeSave2StorageLatency.WithLabelValues(metrics.DataNodeMsgTypeInsert, fmt.Sprint(Params.DataNodeCfg.NodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
return err
}
return nil
}
......@@ -561,7 +573,13 @@ type flushBufferDeleteTask struct {
// flushDeleteData implements flushDeleteTask
func (t *flushBufferDeleteTask) flushDeleteData() error {
if len(t.data) > 0 && t.BaseKV != nil {
return t.MultiSave(t.data)
for _, d := range t.data {
metrics.DataNodeFlushedSize.WithLabelValues(metrics.DataNodeMsgTypeDelete, fmt.Sprint(Params.DataNodeCfg.NodeID)).Add(float64(len(d)))
}
tr := timerecord.NewTimeRecorder("deleteData")
err := t.MultiSave(t.data)
metrics.DataNodeSave2StorageLatency.WithLabelValues(metrics.DataNodeMsgTypeDelete, fmt.Sprint(Params.DataNodeCfg.NodeID)).Observe(float64(tr.ElapseSpan().Milliseconds()))
return err
}
return nil
}
......
......@@ -30,12 +30,12 @@ import (
"github.com/milvus-io/milvus/internal/kv"
miniokv "github.com/milvus-io/milvus/internal/kv/minio"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/schemapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
)
const (
......@@ -194,6 +194,7 @@ func (replica *SegmentReplica) new2FlushedSegment(segID UniqueID) {
replica.flushedSegments[segID] = &seg
delete(replica.newSegments, segID)
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(seg.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
}
// normal2FlushedSegment transfers a segment from *normal* to *flushed* by changing *isFlushed*
......@@ -205,6 +206,7 @@ func (replica *SegmentReplica) normal2FlushedSegment(segID UniqueID) {
replica.flushedSegments[segID] = &seg
delete(replica.normalSegments, segID)
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(seg.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
}
func (replica *SegmentReplica) getCollectionAndPartitionID(segID UniqueID) (collID, partitionID UniqueID, err error) {
......@@ -266,6 +268,7 @@ func (replica *SegmentReplica) addNewSegment(segID, collID, partitionID UniqueID
replica.segMu.Lock()
defer replica.segMu.Unlock()
replica.newSegments[segID] = seg
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
return nil
}
......@@ -360,6 +363,7 @@ func (replica *SegmentReplica) addNormalSegment(segID, collID, partitionID Uniqu
replica.segMu.Lock()
replica.normalSegments[segID] = seg
replica.segMu.Unlock()
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(collID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Inc()
return nil
}
......@@ -557,6 +561,15 @@ func (replica *SegmentReplica) removeSegments(segIDs ...UniqueID) {
log.Debug("remove segments if exist", zap.Int64s("segmentIDs", segIDs))
for _, segID := range segIDs {
if seg, ok := replica.newSegments[segID]; ok {
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(seg.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
}
if seg, ok := replica.normalSegments[segID]; ok {
metrics.DataNodeNumUnflushedSegments.WithLabelValues(fmt.Sprint(seg.collectionID), fmt.Sprint(Params.DataNodeCfg.NodeID)).Dec()
}
}
for _, segID := range segIDs {
delete(replica.newSegments, segID)
delete(replica.normalSegments, segID)
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
// TODO: use the common status label
DataNodeMetricLabelSuccess = "success"
DataNodeMetricLabelFail = "fail"
DataNodeMetricLabelTotal = "total"
DataNodeMsgTypeInsert = "insert"
DataNodeMsgTypeDelete = "delete"
)
// TODO: move to metrics.go
const (
nodeIDLabelName = "node_id"
statusLabelName = "status"
msgTypeLabelName = "msg_type"
collectionIDLabelName = "collection_id"
channelNameLabelName = "channel_name"
)
// dataNodeDurationBuckets involves durations in milliseconds,
// [10 20 40 80 160 320 640 1280 2560 5120 10240 20480 40960 81920 163840 327680 655360 1.31072e+06]
var dataNodeDurationBuckets = prometheus.ExponentialBuckets(10, 2, 18)
var (
DataNodeNumFlowGraphs = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_flow_graphs",
Help: "Number of flow graphs in DataNode.",
}, []string{
nodeIDLabelName,
})
DataNodeConsumeMsgRowsCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "message_rows_count",
Help: "Messages rows size count consumed from msgStream in DataNode.",
}, []string{
msgTypeLabelName,
nodeIDLabelName,
})
DataNodeFlushedSize = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "flushed_size",
Help: "Data size flushed to storage in DataNode.",
}, []string{
msgTypeLabelName,
nodeIDLabelName,
})
DataNodeNumDmlChannels = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_dml_channels",
Help: "Number of dmlChannels per collection in DataNode.",
}, []string{
collectionIDLabelName,
nodeIDLabelName,
})
DataNodeNumDeltaChannels = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_delta_channels",
Help: "Number of deltaChannels per collection in DataNode.",
}, []string{
collectionIDLabelName,
nodeIDLabelName,
})
DataNodeNumConsumers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_consumers",
Help: "Number of consumers per collection in DataNode.",
}, []string{
collectionIDLabelName,
nodeIDLabelName,
})
DataNodeNumProducers = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_producers",
Help: "Number of producers per collection in DataNode.",
}, []string{
collectionIDLabelName,
nodeIDLabelName,
})
DataNodeTimeSync = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "time_sync",
Help: "Synchronized timestamps per channel in DataNode.",
}, []string{
channelNameLabelName,
nodeIDLabelName,
})
DataNodeSegmentRowsCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "seg_rows_count",
Help: "Rows count of segments which sent to DataCoord from DataNode.",
}, []string{
collectionIDLabelName,
nodeIDLabelName,
})
DataNodeNumUnflushedSegments = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "num_unflushed_segments",
Help: "Number of unflushed segments in DataNode.",
}, []string{
collectionIDLabelName,
nodeIDLabelName,
})
DataNodeFlushSegmentLatency = prometheus.NewHistogramVec( // TODO: arguably
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "flush_segment_latency",
Help: "The flush segment latency in DataNode.",
Buckets: dataNodeDurationBuckets,
}, []string{
collectionIDLabelName,
nodeIDLabelName,
})
DataNodeSave2StorageLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "save_latency",
Help: "The latency saving flush data to storage in DataNode.",
Buckets: []float64{0, 10, 100, 200, 400, 1000, 10000},
}, []string{
msgTypeLabelName,
nodeIDLabelName,
})
DataNodeFlushSegmentCount = prometheus.NewCounterVec( // TODO: arguably
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "flush_segment_count",
Help: "Flush segment statistics in DataNode.",
}, []string{
statusLabelName,
nodeIDLabelName,
})
DataNodeAutoFlushSegmentCount = prometheus.NewCounterVec( // TODO: arguably
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "auto_flush_segment_count",
Help: "Auto flush segment statistics in DataNode.",
}, []string{
channelNameLabelName,
nodeIDLabelName,
})
DataNodeCompactionLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataNodeRole,
Name: "compaction_latency",
Help: "Compaction latency in DataNode.",
Buckets: dataNodeDurationBuckets,
}, []string{
collectionIDLabelName,
nodeIDLabelName,
})
)
//RegisterDataNode registers DataNode metrics
func RegisterDataNode() {
prometheus.MustRegister(DataNodeNumFlowGraphs)
prometheus.MustRegister(DataNodeConsumeMsgRowsCount)
prometheus.MustRegister(DataNodeFlushedSize)
prometheus.MustRegister(DataNodeNumDmlChannels)
prometheus.MustRegister(DataNodeNumDeltaChannels)
prometheus.MustRegister(DataNodeNumConsumers)
prometheus.MustRegister(DataNodeNumProducers)
prometheus.MustRegister(DataNodeTimeSync)
prometheus.MustRegister(DataNodeSegmentRowsCount)
prometheus.MustRegister(DataNodeNumUnflushedSegments)
prometheus.MustRegister(DataNodeFlushSegmentLatency)
prometheus.MustRegister(DataNodeSave2StorageLatency)
prometheus.MustRegister(DataNodeFlushSegmentCount)
prometheus.MustRegister(DataNodeAutoFlushSegmentCount)
prometheus.MustRegister(DataNodeCompactionLatency)
}
......@@ -628,12 +628,6 @@ var (
}, []string{"type"})
)
//RegisterDataNode registers DataNode metrics
func RegisterDataNode() {
prometheus.MustRegister(DataNodeFlushSegmentsCounter)
prometheus.MustRegister(DataNodeWatchDmChannelsCounter)
}
//RegisterIndexNode registers IndexNode metrics
func RegisterIndexNode() {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册