未验证 提交 bd3a8ed3 编写于 作者: C Cai Yudong 提交者: GitHub

Support delete in proxy (#9588)

Signed-off-by: Nyudong.cai <yudong.cai@zilliz.com>
上级 b6b4b784
......@@ -1365,10 +1365,27 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
}, nil
}
deleteReq := &milvuspb.DeleteRequest{
DbName: request.DbName,
CollectionName: request.CollectionName,
PartitionName: request.PartitionName,
Expr: request.Expr,
}
dt := &deleteTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
DeleteRequest: request,
ctx: ctx,
Condition: NewTaskCondition(ctx),
req: deleteReq,
DeleteRequest: &internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
SourceID: Params.ProxyID,
},
CollectionName: request.CollectionName,
PartitionName: request.PartitionName,
},
chMgr: node.chMgr,
chTicker: node.chTicker,
}
log.Debug("Delete request enqueue",
......
......@@ -4591,9 +4591,14 @@ func (rpt *releasePartitionsTask) PostExecute(ctx context.Context) error {
type deleteTask struct {
Condition
*milvuspb.DeleteRequest
ctx context.Context
result *milvuspb.MutationResult
*internalpb.DeleteRequest
ctx context.Context
req *milvuspb.DeleteRequest
result *milvuspb.MutationResult
chMgr channelsMgr
chTicker channelsTimeTicker
vChannels []vChan
pChannels []pChan
}
func (dt *deleteTask) TraceCtx() context.Context {
......@@ -4629,31 +4634,155 @@ func (dt *deleteTask) SetTs(ts Timestamp) {
}
func (dt *deleteTask) OnEnqueue() error {
dt.Base = &commonpb.MsgBase{}
dt.DeleteRequest.Base = &commonpb.MsgBase{}
return nil
}
func getPrimaryKeysFromExpr(schema *schemapb.CollectionSchema, expr string) (res []int64, err error) {
if len(expr) == 0 {
log.Warn("empty expr")
return
}
plan, err := CreateExprPlan(schema, expr)
if err != nil {
return res, fmt.Errorf("failed to create expr plan, expr = %s", expr)
}
// delete request only support expr "id in [a, b]"
termExpr, ok := plan.Node.(*planpb.PlanNode_Predicates).Predicates.Expr.(*planpb.Expr_TermExpr)
if !ok {
return res, fmt.Errorf("invalid plan node type")
}
for _, v := range termExpr.TermExpr.Values {
res = append(res, v.GetInt64Val())
}
return res, nil
}
func (dt *deleteTask) PreExecute(ctx context.Context) error {
dt.Base.MsgType = commonpb.MsgType_Delete
dt.Base.SourceID = Params.ProxyID
collName := dt.CollectionName
dt.result = &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IDs: &schemapb.IDs{
IdField: nil,
},
Timestamp: dt.BeginTs(),
}
collName := dt.req.CollectionName
if err := ValidateCollectionName(collName); err != nil {
log.Error("Invalid collection name", zap.String("collectionName", collName))
return err
}
collID, err := globalMetaCache.GetCollectionID(ctx, collName)
if err != nil {
log.Debug("Failed to get collection id", zap.String("collectionName", collName))
return err
}
dt.DeleteRequest.CollectionID = collID
partitionName := dt.PartitionName
// partitionName is accepted, means delete from whole collection
if partitionName != "" {
if err := ValidatePartitionTag(partitionName, true); err != nil {
if len(dt.req.PartitionName) > 0 {
partName := dt.req.PartitionName
if err := ValidatePartitionTag(partName, true); err != nil {
log.Error("Invalid partition name", zap.String("partitionName", partName))
return err
}
partID, err := globalMetaCache.GetPartitionID(ctx, collName, partName)
if err != nil {
log.Debug("Failed to get partition id", zap.String("collectionName", collName), zap.String("partitionName", partName))
return err
}
dt.DeleteRequest.PartitionID = partID
}
schema, err := globalMetaCache.GetCollectionSchema(ctx, dt.req.CollectionName)
if err != nil {
log.Error("Failed to get collection schema", zap.String("collectionName", dt.req.CollectionName))
return err
}
primaryKeys, err := getPrimaryKeysFromExpr(schema, dt.req.Expr)
if err != nil {
log.Error("Failed to get primary keys from expr", zap.Error(err))
return err
}
log.Debug("get primary keys from expr", zap.Any("primary keys", dt.DeleteRequest.PrimaryKeys))
dt.DeleteRequest.PrimaryKeys = primaryKeys
// set result
dt.result.IDs.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: primaryKeys,
},
}
dt.result.DeleteCnt = int64(len(primaryKeys))
dt.DeleteRequest.Timestamp = dt.BeginTs()
return nil
}
func (dt *deleteTask) Execute(ctx context.Context) (err error) {
sp, ctx := trace.StartSpanFromContextWithOperationName(dt.ctx, "Proxy-Delete-Execute")
defer sp.Finish()
var tsMsg msgstream.TsMsg = &msgstream.DeleteMsg{
DeleteRequest: *dt.DeleteRequest,
BaseMsg: msgstream.BaseMsg{
Ctx: ctx,
HashValues: []uint32{uint32(Params.ProxyID)},
BeginTimestamp: dt.BeginTs(),
EndTimestamp: dt.EndTs(),
},
}
msgPack := msgstream.MsgPack{
BeginTs: dt.BeginTs(),
EndTs: dt.EndTs(),
Msgs: make([]msgstream.TsMsg, 1),
}
msgPack.Msgs[0] = tsMsg
//collID := dt.DeleteRequest.CollectionID
//stream, err := dt.chMgr.getDMLStream(collID)
//if err != nil {
// err = dt.chMgr.createDMLMsgStream(collID)
// if err != nil {
// dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
// dt.result.Status.Reason = err.Error()
// return err
// }
// channels, err := dt.chMgr.getChannels(collID)
// if err == nil {
// for _, pchan := range channels {
// err := dt.chTicker.addPChan(pchan)
// if err != nil {
// log.Warn("failed to add pchan to channels time ticker",
// zap.Error(err),
// zap.String("pchan", pchan))
// }
// }
// }
// stream, err = dt.chMgr.getDMLStream(collID)
// if err != nil {
// dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
// dt.result.Status.Reason = err.Error()
// return err
// }
//}
//
//err = stream.Produce(&msgPack)
//if err != nil {
// dt.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
// dt.result.Status.Reason = err.Error()
// return err
//}
return nil
}
......
......@@ -3020,7 +3020,7 @@ func TestQueryTask_all(t *testing.T) {
wg.Wait()
}
func TestInsertTask_all(t *testing.T) {
func TestTask_all(t *testing.T) {
var err error
Params.Init()
......@@ -3036,7 +3036,7 @@ func TestInsertTask_all(t *testing.T) {
assert.NoError(t, err)
shardsNum := int32(2)
prefix := "TestQueryTask_all"
prefix := "TestTask_all"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
partitionName := prefix + funcutil.GenRandomStr()
......@@ -3051,42 +3051,44 @@ func TestInsertTask_all(t *testing.T) {
dim := 128
nb := 10
schema := constructCollectionSchemaWithAllType(
boolField, int32Field, int64Field, floatField, doubleField,
floatVecField, binaryVecField, dim, collectionName)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
t.Run("create collection", func(t *testing.T) {
schema := constructCollectionSchemaWithAllType(
boolField, int32Field, int64Field, floatField, doubleField,
floatVecField, binaryVecField, dim, collectionName)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)
createColT := &createCollectionTask{
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: shardsNum,
},
ctx: ctx,
rootCoord: rc,
result: nil,
schema: nil,
}
createColT := &createCollectionTask{
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: shardsNum,
},
ctx: ctx,
rootCoord: rc,
result: nil,
schema: nil,
}
assert.NoError(t, createColT.OnEnqueue())
assert.NoError(t, createColT.PreExecute(ctx))
assert.NoError(t, createColT.Execute(ctx))
assert.NoError(t, createColT.PostExecute(ctx))
assert.NoError(t, createColT.OnEnqueue())
assert.NoError(t, createColT.PreExecute(ctx))
assert.NoError(t, createColT.Execute(ctx))
assert.NoError(t, createColT.PostExecute(ctx))
_, _ = rc.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreatePartition,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
_, _ = rc.CreatePartition(ctx, &milvuspb.CreatePartitionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_CreatePartition,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
})
})
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
......@@ -3122,267 +3124,235 @@ func TestInsertTask_all(t *testing.T) {
_ = segAllocator.Start()
defer segAllocator.Close()
hash := generateHashKeys(nb)
task := &insertTask{
BaseInsertTask: BaseInsertTask{
BaseMsg: msgstream.BaseMsg{
HashValues: hash,
t.Run("insert", func(t *testing.T) {
hash := generateHashKeys(nb)
task := &insertTask{
BaseInsertTask: BaseInsertTask{
BaseMsg: msgstream.BaseMsg{
HashValues: hash,
},
InsertRequest: internalpb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: 0,
},
CollectionName: collectionName,
PartitionName: partitionName,
},
},
InsertRequest: internalpb.InsertRequest{
req: &milvuspb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: 0,
MsgType: commonpb.MsgType_Insert,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
FieldsData: make([]*schemapb.FieldData, fieldsLen),
HashKeys: hash,
NumRows: uint32(nb),
},
},
req: &milvuspb.InsertRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Insert,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
FieldsData: make([]*schemapb.FieldData, fieldsLen),
HashKeys: hash,
NumRows: uint32(nb),
},
Condition: NewTaskCondition(ctx),
ctx: ctx,
result: &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
Condition: NewTaskCondition(ctx),
ctx: ctx,
result: &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
IDs: nil,
SuccIndex: nil,
ErrIndex: nil,
Acknowledged: false,
InsertCnt: 0,
DeleteCnt: 0,
UpsertCnt: 0,
Timestamp: 0,
},
IDs: nil,
SuccIndex: nil,
ErrIndex: nil,
Acknowledged: false,
InsertCnt: 0,
DeleteCnt: 0,
UpsertCnt: 0,
Timestamp: 0,
},
rowIDAllocator: idAllocator,
segIDAssigner: segAllocator,
chMgr: chMgr,
chTicker: ticker,
vChannels: nil,
pChannels: nil,
schema: nil,
}
task.req.FieldsData[0] = &schemapb.FieldData{
Type: schemapb.DataType_Bool,
FieldName: boolField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Data: generateBoolArray(nb),
rowIDAllocator: idAllocator,
segIDAssigner: segAllocator,
chMgr: chMgr,
chTicker: ticker,
vChannels: nil,
pChannels: nil,
schema: nil,
}
task.req.FieldsData[0] = &schemapb.FieldData{
Type: schemapb.DataType_Bool,
FieldName: boolField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_BoolData{
BoolData: &schemapb.BoolArray{
Data: generateBoolArray(nb),
},
},
},
},
},
FieldId: common.StartOfUserFieldID + 0,
}
FieldId: common.StartOfUserFieldID + 0,
}
task.req.FieldsData[1] = &schemapb.FieldData{
Type: schemapb.DataType_Int32,
FieldName: int32Field,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: generateInt32Array(nb),
task.req.FieldsData[1] = &schemapb.FieldData{
Type: schemapb.DataType_Int32,
FieldName: int32Field,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_IntData{
IntData: &schemapb.IntArray{
Data: generateInt32Array(nb),
},
},
},
},
},
FieldId: common.StartOfUserFieldID + 1,
}
FieldId: common.StartOfUserFieldID + 1,
}
task.req.FieldsData[2] = &schemapb.FieldData{
Type: schemapb.DataType_Int64,
FieldName: int64Field,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: generateInt64Array(nb),
task.req.FieldsData[2] = &schemapb.FieldData{
Type: schemapb.DataType_Int64,
FieldName: int64Field,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_LongData{
LongData: &schemapb.LongArray{
Data: generateInt64Array(nb),
},
},
},
},
},
FieldId: common.StartOfUserFieldID + 2,
}
FieldId: common.StartOfUserFieldID + 2,
}
task.req.FieldsData[3] = &schemapb.FieldData{
Type: schemapb.DataType_Float,
FieldName: floatField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: generateFloat32Array(nb),
task.req.FieldsData[3] = &schemapb.FieldData{
Type: schemapb.DataType_Float,
FieldName: floatField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_FloatData{
FloatData: &schemapb.FloatArray{
Data: generateFloat32Array(nb),
},
},
},
},
},
FieldId: common.StartOfUserFieldID + 3,
}
FieldId: common.StartOfUserFieldID + 3,
}
task.req.FieldsData[4] = &schemapb.FieldData{
Type: schemapb.DataType_Double,
FieldName: doubleField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{
Data: generateFloat64Array(nb),
task.req.FieldsData[4] = &schemapb.FieldData{
Type: schemapb.DataType_Double,
FieldName: doubleField,
Field: &schemapb.FieldData_Scalars{
Scalars: &schemapb.ScalarField{
Data: &schemapb.ScalarField_DoubleData{
DoubleData: &schemapb.DoubleArray{
Data: generateFloat64Array(nb),
},
},
},
},
},
FieldId: common.StartOfUserFieldID + 4,
}
FieldId: common.StartOfUserFieldID + 4,
}
task.req.FieldsData[5] = &schemapb.FieldData{
Type: schemapb.DataType_FloatVector,
FieldName: doubleField,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: generateFloatVectors(nb, dim),
task.req.FieldsData[5] = &schemapb.FieldData{
Type: schemapb.DataType_FloatVector,
FieldName: doubleField,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_FloatVector{
FloatVector: &schemapb.FloatArray{
Data: generateFloatVectors(nb, dim),
},
},
},
},
},
FieldId: common.StartOfUserFieldID + 5,
}
FieldId: common.StartOfUserFieldID + 5,
}
task.req.FieldsData[6] = &schemapb.FieldData{
Type: schemapb.DataType_BinaryVector,
FieldName: doubleField,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_BinaryVector{
BinaryVector: generateBinaryVectors(nb, dim),
task.req.FieldsData[6] = &schemapb.FieldData{
Type: schemapb.DataType_BinaryVector,
FieldName: doubleField,
Field: &schemapb.FieldData_Vectors{
Vectors: &schemapb.VectorField{
Dim: int64(dim),
Data: &schemapb.VectorField_BinaryVector{
BinaryVector: generateBinaryVectors(nb, dim),
},
},
},
},
FieldId: common.StartOfUserFieldID + 6,
}
assert.NoError(t, task.OnEnqueue())
assert.NoError(t, task.PreExecute(ctx))
assert.NoError(t, task.Execute(ctx))
assert.NoError(t, task.PostExecute(ctx))
}
func TestDeleteTask_all(t *testing.T) {
Params.Init()
ctx := context.Background()
FieldId: common.StartOfUserFieldID + 6,
}
prefix := "TestDeleteTask_all"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
partitionName := prefix + funcutil.GenRandomStr()
assert.NoError(t, task.OnEnqueue())
assert.NoError(t, task.PreExecute(ctx))
assert.NoError(t, task.Execute(ctx))
assert.NoError(t, task.PostExecute(ctx))
})
task := &deleteTask{
Condition: NewTaskCondition(ctx),
DeleteRequest: &milvuspb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
t.Run("delete", func(t *testing.T) {
task := &deleteTask{
Condition: NewTaskCondition(ctx),
DeleteRequest: &internalpb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
CollectionName: collectionName,
PartitionName: partitionName,
},
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
Expr: "",
},
ctx: ctx,
result: &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
req: &milvuspb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: 0,
Timestamp: 0,
SourceID: Params.ProxyID,
},
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
Expr: "int64 in [0, 1]",
},
IDs: nil,
SuccIndex: nil,
ErrIndex: nil,
Acknowledged: false,
InsertCnt: 0,
DeleteCnt: 0,
UpsertCnt: 0,
Timestamp: 0,
},
}
assert.NoError(t, task.OnEnqueue())
assert.NotNil(t, task.TraceCtx())
id := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
task.SetID(id)
assert.Equal(t, id, task.ID())
task.Base.MsgType = commonpb.MsgType_Delete
assert.Equal(t, commonpb.MsgType_Delete, task.Type())
ts := Timestamp(time.Now().UnixNano())
task.SetTs(ts)
assert.Equal(t, ts, task.BeginTs())
assert.Equal(t, ts, task.EndTs())
assert.NoError(t, task.PreExecute(ctx))
assert.NoError(t, task.Execute(ctx))
assert.NoError(t, task.PostExecute(ctx))
}
func TestDeleteTask_PreExecute(t *testing.T) {
Params.Init()
ctx := context.Background()
ctx: ctx,
result: &milvuspb.MutationResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
IDs: nil,
SuccIndex: nil,
ErrIndex: nil,
Acknowledged: false,
InsertCnt: 0,
DeleteCnt: 0,
UpsertCnt: 0,
Timestamp: 0,
},
chMgr: chMgr,
chTicker: ticker,
}
prefix := "TestDeleteTask_all"
dbName := ""
collectionName := prefix + funcutil.GenRandomStr()
partitionName := prefix + funcutil.GenRandomStr()
assert.NoError(t, task.OnEnqueue())
assert.NotNil(t, task.TraceCtx())
task := &deleteTask{
DeleteRequest: &milvuspb.DeleteRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_Delete,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
Expr: "",
},
}
id := UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt())
task.SetID(id)
assert.Equal(t, id, task.ID())
assert.NoError(t, task.PreExecute(ctx))
task.Base.MsgType = commonpb.MsgType_Delete
assert.Equal(t, commonpb.MsgType_Delete, task.Type())
task.DeleteRequest.CollectionName = "" // empty
assert.Error(t, task.PreExecute(ctx))
task.DeleteRequest.CollectionName = collectionName
ts := Timestamp(time.Now().UnixNano())
task.SetTs(ts)
assert.Equal(t, ts, task.BeginTs())
assert.Equal(t, ts, task.EndTs())
task.DeleteRequest.PartitionName = "" // empty
assert.NoError(t, task.PreExecute(ctx))
task.DeleteRequest.PartitionName = partitionName
assert.NoError(t, task.PreExecute(ctx))
assert.NoError(t, task.Execute(ctx))
assert.NoError(t, task.PostExecute(ctx))
})
}
func TestCreateAlias_all(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册