未验证 提交 5b9988b6 编写于 作者: X Xiaofan 提交者: GitHub

Support collection level monitoring metrics (#19686)

Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
Signed-off-by: Nxiaofan-luan <xiaofan.luan@zilliz.com>
上级 4cc57adc
......@@ -62,6 +62,7 @@ const (
channelNameLabelName = "channel_name"
functionLabelName = "function_name"
queryTypeLabelName = "query_type"
collectionName = "collection_name"
segmentStateLabelName = "segment_state"
usernameLabelName = "username"
rolenameLabelName = "role_name"
......
......@@ -47,16 +47,45 @@ var (
Help: "counter of vectors successfully inserted",
}, []string{nodeIDLabelName})
// ProxySearchLatency record the latency of search successfully.
ProxySearchLatency = prometheus.NewHistogramVec(
// ProxySQLatency record the latency of search successfully.
ProxySQLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "sq_latency",
Help: "latency of search",
Help: "latency of search or query successfully",
Buckets: buckets,
}, []string{nodeIDLabelName, queryTypeLabelName})
// ProxyCollectionSQLatency record the latency of search successfully, per collection
ProxyCollectionSQLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "collection_sq_latency",
Help: "latency of search or query successfully, per collection",
Buckets: buckets,
}, []string{nodeIDLabelName, queryTypeLabelName, collectionName})
// ProxyMutationLatency record the latency that mutate successfully.
ProxyMutationLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "mutation_latency",
Help: "latency of insert or delete successfully",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, msgTypeLabelName})
// ProxyMutationLatency record the latency that mutate successfully, per collection
ProxyCollectionMutationLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "collection_mutation_latency",
Help: "latency of insert or delete successfully, per collection",
Buckets: buckets,
}, []string{nodeIDLabelName, msgTypeLabelName, collectionName})
// ProxyWaitForSearchResultLatency record the time that the proxy waits for the search result.
ProxyWaitForSearchResultLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
......@@ -66,7 +95,6 @@ var (
Help: "latency that proxy waits for the result",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, queryTypeLabelName})
// ProxyReduceResultLatency record the time that the proxy reduces search result.
ProxyReduceResultLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
......@@ -96,16 +124,6 @@ var (
Help: "number of MsgStream objects per physical channel",
}, []string{nodeIDLabelName, channelNameLabelName})
// ProxyMutationLatency record the latency that insert successfully.
ProxyMutationLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "mutation_latency",
Help: "latency of insert or delete successfully",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, msgTypeLabelName})
// ProxySendMutationReqLatency record the latency that Proxy send insert request to MsgStream.
ProxySendMutationReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
......@@ -117,12 +135,12 @@ var (
}, []string{nodeIDLabelName, msgTypeLabelName})
// ProxyCacheHitCounter record the number of Proxy cache hits or miss.
ProxyCacheHitCounter = prometheus.NewCounterVec(
ProxyCacheStatsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "cache_hit_count",
Help: "count of cache hits",
Help: "count of cache hits/miss",
}, []string{nodeIDLabelName, cacheNameLabelName, cacheStateLabelName})
// ProxyUpdateCacheLatency record the time that proxy update cache when cache miss.
......@@ -164,60 +182,22 @@ var (
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName})
// ProxyDDLFunctionCall records the number of times the function of the DDL operation was executed, like `CreateCollection`.
ProxyDDLFunctionCall = prometheus.NewCounterVec(
// ProxyFunctionCall records the number of times the function of the DDL operation was executed, like `CreateCollection`.
ProxyFunctionCall = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "ddl_req_count",
Help: "count of DDL operation executed",
Name: "req_count",
Help: "count of operation executed",
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
// ProxyDQLFunctionCall records the number of times the function of the DQL operation was executed, like `HasCollection`.
ProxyDQLFunctionCall = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "dql_req_count",
Help: "count of DQL operation executed",
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
// ProxyDMLFunctionCall records the number of times the function of the DML operation was executed, like `LoadCollection`.
ProxyDMLFunctionCall = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "dml_req_count",
Help: "count of DML operation executed",
}, []string{nodeIDLabelName, functionLabelName, statusLabelName})
// ProxyDDLReqLatency records the latency that for DML request, like "CreateCollection".
ProxyDDLReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "ddl_req_latency",
Help: "latency of each DDL request",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, functionLabelName})
// ProxyDMLReqLatency records the latency that for DML request.
ProxyDMLReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "dml_req_latency",
Help: "latency of each DML request excluding insert and delete",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, functionLabelName})
// ProxyDQLReqLatency record the latency that for DQL request, like "HasCollection".
ProxyDQLReqLatency = prometheus.NewHistogramVec(
// ProxyReqLatency records the latency that for all requests, like "CreateCollection".
ProxyReqLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.ProxyRole,
Name: "dql_req_latency",
Help: "latency of each DQL request excluding search and query",
Name: "req_latency",
Help: "latency of each request",
Buckets: buckets, // unit: ms
}, []string{nodeIDLabelName, functionLabelName})
......@@ -254,29 +234,29 @@ func RegisterProxy(registry *prometheus.Registry) {
registry.MustRegister(ProxySearchVectors)
registry.MustRegister(ProxyInsertVectors)
registry.MustRegister(ProxySearchLatency)
registry.MustRegister(ProxySQLatency)
registry.MustRegister(ProxyCollectionSQLatency)
registry.MustRegister(ProxyMutationLatency)
registry.MustRegister(ProxyCollectionMutationLatency)
registry.MustRegister(ProxyWaitForSearchResultLatency)
registry.MustRegister(ProxyReduceResultLatency)
registry.MustRegister(ProxyDecodeResultLatency)
registry.MustRegister(ProxyMsgStreamObjectsForPChan)
registry.MustRegister(ProxyMutationLatency)
registry.MustRegister(ProxySendMutationReqLatency)
registry.MustRegister(ProxyCacheHitCounter)
registry.MustRegister(ProxyCacheStatsCounter)
registry.MustRegister(ProxyUpdateCacheLatency)
registry.MustRegister(ProxySyncTimeTick)
registry.MustRegister(ProxyApplyPrimaryKeyLatency)
registry.MustRegister(ProxyApplyTimestampLatency)
registry.MustRegister(ProxyDDLFunctionCall)
registry.MustRegister(ProxyDQLFunctionCall)
registry.MustRegister(ProxyDMLFunctionCall)
registry.MustRegister(ProxyDDLReqLatency)
registry.MustRegister(ProxyDMLReqLatency)
registry.MustRegister(ProxyDQLReqLatency)
registry.MustRegister(ProxyFunctionCall)
registry.MustRegister(ProxyReqLatency)
registry.MustRegister(ProxyReceiveBytes)
registry.MustRegister(ProxyReadReqSendBytes)
......@@ -301,3 +281,14 @@ func SetRateGaugeByRateType(rateType internalpb.RateType, nodeID int64, rate flo
ProxyLimiterRate.WithLabelValues(nodeIDStr, QueryLabel).Set(rate)
}
}
func CleanupCollectionMetrics(nodeID int64, collection string) {
ProxyCollectionSQLatency.Delete(prometheus.Labels{nodeIDLabelName: strconv.FormatInt(nodeID, 10),
queryTypeLabelName: SearchLabel, collectionName: collection})
ProxyCollectionSQLatency.Delete(prometheus.Labels{nodeIDLabelName: strconv.FormatInt(nodeID, 10),
queryTypeLabelName: QueryLabel, collectionName: collection})
ProxyCollectionMutationLatency.Delete(prometheus.Labels{nodeIDLabelName: strconv.FormatInt(nodeID, 10),
msgTypeLabelName: InsertLabel, collectionName: collection})
ProxyCollectionMutationLatency.Delete(prometheus.Labels{nodeIDLabelName: strconv.FormatInt(nodeID, 10),
msgTypeLabelName: DeleteLabel, collectionName: collection})
}
......@@ -39,8 +39,8 @@ type channelsMgr interface {
getChannels(collectionID UniqueID) ([]pChan, error)
getVChannels(collectionID UniqueID) ([]vChan, error)
getOrCreateDmlStream(collectionID UniqueID) (msgstream.MsgStream, error)
removeDMLStream(collectionID UniqueID) error
removeAllDMLStream() error
removeDMLStream(collectionID UniqueID)
removeAllDMLStream()
}
type channelInfos struct {
......@@ -279,7 +279,7 @@ func (mgr *singleTypeChannelsMgr) getOrCreateStream(collectionID UniqueID) (msgs
// removeStream remove the corresponding stream of the specified collection. Idempotent.
// If stream already exists, remove it, otherwise do nothing.
func (mgr *singleTypeChannelsMgr) removeStream(collectionID UniqueID) error {
func (mgr *singleTypeChannelsMgr) removeStream(collectionID UniqueID) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
if info, ok := mgr.infos[collectionID]; ok {
......@@ -288,11 +288,10 @@ func (mgr *singleTypeChannelsMgr) removeStream(collectionID UniqueID) error {
delete(mgr.infos, collectionID)
}
log.Info("dml stream removed", zap.Int64("collection_id", collectionID))
return nil
}
// removeAllStream remove all message stream.
func (mgr *singleTypeChannelsMgr) removeAllStream() error {
func (mgr *singleTypeChannelsMgr) removeAllStream() {
mgr.mu.Lock()
defer mgr.mu.Unlock()
for _, info := range mgr.infos {
......@@ -301,7 +300,6 @@ func (mgr *singleTypeChannelsMgr) removeAllStream() error {
}
mgr.infos = make(map[UniqueID]streamInfos)
log.Info("all dml stream removed")
return nil
}
func newSingleTypeChannelsMgr(
......@@ -339,12 +337,12 @@ func (mgr *channelsMgrImpl) getOrCreateDmlStream(collectionID UniqueID) (msgstre
return mgr.dmlChannelsMgr.getOrCreateStream(collectionID)
}
func (mgr *channelsMgrImpl) removeDMLStream(collectionID UniqueID) error {
return mgr.dmlChannelsMgr.removeStream(collectionID)
func (mgr *channelsMgrImpl) removeDMLStream(collectionID UniqueID) {
mgr.dmlChannelsMgr.removeStream(collectionID)
}
func (mgr *channelsMgrImpl) removeAllDMLStream() error {
return mgr.dmlChannelsMgr.removeAllStream()
func (mgr *channelsMgrImpl) removeAllDMLStream() {
mgr.dmlChannelsMgr.removeAllStream()
}
// newChannelsMgrImpl constructs a channels manager.
......
......@@ -370,9 +370,8 @@ func Test_singleTypeChannelsMgr_removeStream(t *testing.T) {
},
},
}
err := m.removeStream(100)
assert.NoError(t, err)
_, err = m.lockGetStream(100)
m.removeStream(100)
_, err := m.lockGetStream(100)
assert.Error(t, err)
}
......@@ -384,8 +383,7 @@ func Test_singleTypeChannelsMgr_removeAllStream(t *testing.T) {
},
},
}
err := m.removeAllStream()
assert.NoError(t, err)
_, err = m.lockGetStream(100)
m.removeAllStream()
_, err := m.lockGetStream(100)
assert.Error(t, err)
}
此差异已折叠。
......@@ -163,7 +163,7 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string)
collInfo, ok := m.collInfo[collectionName]
if !ok {
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GeCollectionID", metrics.CacheMissLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GeCollectionID", metrics.CacheMissLabel).Inc()
tr := timerecord.NewTimeRecorder("UpdateCache")
m.mu.RUnlock()
coll, err := m.describeCollection(ctx, collectionName)
......@@ -178,7 +178,7 @@ func (m *MetaCache) GetCollectionID(ctx context.Context, collectionName string)
return collInfo.collID, nil
}
defer m.mu.RUnlock()
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionID", metrics.CacheHitLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionID", metrics.CacheHitLabel).Inc()
return collInfo.collID, nil
}
......@@ -193,7 +193,7 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string
if !ok {
tr := timerecord.NewTimeRecorder("UpdateCache")
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheMissLabel).Inc()
coll, err := m.describeCollection(ctx, collectionName)
if err != nil {
return nil, err
......@@ -239,7 +239,7 @@ func (m *MetaCache) GetCollectionInfo(ctx context.Context, collectionName string
}
}
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionInfo", metrics.CacheHitLabel).Inc()
return collInfo, nil
}
......@@ -248,7 +248,7 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName stri
collInfo, ok := m.collInfo[collectionName]
if !ok {
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheMissLabel).Inc()
tr := timerecord.NewTimeRecorder("UpdateCache")
m.mu.RUnlock()
coll, err := m.describeCollection(ctx, collectionName)
......@@ -269,7 +269,7 @@ func (m *MetaCache) GetCollectionSchema(ctx context.Context, collectionName stri
return collInfo.schema, nil
}
defer m.mu.RUnlock()
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheHitLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetCollectionSchema", metrics.CacheHitLabel).Inc()
return collInfo.schema, nil
}
......@@ -309,7 +309,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (m
if collInfo.partInfo == nil || len(collInfo.partInfo) == 0 {
tr := timerecord.NewTimeRecorder("UpdateCache")
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheMissLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheMissLabel).Inc()
m.mu.RUnlock()
partitions, err := m.showPartitions(ctx, collectionName)
......@@ -335,7 +335,7 @@ func (m *MetaCache) GetPartitions(ctx context.Context, collectionName string) (m
}
defer m.mu.RUnlock()
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheHitLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitions", metrics.CacheHitLabel).Inc()
ret := make(map[string]typeutil.UniqueID)
partInfo := m.collInfo[collectionName].partInfo
......@@ -366,7 +366,7 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string,
if !ok {
tr := timerecord.NewTimeRecorder("UpdateCache")
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheMissLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheMissLabel).Inc()
partitions, err := m.showPartitions(ctx, collectionName)
if err != nil {
return nil, err
......@@ -385,7 +385,7 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string,
return nil, fmt.Errorf("partitionID of partitionName:%s can not be find", partitionName)
}
}
metrics.ProxyCacheHitCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc()
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc()
return &partitionInfo{
partitionID: partInfo.partitionID,
createdTimestamp: partInfo.createdTimestamp,
......
......@@ -16,11 +16,10 @@ func (m *mockChannelsMgr) getVChannels(collectionID UniqueID) ([]vChan, error) {
return nil, nil
}
func (m *mockChannelsMgr) removeDMLStream(collectionID UniqueID) error {
func (m *mockChannelsMgr) removeDMLStream(collectionID UniqueID) {
if m.removeDMLStreamFuncType != nil {
return m.removeDMLStreamFuncType(collectionID)
m.removeDMLStreamFuncType(collectionID)
}
return nil
}
func newMockChannelsMgr() *mockChannelsMgr {
......
......@@ -414,7 +414,7 @@ func (node *Proxy) Stop() error {
}
if node.chMgr != nil {
_ = node.chMgr.removeAllDMLStream()
node.chMgr.removeAllDMLStream()
}
// https://github.com/milvus-io/milvus/issues/12282
......
......@@ -21,26 +21,21 @@ import (
"errors"
"fmt"
"math"
"strconv"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
......@@ -1667,306 +1662,6 @@ func (rpt *releasePartitionsTask) PostExecute(ctx context.Context) error {
return nil
}
type BaseDeleteTask = msgstream.DeleteMsg
type deleteTask struct {
Condition
BaseDeleteTask
ctx context.Context
deleteExpr string
//req *milvuspb.DeleteRequest
result *milvuspb.MutationResult
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
collectionID UniqueID
schema *schemapb.CollectionSchema
}
func (dt *deleteTask) TraceCtx() context.Context {
return dt.ctx
}
func (dt *deleteTask) ID() UniqueID {
return dt.Base.MsgID
}
func (dt *deleteTask) SetID(uid UniqueID) {
dt.Base.MsgID = uid
}
func (dt *deleteTask) Type() commonpb.MsgType {
return dt.Base.MsgType
}
func (dt *deleteTask) Name() string {
return deleteTaskName
}
func (dt *deleteTask) BeginTs() Timestamp {
return dt.Base.Timestamp
}
func (dt *deleteTask) EndTs() Timestamp {
return dt.Base.Timestamp
}
func (dt *deleteTask) SetTs(ts Timestamp) {
dt.Base.Timestamp = ts
}
func (dt *deleteTask) OnEnqueue() error {
dt.DeleteRequest.Base = &commonpb.MsgBase{}
return nil
}
func (dt *deleteTask) getPChanStats() (map[pChan]pChanStatistics, error) {
ret := make(map[pChan]pChanStatistics)
channels, err := dt.getChannels()
if err != nil {
return ret, err
}
beginTs := dt.BeginTs()
endTs := dt.EndTs()
for _, channel := range channels {
ret[channel] = pChanStatistics{
minTs: beginTs,
maxTs: endTs,
}
}
return ret, nil
}
func (dt *deleteTask) getChannels() ([]pChan, error) {
collID, err := globalMetaCache.GetCollectionID(dt.ctx, dt.CollectionName)
if err != nil {
return nil, err
}
return dt.chMgr.getChannels(collID)
}
func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, expr string) (res *schemapb.IDs, rowNum int64, err error) {
if len(expr) == 0 {
log.Warn("empty expr")
return
}
plan, err := createExprPlan(schema, expr)
if err != nil {
return res, 0, fmt.Errorf("failed to create expr plan, expr = %s", expr)
}
// delete request only support expr "id in [a, b]"
termExpr, ok := plan.Node.(*planpb.PlanNode_Predicates).Predicates.Expr.(*planpb.Expr_TermExpr)
if !ok {
return res, 0, fmt.Errorf("invalid plan node type, only pk in [1, 2] supported")
}
res = &schemapb.IDs{}
rowNum = int64(len(termExpr.TermExpr.Values))
switch termExpr.TermExpr.ColumnInfo.GetDataType() {
case schemapb.DataType_Int64:
ids := make([]int64, 0)
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetInt64Val())
}
res.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: ids,
},
}
case schemapb.DataType_VarChar:
ids := make([]string, 0)
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetStringVal())
}
res.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: ids,
},
}
default:
return res, 0, fmt.Errorf("invalid field data type specifyed in delete expr")
}
return res, rowNum, nil
}
func (dt *deleteTask) PreExecute(ctx context.Context) error {
dt.Base.MsgType = commonpb.MsgType_Delete
dt.Base.SourceID = Params.ProxyCfg.GetNodeID()
dt.result = &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IDs: &schemapb.IDs{
IdField: nil,
},
Timestamp: dt.BeginTs(),
}
collName := dt.CollectionName
if err := validateCollectionName(collName); err != nil {
log.Error("Invalid collection name", zap.String("collectionName", collName))
return err
}
collID, err := globalMetaCache.GetCollectionID(ctx, collName)
if err != nil {
log.Debug("Failed to get collection id", zap.String("collectionName", collName))
return err
}
dt.DeleteRequest.CollectionID = collID
dt.collectionID = collID
// If partitionName is not empty, partitionID will be set.
if len(dt.PartitionName) > 0 {
partName := dt.PartitionName
if err := validatePartitionTag(partName, true); err != nil {
log.Error("Invalid partition name", zap.String("partitionName", partName))
return err
}
partID, err := globalMetaCache.GetPartitionID(ctx, collName, partName)
if err != nil {
log.Debug("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName))
return err
}
dt.DeleteRequest.PartitionID = partID
} else {
dt.DeleteRequest.PartitionID = common.InvalidPartitionID
}
schema, err := globalMetaCache.GetCollectionSchema(ctx, collName)
if err != nil {
log.Error("Failed to get collection schema", zap.String("collectionName", collName))
return err
}
dt.schema = schema
// get delete.primaryKeys from delete expr
primaryKeys, numRow, err := getPrimaryKeysFromExpr(schema, dt.deleteExpr)
if err != nil {
log.Error("Failed to get primary keys from expr", zap.Error(err))
return err
}
dt.DeleteRequest.NumRows = numRow
dt.DeleteRequest.PrimaryKeys = primaryKeys
log.Debug("get primary keys from expr", zap.Int64("len of primary keys", dt.DeleteRequest.NumRows))
// set result
dt.result.IDs = primaryKeys
dt.result.DeleteCnt = dt.DeleteRequest.NumRows
dt.Timestamps = make([]uint64, numRow)
for index := range dt.Timestamps {
dt.Timestamps[index] = dt.BeginTs()
}
return nil
}
func (dt *deleteTask) Execute(ctx context.Context) (err error) {
sp, ctx := trace.StartSpanFromContextWithOperationName(dt.ctx, "Proxy-Delete-Execute")
defer sp.Finish()
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))
collID := dt.DeleteRequest.CollectionID
stream, err := dt.chMgr.getOrCreateDmlStream(collID)
if err != nil {
return err
}
// hash primary keys to channels
channelNames, err := dt.chMgr.getVChannels(collID)
if err != nil {
log.Error("get vChannels failed", zap.Int64("collectionID", collID), zap.Error(err))
dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
dt.result.Status.Reason = err.Error()
return err
}
dt.HashValues = typeutil.HashPK2Channels(dt.result.IDs, channelNames)
log.Info("send delete request to virtual channels",
zap.String("collection", dt.GetCollectionName()),
zap.Int64("collection_id", collID),
zap.Strings("virtual_channels", channelNames),
zap.Int64("task_id", dt.ID()))
tr.Record("get vchannels")
// repack delete msg by dmChannel
result := make(map[uint32]msgstream.TsMsg)
collectionName := dt.CollectionName
collectionID := dt.CollectionID
partitionID := dt.PartitionID
partitionName := dt.PartitionName
proxyID := dt.Base.SourceID
for index, key := range dt.HashValues {
ts := dt.Timestamps[index]
_, ok := result[key]
if !ok {
sliceRequest := internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: dt.Base.MsgID,
Timestamp: ts,
SourceID: proxyID,
},
CollectionID: collectionID,
PartitionID: partitionID,
CollectionName: collectionName,
PartitionName: partitionName,
PrimaryKeys: &schemapb.IDs{},
}
deleteMsg := &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{
Ctx: ctx,
},
DeleteRequest: sliceRequest,
}
result[key] = deleteMsg
}
curMsg := result[key].(*msgstream.DeleteMsg)
curMsg.HashValues = append(curMsg.HashValues, dt.HashValues[index])
curMsg.Timestamps = append(curMsg.Timestamps, dt.Timestamps[index])
typeutil.AppendIDs(curMsg.PrimaryKeys, dt.PrimaryKeys, index)
curMsg.NumRows++
}
// send delete request to log broker
msgPack := &msgstream.MsgPack{
BeginTs: dt.BeginTs(),
EndTs: dt.EndTs(),
}
for _, msg := range result {
if msg != nil {
msgPack.Msgs = append(msgPack.Msgs, msg)
}
}
tr.Record("pack messages")
err = stream.Produce(msgPack)
if err != nil {
dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
dt.result.Status.Reason = err.Error()
return err
}
sendMsgDur := tr.Record("send delete request to dml channels")
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(sendMsgDur.Milliseconds()))
return nil
}
func (dt *deleteTask) PostExecute(ctx context.Context) error {
return nil
}
// CreateAliasTask contains task information of CreateAlias
type CreateAliasTask struct {
Condition
......
package proxy
import (
"context"
"fmt"
"strconv"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/planpb"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type BaseDeleteTask = msgstream.DeleteMsg
type deleteTask struct {
Condition
BaseDeleteTask
ctx context.Context
deleteExpr string
//req *milvuspb.DeleteRequest
result *milvuspb.MutationResult
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
collectionID UniqueID
schema *schemapb.CollectionSchema
}
func (dt *deleteTask) TraceCtx() context.Context {
return dt.ctx
}
func (dt *deleteTask) ID() UniqueID {
return dt.Base.MsgID
}
func (dt *deleteTask) SetID(uid UniqueID) {
dt.Base.MsgID = uid
}
func (dt *deleteTask) Type() commonpb.MsgType {
return dt.Base.MsgType
}
func (dt *deleteTask) Name() string {
return deleteTaskName
}
func (dt *deleteTask) BeginTs() Timestamp {
return dt.Base.Timestamp
}
func (dt *deleteTask) EndTs() Timestamp {
return dt.Base.Timestamp
}
func (dt *deleteTask) SetTs(ts Timestamp) {
dt.Base.Timestamp = ts
}
func (dt *deleteTask) OnEnqueue() error {
dt.DeleteRequest.Base = &commonpb.MsgBase{}
return nil
}
func (dt *deleteTask) getPChanStats() (map[pChan]pChanStatistics, error) {
ret := make(map[pChan]pChanStatistics)
channels, err := dt.getChannels()
if err != nil {
return ret, err
}
beginTs := dt.BeginTs()
endTs := dt.EndTs()
for _, channel := range channels {
ret[channel] = pChanStatistics{
minTs: beginTs,
maxTs: endTs,
}
}
return ret, nil
}
func (dt *deleteTask) getChannels() ([]pChan, error) {
collID, err := globalMetaCache.GetCollectionID(dt.ctx, dt.CollectionName)
if err != nil {
return nil, err
}
return dt.chMgr.getChannels(collID)
}
func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, expr string) (res *schemapb.IDs, rowNum int64, err error) {
if len(expr) == 0 {
log.Warn("empty expr")
return
}
plan, err := createExprPlan(schema, expr)
if err != nil {
return res, 0, fmt.Errorf("failed to create expr plan, expr = %s", expr)
}
// delete request only support expr "id in [a, b]"
termExpr, ok := plan.Node.(*planpb.PlanNode_Predicates).Predicates.Expr.(*planpb.Expr_TermExpr)
if !ok {
return res, 0, fmt.Errorf("invalid plan node type, only pk in [1, 2] supported")
}
res = &schemapb.IDs{}
rowNum = int64(len(termExpr.TermExpr.Values))
switch termExpr.TermExpr.ColumnInfo.GetDataType() {
case schemapb.DataType_Int64:
ids := make([]int64, 0)
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetInt64Val())
}
res.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: ids,
},
}
case schemapb.DataType_VarChar:
ids := make([]string, 0)
for _, v := range termExpr.TermExpr.Values {
ids = append(ids, v.GetStringVal())
}
res.IdField = &schemapb.IDs_StrId{
StrId: &schemapb.StringArray{
Data: ids,
},
}
default:
return res, 0, fmt.Errorf("invalid field data type specifyed in delete expr")
}
return res, rowNum, nil
}
func (dt *deleteTask) PreExecute(ctx context.Context) error {
dt.Base.MsgType = commonpb.MsgType_Delete
dt.Base.SourceID = Params.ProxyCfg.GetNodeID()
dt.result = &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IDs: &schemapb.IDs{
IdField: nil,
},
Timestamp: dt.BeginTs(),
}
collName := dt.CollectionName
if err := validateCollectionName(collName); err != nil {
log.Error("Invalid collection name", zap.String("collectionName", collName))
return err
}
collID, err := globalMetaCache.GetCollectionID(ctx, collName)
if err != nil {
log.Debug("Failed to get collection id", zap.String("collectionName", collName))
return err
}
dt.DeleteRequest.CollectionID = collID
dt.collectionID = collID
// If partitionName is not empty, partitionID will be set.
if len(dt.PartitionName) > 0 {
partName := dt.PartitionName
if err := validatePartitionTag(partName, true); err != nil {
log.Error("Invalid partition name", zap.String("partitionName", partName))
return err
}
partID, err := globalMetaCache.GetPartitionID(ctx, collName, partName)
if err != nil {
log.Debug("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName))
return err
}
dt.DeleteRequest.PartitionID = partID
} else {
dt.DeleteRequest.PartitionID = common.InvalidPartitionID
}
schema, err := globalMetaCache.GetCollectionSchema(ctx, collName)
if err != nil {
log.Error("Failed to get collection schema", zap.String("collectionName", collName))
return err
}
dt.schema = schema
// get delete.primaryKeys from delete expr
primaryKeys, numRow, err := getPrimaryKeysFromExpr(schema, dt.deleteExpr)
if err != nil {
log.Error("Failed to get primary keys from expr", zap.Error(err))
return err
}
dt.DeleteRequest.NumRows = numRow
dt.DeleteRequest.PrimaryKeys = primaryKeys
log.Debug("get primary keys from expr", zap.Int64("len of primary keys", dt.DeleteRequest.NumRows))
// set result
dt.result.IDs = primaryKeys
dt.result.DeleteCnt = dt.DeleteRequest.NumRows
dt.Timestamps = make([]uint64, numRow)
for index := range dt.Timestamps {
dt.Timestamps[index] = dt.BeginTs()
}
return nil
}
func (dt *deleteTask) Execute(ctx context.Context) (err error) {
sp, ctx := trace.StartSpanFromContextWithOperationName(dt.ctx, "Proxy-Delete-Execute")
defer sp.Finish()
tr := timerecord.NewTimeRecorder(fmt.Sprintf("proxy execute delete %d", dt.ID()))
collID := dt.DeleteRequest.CollectionID
stream, err := dt.chMgr.getOrCreateDmlStream(collID)
if err != nil {
return err
}
// hash primary keys to channels
channelNames, err := dt.chMgr.getVChannels(collID)
if err != nil {
log.Error("get vChannels failed", zap.Int64("collectionID", collID), zap.Error(err))
dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
dt.result.Status.Reason = err.Error()
return err
}
dt.HashValues = typeutil.HashPK2Channels(dt.result.IDs, channelNames)
log.Info("send delete request to virtual channels",
zap.String("collection", dt.GetCollectionName()),
zap.Int64("collection_id", collID),
zap.Strings("virtual_channels", channelNames),
zap.Int64("task_id", dt.ID()))
tr.Record("get vchannels")
// repack delete msg by dmChannel
result := make(map[uint32]msgstream.TsMsg)
collectionName := dt.CollectionName
collectionID := dt.CollectionID
partitionID := dt.PartitionID
partitionName := dt.PartitionName
proxyID := dt.Base.SourceID
for index, key := range dt.HashValues {
ts := dt.Timestamps[index]
_, ok := result[key]
if !ok {
sliceRequest := internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: dt.Base.MsgID,
Timestamp: ts,
SourceID: proxyID,
},
CollectionID: collectionID,
PartitionID: partitionID,
CollectionName: collectionName,
PartitionName: partitionName,
PrimaryKeys: &schemapb.IDs{},
}
deleteMsg := &msgstream.DeleteMsg{
BaseMsg: msgstream.BaseMsg{
Ctx: ctx,
},
DeleteRequest: sliceRequest,
}
result[key] = deleteMsg
}
curMsg := result[key].(*msgstream.DeleteMsg)
curMsg.HashValues = append(curMsg.HashValues, dt.HashValues[index])
curMsg.Timestamps = append(curMsg.Timestamps, dt.Timestamps[index])
typeutil.AppendIDs(curMsg.PrimaryKeys, dt.PrimaryKeys, index)
curMsg.NumRows++
}
// send delete request to log broker
msgPack := &msgstream.MsgPack{
BeginTs: dt.BeginTs(),
EndTs: dt.EndTs(),
}
for _, msg := range result {
if msg != nil {
msgPack.Msgs = append(msgPack.Msgs, msg)
}
}
tr.Record("pack messages")
err = stream.Produce(msgPack)
if err != nil {
dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
dt.result.Status.Reason = err.Error()
return err
}
sendMsgDur := tr.Record("send delete request to dml channels")
metrics.ProxySendMutationReqLatency.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), metrics.DeleteLabel).Observe(float64(sendMsgDur.Milliseconds()))
return nil
}
func (dt *deleteTask) PostExecute(ctx context.Context) error {
return nil
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册