diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 02922e95a9000dc5ec0cf4415b1213f250fd6301..d3599d700c11c3123d6ef20067f3430a68257e5b 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -62,6 +62,7 @@ const ( channelNameLabelName = "channel_name" functionLabelName = "function_name" queryTypeLabelName = "query_type" + collectionName = "collection_name" segmentStateLabelName = "segment_state" usernameLabelName = "username" rolenameLabelName = "role_name" diff --git a/internal/metrics/proxy_metrics.go b/internal/metrics/proxy_metrics.go index 7b6ca846b586dcc61999a29bf8b4cd7b890352d5..89a462e988f524c78480c673d6d0f16bfb217ec9 100644 --- a/internal/metrics/proxy_metrics.go +++ b/internal/metrics/proxy_metrics.go @@ -47,16 +47,45 @@ var ( Help: "counter of vectors successfully inserted", }, []string{nodeIDLabelName}) - // ProxySearchLatency record the latency of search successfully. - ProxySearchLatency = prometheus.NewHistogramVec( + // ProxySQLatency records the latency of successful search or query requests. + ProxySQLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: milvusNamespace, Subsystem: typeutil.ProxyRole, Name: "sq_latency", - Help: "latency of search", + Help: "latency of search or query successfully", Buckets: buckets, }, []string{nodeIDLabelName, queryTypeLabelName}) + // ProxyCollectionSQLatency records the latency of successful search or query requests, per collection. + ProxyCollectionSQLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "collection_sq_latency", + Help: "latency of search or query successfully, per collection", + Buckets: buckets, + }, []string{nodeIDLabelName, queryTypeLabelName, collectionName}) + + // ProxyMutationLatency records the latency of successful mutation requests.
+ ProxyMutationLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "mutation_latency", + Help: "latency of insert or delete successfully", + Buckets: buckets, // unit: ms + }, []string{nodeIDLabelName, msgTypeLabelName}) + + // ProxyCollectionMutationLatency records the latency of successful insert or delete requests, per collection. + ProxyCollectionMutationLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.ProxyRole, + Name: "collection_mutation_latency", + Help: "latency of insert or delete successfully, per collection", + Buckets: buckets, + }, []string{nodeIDLabelName, msgTypeLabelName, collectionName}) // ProxyWaitForSearchResultLatency record the time that the proxy waits for the search result. ProxyWaitForSearchResultLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -66,7 +95,6 @@ var ( Help: "latency that proxy waits for the result", Buckets: buckets, // unit: ms }, []string{nodeIDLabelName, queryTypeLabelName}) - // ProxyReduceResultLatency record the time that the proxy reduces search result. ProxyReduceResultLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -96,16 +124,6 @@ var ( Help: "number of MsgStream objects per physical channel", }, []string{nodeIDLabelName, channelNameLabelName}) - // ProxyMutationLatency record the latency that insert successfully. - ProxyMutationLatency = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.ProxyRole, - Name: "mutation_latency", - Help: "latency of insert or delete successfully", - Buckets: buckets, // unit: ms - }, []string{nodeIDLabelName, msgTypeLabelName}) - // ProxySendMutationReqLatency record the latency that Proxy send insert request to MsgStream.
ProxySendMutationReqLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ @@ -117,12 +135,12 @@ var ( }, []string{nodeIDLabelName, msgTypeLabelName}) // ProxyCacheHitCounter record the number of Proxy cache hits or miss. - ProxyCacheHitCounter = prometheus.NewCounterVec( + ProxyCacheStatsCounter = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: milvusNamespace, Subsystem: typeutil.ProxyRole, Name: "cache_hit_count", - Help: "count of cache hits", + Help: "count of cache hits/miss", }, []string{nodeIDLabelName, cacheNameLabelName, cacheStateLabelName}) // ProxyUpdateCacheLatency record the time that proxy update cache when cache miss. @@ -164,60 +182,22 @@ var ( Buckets: buckets, // unit: ms }, []string{nodeIDLabelName}) - // ProxyDDLFunctionCall records the number of times the function of the DDL operation was executed, like `CreateCollection`. - ProxyDDLFunctionCall = prometheus.NewCounterVec( + // ProxyFunctionCall records the number of times each proxy operation was executed, like `CreateCollection`. + ProxyFunctionCall = prometheus.NewCounterVec( prometheus.CounterOpts{ Namespace: milvusNamespace, Subsystem: typeutil.ProxyRole, - Name: "ddl_req_count", - Help: "count of DDL operation executed", + Name: "req_count", + Help: "count of operation executed", }, []string{nodeIDLabelName, functionLabelName, statusLabelName}) - // ProxyDQLFunctionCall records the number of times the function of the DQL operation was executed, like `HasCollection`. - ProxyDQLFunctionCall = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.ProxyRole, - Name: "dql_req_count", - Help: "count of DQL operation executed", - }, []string{nodeIDLabelName, functionLabelName, statusLabelName}) - - // ProxyDMLFunctionCall records the number of times the function of the DML operation was executed, like `LoadCollection`.
- ProxyDMLFunctionCall = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.ProxyRole, - Name: "dml_req_count", - Help: "count of DML operation executed", - }, []string{nodeIDLabelName, functionLabelName, statusLabelName}) - - // ProxyDDLReqLatency records the latency that for DML request, like "CreateCollection". - ProxyDDLReqLatency = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.ProxyRole, - Name: "ddl_req_latency", - Help: "latency of each DDL request", - Buckets: buckets, // unit: ms - }, []string{nodeIDLabelName, functionLabelName}) - - // ProxyDMLReqLatency records the latency that for DML request. - ProxyDMLReqLatency = prometheus.NewHistogramVec( - prometheus.HistogramOpts{ - Namespace: milvusNamespace, - Subsystem: typeutil.ProxyRole, - Name: "dml_req_latency", - Help: "latency of each DML request excluding insert and delete", - Buckets: buckets, // unit: ms - }, []string{nodeIDLabelName, functionLabelName}) - - // ProxyDQLReqLatency record the latency that for DQL request, like "HasCollection". - ProxyDQLReqLatency = prometheus.NewHistogramVec( + // ProxyReqLatency records the latency of each request, like "CreateCollection".
+ ProxyReqLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: milvusNamespace, Subsystem: typeutil.ProxyRole, - Name: "dql_req_latency", - Help: "latency of each DQL request excluding search and query", + Name: "req_latency", + Help: "latency of each request", Buckets: buckets, // unit: ms }, []string{nodeIDLabelName, functionLabelName}) @@ -254,29 +234,29 @@ func RegisterProxy(registry *prometheus.Registry) { registry.MustRegister(ProxySearchVectors) registry.MustRegister(ProxyInsertVectors) - registry.MustRegister(ProxySearchLatency) + registry.MustRegister(ProxySQLatency) + registry.MustRegister(ProxyCollectionSQLatency) + registry.MustRegister(ProxyMutationLatency) + registry.MustRegister(ProxyCollectionMutationLatency) + registry.MustRegister(ProxyWaitForSearchResultLatency) registry.MustRegister(ProxyReduceResultLatency) registry.MustRegister(ProxyDecodeResultLatency) registry.MustRegister(ProxyMsgStreamObjectsForPChan) - registry.MustRegister(ProxyMutationLatency) registry.MustRegister(ProxySendMutationReqLatency) - registry.MustRegister(ProxyCacheHitCounter) + registry.MustRegister(ProxyCacheStatsCounter) registry.MustRegister(ProxyUpdateCacheLatency) registry.MustRegister(ProxySyncTimeTick) registry.MustRegister(ProxyApplyPrimaryKeyLatency) registry.MustRegister(ProxyApplyTimestampLatency) - registry.MustRegister(ProxyDDLFunctionCall) - registry.MustRegister(ProxyDQLFunctionCall) - registry.MustRegister(ProxyDMLFunctionCall) - registry.MustRegister(ProxyDDLReqLatency) - registry.MustRegister(ProxyDMLReqLatency) - registry.MustRegister(ProxyDQLReqLatency) + registry.MustRegister(ProxyFunctionCall) + registry.MustRegister(ProxyReqLatency) + registry.MustRegister(ProxyReceiveBytes) registry.MustRegister(ProxyReadReqSendBytes) @@ -301,3 +281,18 @@ func SetRateGaugeByRateType(rateType internalpb.RateType, nodeID int64, rate flo ProxyLimiterRate.WithLabelValues(nodeIDStr, QueryLabel).Set(rate) } }
+
+// CleanupCollectionMetrics deletes the per-collection search/query and insert/delete
+// latency series recorded under the given proxy node ID and collection name.
+func CleanupCollectionMetrics(nodeID int64, collection string) {
+	// Format the node ID once; every series deleted below shares the same node label value.
+	nodeIDStr := strconv.FormatInt(nodeID, 10)
+	ProxyCollectionSQLatency.Delete(prometheus.Labels{nodeIDLabelName: nodeIDStr,
+		queryTypeLabelName: SearchLabel, collectionName: collection})
+	ProxyCollectionSQLatency.Delete(prometheus.Labels{nodeIDLabelName: nodeIDStr,
+		queryTypeLabelName: QueryLabel, collectionName: collection})
+	ProxyCollectionMutationLatency.Delete(prometheus.Labels{nodeIDLabelName: nodeIDStr,
+		msgTypeLabelName: InsertLabel, collectionName: collection})
+	ProxyCollectionMutationLatency.Delete(prometheus.Labels{nodeIDLabelName: nodeIDStr,
+		msgTypeLabelName: DeleteLabel, collectionName: collection})
+}
diff --git a/internal/proxy/channels_mgr.go b/internal/proxy/channels_mgr.go index ebcadd18093c2d0ebe1d50d7bcd6175d1b26c49e..c46a84516d13b5c203fcaccffa7bd9da2d2c34c5 100644 --- a/internal/proxy/channels_mgr.go +++ b/internal/proxy/channels_mgr.go @@ -39,8 +39,8 @@ type channelsMgr interface { getChannels(collectionID UniqueID) ([]pChan, error) getVChannels(collectionID UniqueID) ([]vChan, error) getOrCreateDmlStream(collectionID UniqueID) (msgstream.MsgStream, error) - removeDMLStream(collectionID UniqueID) error - removeAllDMLStream() error + removeDMLStream(collectionID UniqueID) + removeAllDMLStream() } type channelInfos struct { @@ -279,7 +279,7 @@ func (mgr *singleTypeChannelsMgr) getOrCreateStream(collectionID UniqueID) (msgs // removeStream remove the corresponding stream of the specified collection. Idempotent. // If stream already exists, remove it, otherwise do nothing.
-func (mgr *singleTypeChannelsMgr) removeStream(collectionID UniqueID) error { +func (mgr *singleTypeChannelsMgr) removeStream(collectionID UniqueID) { mgr.mu.Lock() defer mgr.mu.Unlock() if info, ok := mgr.infos[collectionID]; ok { @@ -288,11 +288,10 @@ func (mgr *singleTypeChannelsMgr) removeStream(collectionID UniqueID) error { delete(mgr.infos, collectionID) } log.Info("dml stream removed", zap.Int64("collection_id", collectionID)) - return nil } // removeAllStream remove all message stream. -func (mgr *singleTypeChannelsMgr) removeAllStream() error { +func (mgr *singleTypeChannelsMgr) removeAllStream() { mgr.mu.Lock() defer mgr.mu.Unlock() for _, info := range mgr.infos { @@ -301,7 +300,6 @@ func (mgr *singleTypeChannelsMgr) removeAllStream() error { } mgr.infos = make(map[UniqueID]streamInfos) log.Info("all dml stream removed") - return nil } func newSingleTypeChannelsMgr( @@ -339,12 +337,12 @@ func (mgr *channelsMgrImpl) getOrCreateDmlStream(collectionID UniqueID) (msgstre return mgr.dmlChannelsMgr.getOrCreateStream(collectionID) } -func (mgr *channelsMgrImpl) removeDMLStream(collectionID UniqueID) error { - return mgr.dmlChannelsMgr.removeStream(collectionID) +func (mgr *channelsMgrImpl) removeDMLStream(collectionID UniqueID) { + mgr.dmlChannelsMgr.removeStream(collectionID) } -func (mgr *channelsMgrImpl) removeAllDMLStream() error { - return mgr.dmlChannelsMgr.removeAllStream() +func (mgr *channelsMgrImpl) removeAllDMLStream() { + mgr.dmlChannelsMgr.removeAllStream() } // newChannelsMgrImpl constructs a channels manager. 
diff --git a/internal/proxy/channels_mgr_test.go b/internal/proxy/channels_mgr_test.go index 7a4c625b41805d59fedca278cc63a74c14fd6e78..46f17bf11570959b6f8bbafa6c3ca71798b663c9 100644 --- a/internal/proxy/channels_mgr_test.go +++ b/internal/proxy/channels_mgr_test.go @@ -370,9 +370,8 @@ func Test_singleTypeChannelsMgr_removeStream(t *testing.T) { }, }, } - err := m.removeStream(100) - assert.NoError(t, err) - _, err = m.lockGetStream(100) + m.removeStream(100) + _, err := m.lockGetStream(100) assert.Error(t, err) } @@ -384,8 +383,7 @@ func Test_singleTypeChannelsMgr_removeAllStream(t *testing.T) { }, }, } - err := m.removeAllStream() - assert.NoError(t, err) - _, err = m.lockGetStream(100) + m.removeAllStream() + _, err := m.lockGetStream(100) assert.Error(t, err) } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 5b6e3cc7371234a030c6eabe622e2ccec46ad686..d7294d6bed555ab17241073f58bf66b5e22a99f9 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -118,7 +118,9 @@ func (node *Proxy) InvalidateCollectionMetaCache(ctx context.Context, request *p } if request.GetBase().GetMsgType() == commonpb.MsgType_DropCollection { // no need to handle error, since this Proxy may not create dml stream for the collection. 
- _ = node.chMgr.removeDMLStream(request.GetCollectionID()) + node.chMgr.removeDMLStream(request.GetCollectionID()) + // clean up collection level metrics + metrics.CleanupCollectionMetrics(Params.ProxyCfg.GetNodeID(), collectionName) } logutil.Logger(ctx).Info("complete to invalidate collection meta cache", zap.String("role", typeutil.ProxyRole), @@ -145,7 +147,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat method := "CreateCollection" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() cct := &createCollectionTask{ ctx: ctx, @@ -179,7 +181,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat zap.Int32("shards_num", request.ShardsNum), zap.String("consistency_level", request.ConsistencyLevel.String())) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), @@ -215,7 +217,7 @@ func (node *Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat zap.Int32("shards_num", request.ShardsNum), zap.String("consistency_level", request.ConsistencyLevel.String())) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), @@ -235,8 +237,8 @@ func (node 
*Proxy) CreateCollection(ctx context.Context, request *milvuspb.Creat zap.Int32("shards_num", request.ShardsNum), zap.String("consistency_level", request.ConsistencyLevel.String())) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cct.result, nil } @@ -251,7 +253,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol traceID, _, _ := trace.InfoFromSpan(sp) method := "DropCollection" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() dct := &dropCollectionTask{ ctx: ctx, @@ -276,7 +278,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), @@ -303,7 +305,7 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol zap.String("db", request.DbName), zap.String("collection", 
request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), @@ -319,8 +321,8 @@ func (node *Proxy) DropCollection(ctx context.Context, request *milvuspb.DropCol zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dct.result, nil } @@ -337,7 +339,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle traceID, _, _ := trace.InfoFromSpan(sp) method := "HasCollection" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() log.Debug("HasCollection received", @@ -361,7 +363,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.BoolResponse{ Status: &commonpb.Status{ @@ -391,7 +393,7 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ Status: &commonpb.Status{ @@ -410,9 +412,9 @@ func (node *Proxy) HasCollection(ctx context.Context, request *milvuspb.HasColle zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return hct.result, nil } @@ -427,7 +429,8 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol traceID, _, _ := trace.InfoFromSpan(sp) method := "LoadCollection" tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() lct := &loadCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -450,7 +453,7 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol zap.String("db", request.DbName), zap.String("collection", 
request.CollectionName)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -477,10 +480,7 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol zap.Uint64("EndTS", lct.EndTs()), zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -497,11 +497,9 @@ func (node *Proxy) LoadCollection(ctx context.Context, request *milvuspb.LoadCol zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return lct.result, nil } @@ -516,7 +514,8 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele 
traceID, _, _ := trace.InfoFromSpan(sp) method := "ReleaseCollection" tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() rct := &releaseCollectionTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -541,7 +540,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -571,9 +570,7 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -591,11 +588,9 @@ func (node *Proxy) ReleaseCollection(ctx context.Context, request *milvuspb.Rele zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, 
metrics.SuccessLabel).Inc() - metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return rct.result, nil } @@ -612,6 +607,8 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des traceID, _, _ := trace.InfoFromSpan(sp) method := "DescribeCollection" tr := timerecord.NewTimeRecorder(method) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() dct := &describeCollectionTask{ ctx: ctx, @@ -634,7 +631,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.DescribeCollectionResponse{ Status: &commonpb.Status{ @@ -664,9 +661,7 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.DescribeCollectionResponse{ @@ -686,11 +681,9 @@ func (node *Proxy) DescribeCollection(ctx context.Context, request *milvuspb.Des zap.String("db", request.DbName), 
zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dct.result, nil } @@ -708,7 +701,8 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati traceID, _, _ := trace.InfoFromSpan(sp) method := "GetStatistics" tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() g := &getStatisticsTask{ request: request, Condition: NewTaskCondition(ctx), @@ -737,7 +731,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.String("collection", request.CollectionName), zap.Strings("partitions", request.PartitionNames)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.GetStatisticsResponse{ @@ -772,9 +766,7 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.String("collection", request.CollectionName), zap.Strings("partitions", request.PartitionNames)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - 
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.GetStatisticsResponse{ @@ -795,11 +787,9 @@ func (node *Proxy) GetStatistics(ctx context.Context, request *milvuspb.GetStati zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -816,7 +806,8 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp traceID, _, _ := trace.InfoFromSpan(sp) method := "GetCollectionStatistics" tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() g := &getCollectionStatisticsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -840,7 +831,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, 
metrics.AbandonLabel).Inc() return &milvuspb.GetCollectionStatisticsResponse{ @@ -873,9 +864,7 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.GetCollectionStatisticsResponse{ @@ -896,11 +885,9 @@ func (node *Proxy) GetCollectionStatistics(ctx context.Context, request *milvusp zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -913,7 +900,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo } method := "ShowCollections" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, 
metrics.TotalLabel).Inc() sct := &showCollectionsTask{ ctx: ctx, @@ -942,7 +929,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Any("CollectionNames", request.CollectionNames), ) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.ShowCollectionsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -972,7 +959,7 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Any("CollectionNames", request.CollectionNames), ) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.ShowCollectionsResponse{ Status: &commonpb.Status{ @@ -991,8 +978,8 @@ func (node *Proxy) ShowCollections(ctx context.Context, request *milvuspb.ShowCo zap.Int("len(CollectionNames)", len(request.CollectionNames)), zap.Int("num_collections", len(sct.result.CollectionNames))) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return sct.result, nil } @@ -1007,7 +994,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request 
*milvuspb.AlterC method := "AlterCollection" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() act := &alterCollectionTask{ ctx: ctx, @@ -1032,7 +1019,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), @@ -1062,7 +1049,7 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, Reason: err.Error(), @@ -1079,8 +1066,8 @@ func (node *Proxy) AlterCollection(ctx context.Context, request *milvuspb.AlterC zap.String("db", request.DbName), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return act.result, nil } @@ -1095,7 +1082,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create traceID, _, _ := trace.InfoFromSpan(sp) method := "CreatePartition" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() cpt := &createPartitionTask{ ctx: ctx, @@ -1123,7 +1110,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1155,7 +1142,7 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1174,8 +1161,8 @@ func (node *Proxy) CreatePartition(ctx context.Context, request *milvuspb.Create 
zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cpt.result, nil } @@ -1190,7 +1177,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart traceID, _, _ := trace.InfoFromSpan(sp) method := "DropPartition" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() dpt := &dropPartitionTask{ ctx: ctx, @@ -1218,7 +1205,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1250,7 +1237,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - 
metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1269,8 +1256,8 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dpt.result, nil } @@ -1288,7 +1275,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit method := "HasPartition" tr := timerecord.NewTimeRecorder(method) //TODO: use collectionID instead of collectionName - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() hpt := &hasPartitionTask{ @@ -1317,7 +1304,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.BoolResponse{ @@ -1353,7 +1340,7 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.BoolResponse{ @@ -1376,9 +1363,9 @@ func (node *Proxy) HasPartition(ctx context.Context, request *milvuspb.HasPartit zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return hpt.result, nil } @@ -1393,7 +1380,8 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar traceID, _, _ := trace.InfoFromSpan(sp) method := "LoadPartitions" tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() lpt := &loadPartitionsTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -1420,7 +1408,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.String("collection", request.CollectionName), zap.Any("partitions", 
request.PartitionNames)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ @@ -1453,9 +1441,7 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ @@ -1475,11 +1461,9 @@ func (node *Proxy) LoadPartitions(ctx context.Context, request *milvuspb.LoadPar zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return lpt.result, nil } @@ -1502,7 +1486,8 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele method := "ReleasePartitions" tr := timerecord.NewTimeRecorder(method) - + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), zap.String("traceID", traceID), @@ -1521,7 +1506,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ @@ -1554,9 +1539,7 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ @@ -1576,11 +1559,9 @@ func (node *Proxy) ReleasePartitions(ctx context.Context, request *milvuspb.Rele zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + 
metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return rpt.result, nil } @@ -1597,6 +1578,8 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb traceID, _, _ := trace.InfoFromSpan(sp) method := "GetPartitionStatistics" tr := timerecord.NewTimeRecorder(method) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() g := &getPartitionStatisticsTask{ ctx: ctx, @@ -1623,7 +1606,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.GetPartitionStatisticsResponse{ @@ -1658,9 +1641,7 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.GetPartitionStatisticsResponse{ @@ -1682,11 +1663,9 @@ func (node *Proxy) GetPartitionStatistics(ctx context.Context, request *milvuspb zap.String("collection", request.CollectionName), zap.String("partition", request.PartitionName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - 
metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return g.result, nil } @@ -1714,7 +1693,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar method := "ShowPartitions" tr := timerecord.NewTimeRecorder(method) //TODO: use collectionID instead of collectionName - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() log.Debug( @@ -1731,7 +1710,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.String("role", typeutil.ProxyRole), zap.Any("request", request)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.ShowPartitionsResponse{ @@ -1766,7 +1745,7 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.String("collection", spt.ShowPartitionsRequest.CollectionName), zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return 
&milvuspb.ShowPartitionsResponse{ @@ -1788,9 +1767,9 @@ func (node *Proxy) ShowPartitions(ctx context.Context, request *milvuspb.ShowPar zap.String("collection", spt.ShowPartitionsRequest.CollectionName), zap.Any("partitions", spt.ShowPartitionsRequest.PartitionNames)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return spt.result, nil } @@ -1857,7 +1836,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-ShowPartitions") defer sp.Finish() traceID, _, _ := trace.InfoFromSpan(sp) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() logger.Info( rpcReceived(method), zap.String("traceID", traceID), @@ -1866,6 +1845,7 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get getErrResponse := func(err error) *milvuspb.GetLoadingProgressResponse { logger.Error("fail to get loading progress", zap.String("collection_name", request.CollectionName), zap.Strings("partition_name", request.PartitionNames), zap.Error(err)) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.GetLoadingProgressResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1909,9 +1889,8 @@ func (node *Proxy) GetLoadingProgress(ctx context.Context, request *milvuspb.Get rpcDone(method), 
zap.String("traceID", traceID), zap.Any("request", request)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return &milvuspb.GetLoadingProgressResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, @@ -1940,7 +1919,8 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde method := "CreateIndex" tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), zap.String("traceID", traceID), @@ -1961,7 +1941,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.String("field", request.GetFieldName()), zap.Any("extra_params", request.GetExtraParams())) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ @@ -1996,9 +1976,7 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.String("field", request.FieldName), zap.Any("extra_params", request.ExtraParams)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), 
method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ @@ -2019,11 +1997,9 @@ func (node *Proxy) CreateIndex(ctx context.Context, request *milvuspb.CreateInde zap.String("field", request.FieldName), zap.Any("extra_params", request.ExtraParams)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cit.result, nil } @@ -2050,7 +2026,8 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe // avoid data race indexName := request.IndexName tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), zap.String("traceID", traceID), @@ -2071,7 +2048,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe zap.String("field", request.FieldName), zap.String("index name", indexName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, 
metrics.AbandonLabel).Inc() return &milvuspb.DescribeIndexResponse{ @@ -2112,9 +2089,7 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe if dit.result != nil { errCode = dit.result.Status.GetErrorCode() } - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.DescribeIndexResponse{ @@ -2137,11 +2112,9 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe zap.String("field", request.FieldName), zap.String("index name", indexName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dit.result, nil } @@ -2164,6 +2137,8 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq method := "DropIndex" tr := timerecord.NewTimeRecorder(method) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), @@ -2184,7 +2159,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq zap.String("collection", 
request.CollectionName), zap.String("field", request.FieldName), zap.String("index name", request.IndexName)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ @@ -2219,9 +2194,7 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq zap.String("field", request.FieldName), zap.String("index name", request.IndexName)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ @@ -2242,11 +2215,9 @@ func (node *Proxy) DropIndex(ctx context.Context, request *milvuspb.DropIndexReq zap.String("field", request.FieldName), zap.String("index name", request.IndexName)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDMLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dit.result, nil } @@ -2275,7 +2246,8 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. 
method := "GetIndexBuildProgress" tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), zap.String("traceID", traceID), @@ -2295,7 +2267,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. zap.String("collection", request.CollectionName), zap.String("field", request.FieldName), zap.String("index name", request.IndexName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.GetIndexBuildProgressResponse{ @@ -2331,9 +2303,7 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. zap.String("collection", request.CollectionName), zap.String("field", request.FieldName), zap.String("index name", request.IndexName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.GetIndexBuildProgressResponse{ @@ -2357,11 +2327,9 @@ func (node *Proxy) GetIndexBuildProgress(ctx context.Context, request *milvuspb. 
zap.String("index name", request.IndexName), zap.Any("result", gibpt.result)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return gibpt.result, nil } @@ -2388,7 +2356,8 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex method := "GetIndexState" tr := timerecord.NewTimeRecorder(method) - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), zap.String("traceID", traceID), @@ -2409,7 +2378,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.String("field", request.FieldName), zap.String("index name", request.IndexName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.GetIndexStateResponse{ @@ -2445,10 +2414,7 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.String("collection", request.CollectionName), zap.String("field", request.FieldName), zap.String("index name", request.IndexName)) - - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - 
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.GetIndexStateResponse{ @@ -2471,11 +2437,9 @@ func (node *Proxy) GetIndexState(ctx context.Context, request *milvuspb.GetIndex zap.String("field", request.FieldName), zap.String("index name", request.IndexName)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dipt.result, nil } @@ -2498,11 +2462,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) rateCol.Add(internalpb.RateType_DMLInsert.String(), float64(receiveSize)) metrics.ProxyReceiveBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel).Add(float64(receiveSize)) - defer func() { - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - }() - + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() it := &insertTask{ ctx: ctx, Condition: NewTaskCondition(ctx), @@ -2563,7 +2523,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) if err := node.sched.dmQueue.Enqueue(it); err != nil { 
log.Debug("Failed to enqueue insert task: " + err.Error()) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return constructFailedResponse(err), nil } @@ -2581,7 +2541,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) if err := it.WaitToFinish(); err != nil { log.Debug("Failed to execute insert task in task scheduler: "+err.Error(), zap.String("traceID", traceID)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return constructFailedResponse(err), nil } @@ -2602,11 +2562,12 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) // InsertCnt always equals to the number of entities in the request it.result.InsertCnt = int64(request.NumRows) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex)) metrics.ProxyInsertVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(successCnt)) metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) return it.result, nil } @@ -2631,7 +2592,7 @@ func (node *Proxy) Delete(ctx context.Context, request 
*milvuspb.DeleteRequest) method := "Delete" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() dt := &deleteTask{ ctx: ctx, @@ -2666,8 +2627,8 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) // MsgID will be set by Enqueue() if err := node.sched.dmQueue.Enqueue(dt); err != nil { log.Error("Failed to enqueue delete task: "+err.Error(), zap.String("traceID", traceID)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.AbandonLabel).Inc() return &milvuspb.MutationResult{ Status: &commonpb.Status{ @@ -2689,9 +2650,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) if err := dt.WaitToFinish(); err != nil { log.Error("Failed to execute delete task in task scheduler: "+err.Error(), zap.String("traceID", traceID)) - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.TotalLabel).Inc() - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.MutationResult{ Status: &commonpb.Status{ @@ -2701,9 +2660,10 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest) }, nil } - metrics.ProxyDMLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() 
metrics.ProxyMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) return dt.result, nil } @@ -2721,7 +2681,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) } method := "Search" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "Proxy-Search") @@ -2774,7 +2734,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) zap.Uint64("travel_timestamp", travelTs), zap.Uint64("guarantee_timestamp", guaranteeTs)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &milvuspb.SearchResults{ @@ -2817,7 +2777,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) zap.Uint64("travel_timestamp", travelTs), zap.Uint64("guarantee_timestamp", guaranteeTs)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.SearchResults{ @@ -2831,6 +2791,7 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) span := tr.CtxRecord(ctx, "wait search result") 
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(span.Milliseconds())) + tr.CtxRecord(ctx, "wait search result") log.Ctx(ctx).Debug( rpcDone(method), zap.String("role", typeutil.ProxyRole), @@ -2845,13 +2806,14 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest) zap.Uint64("travel_timestamp", travelTs), zap.Uint64("guarantee_timestamp", guaranteeTs)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() metrics.ProxySearchVectors.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(qt.result.GetResults().GetNumQueries())) searchDur := tr.ElapseSpan().Milliseconds() - metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), + metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.SearchLabel).Observe(float64(searchDur)) - + metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), + metrics.SearchLabel, request.CollectionName).Observe(float64(searchDur)) if qt.result != nil { sentSize := proto.Size(qt.result) metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10)).Add(float64(sentSize)) @@ -2886,7 +2848,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* method := "Flush" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), @@ -2904,7 +2866,7 @@ func (node 
*Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* zap.String("db", request.DbName), zap.Any("collections", request.CollectionNames)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() resp.Status.Reason = err.Error() return resp, nil @@ -2932,7 +2894,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* zap.String("db", request.DbName), zap.Any("collections", request.CollectionNames)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError resp.Status.Reason = err.Error() @@ -2949,8 +2911,8 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* zap.String("db", request.DbName), zap.Any("collections", request.CollectionNames)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return ft.result, nil } @@ -2989,7 +2951,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* method := "Query" - 
metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() log.Ctx(ctx).Info( @@ -3012,8 +2974,8 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, - metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.AbandonLabel).Inc() return &milvuspb.QueryResults{ Status: &commonpb.Status{ @@ -3042,7 +3004,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &milvuspb.QueryResults{ @@ -3055,6 +3017,7 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* span := tr.CtxRecord(ctx, "wait query result") metrics.ProxyWaitForSearchResultLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.QueryLabel).Observe(float64(span.Milliseconds())) + log.Ctx(ctx).Debug( rpcDone(method), zap.String("role", typeutil.ProxyRole), @@ -3063,11 +3026,13 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (* zap.String("collection", request.CollectionName), zap.Any("partitions", request.PartitionNames)) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxySearchLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), + metrics.ProxySQLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.QueryLabel).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyCollectionSQLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), + metrics.QueryLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds())) ret := &milvuspb.QueryResults{ Status: qt.result.Status, @@ -3098,7 +3063,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia method := "CreateAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), @@ -3118,7 +3083,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia zap.String("alias", request.Alias), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -3149,7 +3114,7 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia zap.String("db", request.DbName), zap.String("alias", request.Alias), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -3168,8 +3133,8 @@ func (node *Proxy) CreateAlias(ctx context.Context, request *milvuspb.CreateAlia zap.String("alias", request.Alias), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return cat.result, nil } @@ -3192,7 +3157,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq method := "DropAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), @@ -3209,7 +3174,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq zap.String("role", typeutil.ProxyRole), zap.String("db", request.DbName), zap.String("alias", request.Alias)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, 
@@ -3239,7 +3204,7 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq zap.String("db", request.DbName), zap.String("alias", request.Alias)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -3257,8 +3222,8 @@ func (node *Proxy) DropAlias(ctx context.Context, request *milvuspb.DropAliasReq zap.String("db", request.DbName), zap.String("alias", request.Alias)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return dat.result, nil } @@ -3281,7 +3246,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR method := "AlterAlias" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() log.Debug( rpcReceived(method), @@ -3300,7 +3265,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR zap.String("db", request.DbName), zap.String("alias", request.Alias), zap.String("collection", request.CollectionName)) - 
metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.AbandonLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -3332,7 +3297,7 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR zap.String("alias", request.Alias), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -3351,8 +3316,8 @@ func (node *Proxy) AlterAlias(ctx context.Context, request *milvuspb.AlterAliasR zap.String("alias", request.Alias), zap.String("collection", request.CollectionName)) - metrics.ProxyDDLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDDLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return aat.result, nil } @@ -3468,13 +3433,30 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G } method := "GetPersistentSegmentInfo" tr := timerecord.NewTimeRecorder(method) - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + 
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.TotalLabel).Inc() - segments, err := node.getSegmentsOfCollection(ctx, req.DbName, req.CollectionName) + + // list segments + collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetCollectionName()) if err != nil { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error() + return resp, nil + } + + getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{ + CollectionID: collectionID, + // -1 means list all partition segments + PartitionID: -1, + States: []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed}, + }) + if err != nil { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error() return resp, nil } + + // get Segment info infoResp, err := node.dataCoord.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_SegmentInfo, @@ -3482,15 +3464,19 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G Timestamp: 0, SourceID: Params.ProxyCfg.GetNodeID(), }, - SegmentIDs: segments, + SegmentIDs: getSegmentsByStatesResponse.Segments, }) if err != nil { - log.Debug("GetPersistentSegmentInfo fail", zap.Error(err)) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.FailLabel).Inc() + log.Warn("GetPersistentSegmentInfo fail", zap.Error(err)) resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error() return resp, nil } log.Debug("GetPersistentSegmentInfo ", zap.Int("len(infos)", 
len(infoResp.Infos)), zap.Any("status", infoResp.Status)) if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.FailLabel).Inc() resp.Status.Reason = infoResp.Status.Reason return resp, nil } @@ -3504,9 +3490,9 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G State: info.State, } } - metrics.ProxyDQLFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() - metrics.ProxyDQLReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.Infos = persistentInfos return resp, nil @@ -3529,8 +3515,14 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue return resp, nil } + method := "GetQuerySegmentInfo" + tr := timerecord.NewTimeRecorder(method) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() + collID, err := globalMetaCache.GetCollectionID(ctx, req.CollectionName) if err != nil { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() resp.Status.Reason = err.Error() return resp, nil } @@ -3544,13 +3536,14 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue CollectionID: collID, }) if err != nil { - log.Error("Failed to get segment info from QueryCoord", - zap.Error(err)) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 
10), method, metrics.FailLabel).Inc() + log.Error("Failed to get segment info from QueryCoord", zap.Error(err)) resp.Status.Reason = err.Error() return resp, nil } log.Debug("GetQuerySegmentInfo ", zap.Any("infos", infoResp.Infos), zap.Any("status", infoResp.Status)) if infoResp.Status.ErrorCode != commonpb.ErrorCode_Success { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() log.Error("Failed to get segment info from QueryCoord", zap.String("errMsg", infoResp.Status.Reason)) resp.Status.Reason = infoResp.Status.Reason return resp, nil @@ -3569,65 +3562,14 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue NodeIds: info.NodeIds, } } + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.Infos = queryInfos return resp, nil } -func (node *Proxy) getSegmentsOfCollection(ctx context.Context, dbName string, collectionName string) ([]UniqueID, error) { - describeCollectionResponse, err := node.rootCoord.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_DescribeCollection, - MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyCfg.GetNodeID(), - }, - DbName: dbName, - CollectionName: collectionName, - }) - if err != nil { - return nil, err - } - if describeCollectionResponse.Status.ErrorCode != commonpb.ErrorCode_Success { - return nil, errors.New(describeCollectionResponse.Status.Reason) - } - collectionID := describeCollectionResponse.CollectionID - showPartitionsResp, err := node.rootCoord.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_ShowPartitions, - 
MsgID: 0, - Timestamp: 0, - SourceID: Params.ProxyCfg.GetNodeID(), - }, - DbName: dbName, - CollectionName: collectionName, - CollectionID: collectionID, - }) - if err != nil { - return nil, err - } - if showPartitionsResp.Status.ErrorCode != commonpb.ErrorCode_Success { - return nil, errors.New(showPartitionsResp.Status.Reason) - } - - ret := make([]UniqueID, 0) - for _, partitionID := range showPartitionsResp.PartitionIDs { - getSegmentsByStatesResponse, err := node.dataCoord.GetSegmentsByStates(ctx, &datapb.GetSegmentsByStatesRequest{ - CollectionID: collectionID, - PartitionID: partitionID, - States: []commonpb.SegmentState{commonpb.SegmentState_Flushing, commonpb.SegmentState_Flushed, commonpb.SegmentState_Sealed}, - }) - if err != nil { - return nil, err - } - if getSegmentsByStatesResponse.Status.ErrorCode != commonpb.ErrorCode_Success { - return nil, errors.New(getSegmentsByStatesResponse.Status.Reason) - } - ret = append(ret, getSegmentsByStatesResponse.GetSegments()...) - } - return ret, nil -} - // Dummy handles dummy request func (node *Proxy) Dummy(ctx context.Context, req *milvuspb.DummyRequest) (*milvuspb.DummyResponse, error) { failedResponse := &milvuspb.DummyResponse{ @@ -3913,6 +3855,31 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq return status, nil } +// GetReplicas gets replica info +func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) { + log.Info("received get replicas request") + resp := &milvuspb.GetReplicasResponse{} + if !node.checkHealthy() { + resp.Status = unhealthyStatus() + return resp, nil + } + + req.Base = &commonpb.MsgBase{ + MsgType: commonpb.MsgType_GetReplicas, + SourceID: Params.ProxyCfg.GetNodeID(), + } + + resp, err := node.queryCoord.GetReplicas(ctx, req) + if err != nil { + log.Error("Failed to get replicas from Query Coordinator", zap.Error(err)) + resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + 
resp.Status.Reason = err.Error() + return resp, nil + } + log.Info("received get replicas response", zap.Any("resp", resp), zap.Error(err)) + return resp, nil +} + //GetCompactionState gets the compaction state of multiple segments func (node *Proxy) GetCompactionState(ctx context.Context, req *milvuspb.GetCompactionStateRequest) (*milvuspb.GetCompactionStateResponse, error) { log.Info("received GetCompactionState request", zap.Int64("compactionID", req.GetCompactionID())) @@ -4009,14 +3976,24 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi resp.Status = unhealthyStatus() return resp, nil } + + method := "Import" + tr := timerecord.NewTimeRecorder(method) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() + // Call rootCoord to finish import. respFromRC, err := node.rootCoord.Import(ctx, req) if err != nil { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() log.Error("failed to execute bulk load request", zap.Error(err)) resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError resp.Status.Reason = err.Error() return resp, nil } + + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return respFromRC, nil } @@ -4028,10 +4005,24 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt resp.Status = unhealthyStatus() return resp, nil } + method := "GetImportState" + tr := timerecord.NewTimeRecorder(method) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() resp, err := node.rootCoord.GetImportState(ctx, req) - log.Info("received get import 
state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err)) - return resp, err + if err != nil { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + log.Error("failed to execute get import state", zap.Error(err)) + resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + resp.Status.Reason = err.Error() + return resp, nil + } + + log.Info("successfully received get import state response", zap.Int64("taskID", req.GetTask()), zap.Any("resp", resp), zap.Error(err)) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) + return resp, nil } // ListImportTasks get id array of all import tasks from rootcoord @@ -4042,28 +4033,22 @@ func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImport resp.Status = unhealthyStatus() return resp, nil } - + method := "ListImportTasks" + tr := timerecord.NewTimeRecorder(method) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, + metrics.TotalLabel).Inc() resp, err := node.rootCoord.ListImportTasks(ctx, req) - log.Info("received list import tasks response") - return resp, err -} - -// GetReplicas gets replica info -func (node *Proxy) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) { - log.Info("received get replicas request") - resp := &milvuspb.GetReplicasResponse{} - if !node.checkHealthy() { - resp.Status = unhealthyStatus() + if err != nil { + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.FailLabel).Inc() + log.Error("failed to execute list import tasks", zap.Error(err)) + resp.Status.ErrorCode = 
commonpb.ErrorCode_UnexpectedError + resp.Status.Reason = err.Error() return resp, nil } - req.Base = &commonpb.MsgBase{ - MsgType: commonpb.MsgType_GetReplicas, - SourceID: Params.ProxyCfg.GetNodeID(), - } - - resp, err := node.queryCoord.GetReplicas(ctx, req) - log.Info("received get replicas response", zap.Any("resp", resp), zap.Error(err)) + log.Info("successfully received list import tasks response", zap.String("collection", req.CollectionName), zap.Any("tasks", resp.Tasks)) + metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() + metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) return resp, err } diff --git a/internal/proxy/meta_cache.go b/internal/proxy/meta_cache.go index 9acf54b3bc3cb6e7e4feb6ef3518b8f36f2043e4..7adedfd44818eb17b4833c157b335e0ea13da0b8 100644 --- a/internal/proxy/meta_cache.go +++ b/internal/proxy/meta_cache.go @@ -163,7 +163,7 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string) collInfo, ok := m.collInfo[collectionName] if !ok { - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GeCollectionID", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GeCollectionID", metrics.CacheMissLabel).Inc() tr := timerecord.NewTimeRecorder("UpdateCache") m.mu.RUnlock() coll, err := m.describeCollection(ctx, collectionName) @@ -178,7 +178,7 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string) return collInfo.collID, nil } defer m.mu.RUnlock() - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionID", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), 
"GetCollectionID", metrics.CacheHitLabel).Inc() return collInfo.collID, nil } @@ -193,7 +193,7 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string if !ok { tr := timerecord.NewTimeRecorder("UpdateCache") - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc() coll, err := m.describeCollection(ctx, collectionName) if err != nil { return nil, err @@ -239,7 +239,7 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string } } - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc() return collInfo, nil } @@ -248,7 +248,7 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName stri collInfo, ok := m.collInfo[collectionName] if !ok { - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc() tr := timerecord.NewTimeRecorder("UpdateCache") m.mu.RUnlock() coll, err := m.describeCollection(ctx, collectionName) @@ -269,7 +269,7 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName stri return collInfo.schema, nil } defer m.mu.RUnlock() - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheHitLabel).Inc() + 
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheHitLabel).Inc() return collInfo.schema, nil } @@ -309,7 +309,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (m if collInfo.partInfo == nil || len(collInfo.partInfo) == 0 { tr := timerecord.NewTimeRecorder("UpdateCache") - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheMissLabel).Inc() m.mu.RUnlock() partitions, err := m.showPartitions(ctx, collectionName) @@ -335,7 +335,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (m } defer m.mu.RUnlock() - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheHitLabel).Inc() ret := make(map[string]typeutil.UniqueID) partInfo := m.collInfo[collectionName].partInfo @@ -366,7 +366,7 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string, if !ok { tr := timerecord.NewTimeRecorder("UpdateCache") - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheMissLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheMissLabel).Inc() partitions, err := m.showPartitions(ctx, collectionName) if err != nil { return nil, err @@ -385,7 +385,7 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string, return nil, fmt.Errorf("partitionID of partitionName:%s can not be find", partitionName) 
} } - metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc() + metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc() return &partitionInfo{ partitionID: partInfo.partitionID, createdTimestamp: partInfo.createdTimestamp, diff --git a/internal/proxy/mock_channels_mgr_test.go b/internal/proxy/mock_channels_mgr_test.go index 44ce36e9db1c172df86b4ab98dce59ae827eceef..204943a728f41446e10e21f4742775384beab97f 100644 --- a/internal/proxy/mock_channels_mgr_test.go +++ b/internal/proxy/mock_channels_mgr_test.go @@ -16,11 +16,10 @@ func (m *mockChannelsMgr) getVChannels(collectionID UniqueID) ([]vChan, error) { return nil, nil } -func (m *mockChannelsMgr) removeDMLStream(collectionID UniqueID) error { +func (m *mockChannelsMgr) removeDMLStream(collectionID UniqueID) { if m.removeDMLStreamFuncType != nil { - return m.removeDMLStreamFuncType(collectionID) + m.removeDMLStreamFuncType(collectionID) } - return nil } func newMockChannelsMgr() *mockChannelsMgr { diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 14565fef6610a026fc8e06bcade0f7c7c64d011d..7e7381a0556d5e37174633f15bb94f58154614f0 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -414,7 +414,7 @@ func (node *Proxy) Stop() error { } if node.chMgr != nil { - _ = node.chMgr.removeAllDMLStream() + node.chMgr.removeAllDMLStream() } // https://github.com/milvus-io/milvus/issues/12282 diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 993193a6792dcc83580f9b96f1aebd2278171183..0d2fbfd7973cdf9d6ebca189f0977e58eb2dd2d0 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -21,26 +21,21 @@ import ( "errors" "fmt" "math" - "strconv" - "github.com/golang/protobuf/proto" "go.uber.org/zap" + "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/commonpb" 
"github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus/internal/common" "github.com/milvus-io/milvus/internal/log" - "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/mq/msgstream" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" - "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/proto/planpb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/types" - "github.com/milvus-io/milvus/internal/util/timerecord" - "github.com/milvus-io/milvus/internal/util/trace" + "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -1667,306 +1662,6 @@ func (rpt *releasePartitionsTask) PostExecute(ctx context.Context) error { return nil } -type BaseDeleteTask = msgstream.DeleteMsg - -type deleteTask struct { - Condition - BaseDeleteTask - ctx context.Context - deleteExpr string - //req *milvuspb.DeleteRequest - result *milvuspb.MutationResult - chMgr channelsMgr - chTicker channelsTimeTicker - vChannels []vChan - pChannels []pChan - - collectionID UniqueID - schema *schemapb.CollectionSchema -} - -func (dt *deleteTask) TraceCtx() context.Context { - return dt.ctx -} - -func (dt *deleteTask) ID() UniqueID { - return dt.Base.MsgID -} - -func (dt *deleteTask) SetID(uid UniqueID) { - dt.Base.MsgID = uid -} - -func (dt *deleteTask) Type() commonpb.MsgType { - return dt.Base.MsgType -} - -func (dt *deleteTask) Name() string { - return deleteTaskName -} - -func (dt *deleteTask) BeginTs() Timestamp { - return dt.Base.Timestamp -} - -func (dt *deleteTask) EndTs() Timestamp { - return dt.Base.Timestamp -} - -func (dt *deleteTask) SetTs(ts Timestamp) { - dt.Base.Timestamp = ts -} - -func (dt *deleteTask) OnEnqueue() error { - dt.DeleteRequest.Base = &commonpb.MsgBase{} - return nil -} - -func (dt *deleteTask) getPChanStats() 
(map[pChan]pChanStatistics, error) { - ret := make(map[pChan]pChanStatistics) - - channels, err := dt.getChannels() - if err != nil { - return ret, err - } - - beginTs := dt.BeginTs() - endTs := dt.EndTs() - - for _, channel := range channels { - ret[channel] = pChanStatistics{ - minTs: beginTs, - maxTs: endTs, - } - } - return ret, nil -} - -func (dt *deleteTask) getChannels() ([]pChan, error) { - collID, err := globalMetaCache.GetCollectionID(dt.ctx, dt.CollectionName) - if err != nil { - return nil, err - } - return dt.chMgr.getChannels(collID) -} - -func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, expr string) (res *schemapb.IDs, rowNum int64, err error) { - if len(expr) == 0 { - log.Warn("empty expr") - return - } - - plan, err := createExprPlan(schema, expr) - if err != nil { - return res, 0, fmt.Errorf("failed to create expr plan, expr = %s", expr) - } - - // delete request only support expr "id in [a, b]" - termExpr, ok := plan.Node.(*planpb.PlanNode_Predicates).Predicates.Expr.(*planpb.Expr_TermExpr) - if !ok { - return res, 0, fmt.Errorf("invalid plan node type, only pk in [1, 2] supported") - } - - res = &schemapb.IDs{} - rowNum = int64(len(termExpr.TermExpr.Values)) - switch termExpr.TermExpr.ColumnInfo.GetDataType() { - case schemapb.DataType_Int64: - ids := make([]int64, 0) - for _, v := range termExpr.TermExpr.Values { - ids = append(ids, v.GetInt64Val()) - } - res.IdField = &schemapb.IDs_IntId{ - IntId: &schemapb.LongArray{ - Data: ids, - }, - } - case schemapb.DataType_VarChar: - ids := make([]string, 0) - for _, v := range termExpr.TermExpr.Values { - ids = append(ids, v.GetStringVal()) - } - res.IdField = &schemapb.IDs_StrId{ - StrId: &schemapb.StringArray{ - Data: ids, - }, - } - default: - return res, 0, fmt.Errorf("invalid field data type specifyed in delete expr") - } - - return res, rowNum, nil -} - -func (dt *deleteTask) PreExecute(ctx context.Context) error { - dt.Base.MsgType = commonpb.MsgType_Delete - dt.Base.SourceID = 
Params.ProxyCfg.GetNodeID() - - dt.result = &milvuspb.MutationResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, - IDs: &schemapb.IDs{ - IdField: nil, - }, - Timestamp: dt.BeginTs(), - } - - collName := dt.CollectionName - if err := validateCollectionName(collName); err != nil { - log.Error("Invalid collection name", zap.String("collectionName", collName)) - return err - } - collID, err := globalMetaCache.GetCollectionID(ctx, collName) - if err != nil { - log.Debug("Failed to get collection id", zap.String("collectionName", collName)) - return err - } - dt.DeleteRequest.CollectionID = collID - dt.collectionID = collID - - // If partitionName is not empty, partitionID will be set. - if len(dt.PartitionName) > 0 { - partName := dt.PartitionName - if err := validatePartitionTag(partName, true); err != nil { - log.Error("Invalid partition name", zap.String("partitionName", partName)) - return err - } - partID, err := globalMetaCache.GetPartitionID(ctx, collName, partName) - if err != nil { - log.Debug("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName)) - return err - } - dt.DeleteRequest.PartitionID = partID - } else { - dt.DeleteRequest.PartitionID = common.InvalidPartitionID - } - - schema, err := globalMetaCache.GetCollectionSchema(ctx, collName) - if err != nil { - log.Error("Failed to get collection schema", zap.String("collectionName", collName)) - return err - } - dt.schema = schema - - // get delete.primaryKeys from delete expr - primaryKeys, numRow, err := getPrimaryKeysFromExpr(schema, dt.deleteExpr) - if err != nil { - log.Error("Failed to get primary keys from expr", zap.Error(err)) - return err - } - - dt.DeleteRequest.NumRows = numRow - dt.DeleteRequest.PrimaryKeys = primaryKeys - log.Debug("get primary keys from expr", zap.Int64("len of primary keys", dt.DeleteRequest.NumRows)) - - // set result - dt.result.IDs = primaryKeys - dt.result.DeleteCnt = 
dt.DeleteRequest.NumRows - - dt.Timestamps = make([]uint64, numRow) - for index := range dt.Timestamps { - dt.Timestamps[index] = dt.BeginTs() - } - - return nil -} - -func (dt *deleteTask) Execute(ctx context.Context) (err error) { - sp, ctx := trace.StartSpanFromContextWithOperationName(dt.ctx, "Proxy-Delete-Execute") - defer sp.Finish() - - tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID())) - - collID := dt.DeleteRequest.CollectionID - stream, err := dt.chMgr.getOrCreateDmlStream(collID) - if err != nil { - return err - } - - // hash primary keys to channels - channelNames, err := dt.chMgr.getVChannels(collID) - if err != nil { - log.Error("get vChannels failed", zap.Int64("collectionID", collID), zap.Error(err)) - dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - dt.result.Status.Reason = err.Error() - return err - } - dt.HashValues = typeutil.HashPK2Channels(dt.result.IDs, channelNames) - - log.Info("send delete request to virtual channels", - zap.String("collection", dt.GetCollectionName()), - zap.Int64("collection_id", collID), - zap.Strings("virtual_channels", channelNames), - zap.Int64("task_id", dt.ID())) - - tr.Record("get vchannels") - // repack delete msg by dmChannel - result := make(map[uint32]msgstream.TsMsg) - collectionName := dt.CollectionName - collectionID := dt.CollectionID - partitionID := dt.PartitionID - partitionName := dt.PartitionName - proxyID := dt.Base.SourceID - for index, key := range dt.HashValues { - ts := dt.Timestamps[index] - _, ok := result[key] - if !ok { - sliceRequest := internalpb.DeleteRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Delete, - MsgID: dt.Base.MsgID, - Timestamp: ts, - SourceID: proxyID, - }, - CollectionID: collectionID, - PartitionID: partitionID, - CollectionName: collectionName, - PartitionName: partitionName, - PrimaryKeys: &schemapb.IDs{}, - } - deleteMsg := &msgstream.DeleteMsg{ - BaseMsg: msgstream.BaseMsg{ - Ctx: ctx, - }, - 
DeleteRequest: sliceRequest, - } - result[key] = deleteMsg - } - curMsg := result[key].(*msgstream.DeleteMsg) - curMsg.HashValues = append(curMsg.HashValues, dt.HashValues[index]) - curMsg.Timestamps = append(curMsg.Timestamps, dt.Timestamps[index]) - typeutil.AppendIDs(curMsg.PrimaryKeys, dt.PrimaryKeys, index) - curMsg.NumRows++ - } - - // send delete request to log broker - msgPack := &msgstream.MsgPack{ - BeginTs: dt.BeginTs(), - EndTs: dt.EndTs(), - } - for _, msg := range result { - if msg != nil { - msgPack.Msgs = append(msgPack.Msgs, msg) - } - } - - tr.Record("pack messages") - err = stream.Produce(msgPack) - if err != nil { - dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - dt.result.Status.Reason = err.Error() - return err - } - sendMsgDur := tr.Record("send delete request to dml channels") - metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(sendMsgDur.Milliseconds())) - - return nil -} - -func (dt *deleteTask) PostExecute(ctx context.Context) error { - return nil -} - // CreateAliasTask contains task information of CreateAlias type CreateAliasTask struct { Condition diff --git a/internal/proxy/test_delete.go b/internal/proxy/test_delete.go new file mode 100644 index 0000000000000000000000000000000000000000..717e5f5baba596abefb5deb5a83293c9bf2ead08 --- /dev/null +++ b/internal/proxy/test_delete.go @@ -0,0 +1,322 @@ +package proxy + +import ( + "context" + "fmt" + "strconv" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/commonpb" + "github.com/milvus-io/milvus-proto/go-api/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/schemapb" + "github.com/milvus-io/milvus/internal/common" + "github.com/milvus-io/milvus/internal/log" + "github.com/milvus-io/milvus/internal/metrics" + "github.com/milvus-io/milvus/internal/mq/msgstream" + "github.com/milvus-io/milvus/internal/proto/internalpb" + 
"github.com/milvus-io/milvus/internal/proto/planpb" + "github.com/milvus-io/milvus/internal/util/timerecord" + "github.com/milvus-io/milvus/internal/util/trace" + "github.com/milvus-io/milvus/internal/util/typeutil" +) + +type BaseDeleteTask = msgstream.DeleteMsg + +type deleteTask struct { + Condition + BaseDeleteTask + ctx context.Context + deleteExpr string + //req *milvuspb.DeleteRequest + result *milvuspb.MutationResult + chMgr channelsMgr + chTicker channelsTimeTicker + vChannels []vChan + pChannels []pChan + + collectionID UniqueID + schema *schemapb.CollectionSchema +} + +func (dt *deleteTask) TraceCtx() context.Context { + return dt.ctx +} + +func (dt *deleteTask) ID() UniqueID { + return dt.Base.MsgID +} + +func (dt *deleteTask) SetID(uid UniqueID) { + dt.Base.MsgID = uid +} + +func (dt *deleteTask) Type() commonpb.MsgType { + return dt.Base.MsgType +} + +func (dt *deleteTask) Name() string { + return deleteTaskName +} + +func (dt *deleteTask) BeginTs() Timestamp { + return dt.Base.Timestamp +} + +func (dt *deleteTask) EndTs() Timestamp { + return dt.Base.Timestamp +} + +func (dt *deleteTask) SetTs(ts Timestamp) { + dt.Base.Timestamp = ts +} + +func (dt *deleteTask) OnEnqueue() error { + dt.DeleteRequest.Base = &commonpb.MsgBase{} + return nil +} + +func (dt *deleteTask) getPChanStats() (map[pChan]pChanStatistics, error) { + ret := make(map[pChan]pChanStatistics) + + channels, err := dt.getChannels() + if err != nil { + return ret, err + } + + beginTs := dt.BeginTs() + endTs := dt.EndTs() + + for _, channel := range channels { + ret[channel] = pChanStatistics{ + minTs: beginTs, + maxTs: endTs, + } + } + return ret, nil +} + +func (dt *deleteTask) getChannels() ([]pChan, error) { + collID, err := globalMetaCache.GetCollectionID(dt.ctx, dt.CollectionName) + if err != nil { + return nil, err + } + return dt.chMgr.getChannels(collID) +} + +func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, expr string) (res *schemapb.IDs, rowNum int64, err error) 
{ + if len(expr) == 0 { + log.Warn("empty expr") + return + } + + plan, err := createExprPlan(schema, expr) + if err != nil { + return res, 0, fmt.Errorf("failed to create expr plan, expr = %s", expr) + } + + // delete request only support expr "id in [a, b]" + termExpr, ok := plan.Node.(*planpb.PlanNode_Predicates).Predicates.Expr.(*planpb.Expr_TermExpr) + if !ok { + return res, 0, fmt.Errorf("invalid plan node type, only pk in [1, 2] supported") + } + + res = &schemapb.IDs{} + rowNum = int64(len(termExpr.TermExpr.Values)) + switch termExpr.TermExpr.ColumnInfo.GetDataType() { + case schemapb.DataType_Int64: + ids := make([]int64, 0) + for _, v := range termExpr.TermExpr.Values { + ids = append(ids, v.GetInt64Val()) + } + res.IdField = &schemapb.IDs_IntId{ + IntId: &schemapb.LongArray{ + Data: ids, + }, + } + case schemapb.DataType_VarChar: + ids := make([]string, 0) + for _, v := range termExpr.TermExpr.Values { + ids = append(ids, v.GetStringVal()) + } + res.IdField = &schemapb.IDs_StrId{ + StrId: &schemapb.StringArray{ + Data: ids, + }, + } + default: + return res, 0, fmt.Errorf("invalid field data type specifyed in delete expr") + } + + return res, rowNum, nil +} + +func (dt *deleteTask) PreExecute(ctx context.Context) error { + dt.Base.MsgType = commonpb.MsgType_Delete + dt.Base.SourceID = Params.ProxyCfg.GetNodeID() + + dt.result = &milvuspb.MutationResult{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + IDs: &schemapb.IDs{ + IdField: nil, + }, + Timestamp: dt.BeginTs(), + } + + collName := dt.CollectionName + if err := validateCollectionName(collName); err != nil { + log.Error("Invalid collection name", zap.String("collectionName", collName)) + return err + } + collID, err := globalMetaCache.GetCollectionID(ctx, collName) + if err != nil { + log.Debug("Failed to get collection id", zap.String("collectionName", collName)) + return err + } + dt.DeleteRequest.CollectionID = collID + dt.collectionID = collID + + // If partitionName is 
not empty, partitionID will be set. + if len(dt.PartitionName) > 0 { + partName := dt.PartitionName + if err := validatePartitionTag(partName, true); err != nil { + log.Error("Invalid partition name", zap.String("partitionName", partName)) + return err + } + partID, err := globalMetaCache.GetPartitionID(ctx, collName, partName) + if err != nil { + log.Debug("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName)) + return err + } + dt.DeleteRequest.PartitionID = partID + } else { + dt.DeleteRequest.PartitionID = common.InvalidPartitionID + } + + schema, err := globalMetaCache.GetCollectionSchema(ctx, collName) + if err != nil { + log.Error("Failed to get collection schema", zap.String("collectionName", collName)) + return err + } + dt.schema = schema + + // get delete.primaryKeys from delete expr + primaryKeys, numRow, err := getPrimaryKeysFromExpr(schema, dt.deleteExpr) + if err != nil { + log.Error("Failed to get primary keys from expr", zap.Error(err)) + return err + } + + dt.DeleteRequest.NumRows = numRow + dt.DeleteRequest.PrimaryKeys = primaryKeys + log.Debug("get primary keys from expr", zap.Int64("len of primary keys", dt.DeleteRequest.NumRows)) + + // set result + dt.result.IDs = primaryKeys + dt.result.DeleteCnt = dt.DeleteRequest.NumRows + + dt.Timestamps = make([]uint64, numRow) + for index := range dt.Timestamps { + dt.Timestamps[index] = dt.BeginTs() + } + + return nil +} + +func (dt *deleteTask) Execute(ctx context.Context) (err error) { + sp, ctx := trace.StartSpanFromContextWithOperationName(dt.ctx, "Proxy-Delete-Execute") + defer sp.Finish() + + tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID())) + + collID := dt.DeleteRequest.CollectionID + stream, err := dt.chMgr.getOrCreateDmlStream(collID) + if err != nil { + return err + } + + // hash primary keys to channels + channelNames, err := dt.chMgr.getVChannels(collID) + if err != nil { + log.Error("get vChannels 
failed", zap.Int64("collectionID", collID), zap.Error(err)) + dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + dt.result.Status.Reason = err.Error() + return err + } + dt.HashValues = typeutil.HashPK2Channels(dt.result.IDs, channelNames) + + log.Info("send delete request to virtual channels", + zap.String("collection", dt.GetCollectionName()), + zap.Int64("collection_id", collID), + zap.Strings("virtual_channels", channelNames), + zap.Int64("task_id", dt.ID())) + + tr.Record("get vchannels") + // repack delete msg by dmChannel + result := make(map[uint32]msgstream.TsMsg) + collectionName := dt.CollectionName + collectionID := dt.CollectionID + partitionID := dt.PartitionID + partitionName := dt.PartitionName + proxyID := dt.Base.SourceID + for index, key := range dt.HashValues { + ts := dt.Timestamps[index] + _, ok := result[key] + if !ok { + sliceRequest := internalpb.DeleteRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Delete, + MsgID: dt.Base.MsgID, + Timestamp: ts, + SourceID: proxyID, + }, + CollectionID: collectionID, + PartitionID: partitionID, + CollectionName: collectionName, + PartitionName: partitionName, + PrimaryKeys: &schemapb.IDs{}, + } + deleteMsg := &msgstream.DeleteMsg{ + BaseMsg: msgstream.BaseMsg{ + Ctx: ctx, + }, + DeleteRequest: sliceRequest, + } + result[key] = deleteMsg + } + curMsg := result[key].(*msgstream.DeleteMsg) + curMsg.HashValues = append(curMsg.HashValues, dt.HashValues[index]) + curMsg.Timestamps = append(curMsg.Timestamps, dt.Timestamps[index]) + typeutil.AppendIDs(curMsg.PrimaryKeys, dt.PrimaryKeys, index) + curMsg.NumRows++ + } + + // send delete request to log broker + msgPack := &msgstream.MsgPack{ + BeginTs: dt.BeginTs(), + EndTs: dt.EndTs(), + } + for _, msg := range result { + if msg != nil { + msgPack.Msgs = append(msgPack.Msgs, msg) + } + } + + tr.Record("pack messages") + err = stream.Produce(msgPack) + if err != nil { + dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + 
dt.result.Status.Reason = err.Error() + return err + } + sendMsgDur := tr.Record("send delete request to dml channels") + metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(sendMsgDur.Milliseconds())) + + return nil +} + +func (dt *deleteTask) PostExecute(ctx context.Context) error { + return nil +}