diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 814b5899678c3e03317ca1190c172c5c00e8ba64..ce2e1353e3f9769748c7dab3f300be0a5926a34f 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -523,6 +523,15 @@ var ( Name: "release_dql_message_stream_total", Help: "Counter of release dql message stream", }, []string{"status"}) + + // ProxyDmlChannelTimeTick used to count the time tick value of dml channels + ProxyDmlChannelTimeTick = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Namespace: milvusNamespace, + Subsystem: subSystemProxy, + Name: "dml_channels_time_tick", + Help: "Time tick of dml channels", + }, []string{"pchan"}) ) //RegisterProxy register Proxy metrics @@ -570,6 +579,8 @@ func RegisterProxy() { prometheus.MustRegister(ProxyGetDdChannelCounter) prometheus.MustRegister(ProxyReleaseDQLMessageStreamCounter) + + prometheus.MustRegister(ProxyDmlChannelTimeTick) } //RegisterQueryCoord register QueryCoord metrics diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index e2821f36fd212135fbac4af01115255e92f585cd..a6c32b86508f5d1ad72ba662a8492a2a36743c39 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -20,6 +20,8 @@ import ( "sync/atomic" "time" + "github.com/milvus-io/milvus/internal/metrics" + "go.uber.org/zap" "github.com/milvus-io/milvus/internal/allocator" @@ -300,8 +302,6 @@ func (node *Proxy) sendChannelsTimeTickLoop() { } } - log.Debug("send timestamp statistics of pchan", zap.Any("channels", channels), zap.Any("tss", tss)) - req := &internalpb.ChannelTimeTickMsg{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_TimeTick, // todo @@ -314,6 +314,12 @@ func (node *Proxy) sendChannelsTimeTickLoop() { DefaultTimestamp: maxTs, } + for idx, channel := range channels { + ts := tss[idx] + metrics.ProxyDmlChannelTimeTick.WithLabelValues(channel).Set(float64(ts)) + } + metrics.ProxyDmlChannelTimeTick.WithLabelValues("DefaultTimestamp").Set(float64(maxTs)) + status, err := node.rootCoord.UpdateChannelTimeTick(node.ctx, req) if err != nil { log.Warn("sendChannelsTimeTickLoop.UpdateChannelTimeTick", zap.Error(err))