未验证 提交 17f5e3c7 编写于 作者: C cai.zhang 提交者: GitHub

Drop segment indexes when drop partition (#19022)

Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
上级 5f02f097
......@@ -163,7 +163,6 @@ func TestIndexCoordClient(t *testing.T) {
req := &indexpb.DropIndexRequest{
CollectionID: 0,
IndexName: "default",
FieldID: 0,
}
resp, err := icc.DropIndex(ctx, req)
assert.NoError(t, err)
......
......@@ -132,7 +132,6 @@ func TestIndexCoordinateServer(t *testing.T) {
req := &indexpb.DropIndexRequest{
CollectionID: 0,
IndexName: "default",
FieldID: 0,
}
resp, err := server.DropIndex(ctx, req)
assert.NoError(t, err)
......
......@@ -487,3 +487,37 @@ func Test_flushSegmentWatcher_prepare_error(t *testing.T) {
assert.ErrorIs(t, err, ErrSegmentNotFound)
})
}
func Test_flushSegmentWatcher_removeFlushedSegment(t *testing.T) {
task := &internalTask{
state: indexTaskDone,
segmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
},
}
t.Run("success", func(t *testing.T) {
fsw := &flushedSegmentWatcher{
kvClient: &mockETCDKV{
removeWithPrefix: func(key string) error {
return nil
},
},
}
err := fsw.removeFlushedSegment(task)
assert.NoError(t, err)
})
t.Run("fail", func(t *testing.T) {
fsw := &flushedSegmentWatcher{
kvClient: &mockETCDKV{
removeWithPrefix: func(key string) error {
return errors.New("error")
},
},
}
err := fsw.removeFlushedSegment(task)
assert.Error(t, err)
})
}
......@@ -22,13 +22,13 @@ import (
"sync"
"time"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"go.uber.org/zap"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
)
......@@ -158,7 +158,9 @@ func (gc *garbageCollector) recycleSegIndexesMeta() {
if _, ok := flushedSegments[segID]; !ok {
log.Debug("segment is already not exist, mark it deleted", zap.Int64("collID", collID),
zap.Int64("segID", segID))
if err := gc.metaTable.MarkSegmentsIndexAsDeleted([]int64{segID}); err != nil {
if err := gc.metaTable.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
}); err != nil {
continue
}
}
......
......@@ -614,7 +614,7 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get
// index tasks.
func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
log.Info("IndexCoord DropIndex", zap.Int64("collectionID", req.CollectionID),
zap.Int64("fieldID", req.FieldID), zap.String("indexName", req.IndexName))
zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
return &commonpb.Status{
......@@ -636,15 +636,37 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques
for indexID := range indexID2CreateTs {
indexIDs = append(indexIDs, indexID)
}
err := i.metaTable.MarkIndexAsDeleted(req.CollectionID, indexIDs)
if err != nil {
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
ret.Reason = err.Error()
return ret, nil
if len(req.GetPartitionIDs()) == 0 {
// drop collection index
err := i.metaTable.MarkIndexAsDeleted(req.CollectionID, indexIDs)
if err != nil {
log.Error("IndexCoord drop index fail", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.Error(err))
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
ret.Reason = err.Error()
return ret, nil
}
} else {
err := i.metaTable.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
for _, partitionID := range req.PartitionIDs {
if segIndex.CollectionID == req.CollectionID && segIndex.PartitionID == partitionID {
return true
}
}
return false
})
if err != nil {
log.Error("IndexCoord drop index fail", zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName), zap.Error(err))
ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
ret.Reason = err.Error()
return ret, nil
}
}
log.Info("IndexCoord DropIndex success", zap.Int64("collID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.Int64s("indexIDs", indexIDs))
zap.Int64s("partitionIDs", req.PartitionIDs), zap.String("indexName", req.IndexName),
zap.Int64s("indexIDs", indexIDs))
return ret, nil
}
......
......@@ -31,6 +31,7 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/indexnode"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/indexcoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
......@@ -251,8 +252,8 @@ func TestIndexCoord(t *testing.T) {
t.Run("DropIndex", func(t *testing.T) {
req := &indexpb.DropIndexRequest{
CollectionID: collID,
PartitionIDs: nil,
IndexName: indexName,
FieldID: fieldID,
}
resp, err := ic.DropIndex(ctx, req)
assert.NoError(t, err)
......@@ -366,7 +367,6 @@ func TestIndexCoord_UnHealthy(t *testing.T) {
req := &indexpb.DropIndexRequest{
CollectionID: collID,
IndexName: indexName,
FieldID: fieldID,
}
resp, err := ic.DropIndex(ctx, req)
assert.NoError(t, err)
......@@ -398,6 +398,81 @@ func TestIndexCoord_UnHealthy(t *testing.T) {
}
func TestIndexCoord_DropIndex(t *testing.T) {
t.Run("success", func(t *testing.T) {
ic := &IndexCoord{
metaTable: constructMetaTable(&indexcoord.Catalog{
Txn: &mockETCDKV{
multiSave: func(m map[string]string) error {
return nil
},
},
}),
}
ic.UpdateStateCode(internalpb.StateCode_Healthy)
resp, err := ic.DropIndex(context.Background(), &indexpb.DropIndexRequest{
CollectionID: collID,
PartitionIDs: []int64{partID},
IndexName: indexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
resp, err = ic.DropIndex(context.Background(), &indexpb.DropIndexRequest{
CollectionID: collID,
PartitionIDs: []int64{partID},
IndexName: indexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
resp, err = ic.DropIndex(context.Background(), &indexpb.DropIndexRequest{
CollectionID: collID,
PartitionIDs: nil,
IndexName: indexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
resp, err = ic.DropIndex(context.Background(), &indexpb.DropIndexRequest{
CollectionID: collID,
PartitionIDs: nil,
IndexName: indexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
})
t.Run("fail", func(t *testing.T) {
ic := &IndexCoord{
metaTable: constructMetaTable(&indexcoord.Catalog{
Txn: &mockETCDKV{
multiSave: func(m map[string]string) error {
return errors.New("error")
},
},
}),
}
ic.UpdateStateCode(internalpb.StateCode_Healthy)
resp, err := ic.DropIndex(context.Background(), &indexpb.DropIndexRequest{
CollectionID: collID,
PartitionIDs: []int64{partID},
IndexName: indexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
resp, err = ic.DropIndex(context.Background(), &indexpb.DropIndexRequest{
CollectionID: collID,
PartitionIDs: nil,
IndexName: indexName,
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
})
}
// TODO @xiaocai2333: add ut for error occurred.
//func TestIndexCoord_watchNodeLoop(t *testing.T) {
......
......@@ -699,13 +699,16 @@ func (mt *metaTable) MarkIndexAsDeleted(collID UniqueID, indexIDs []UniqueID) er
indexes := make([]*model.Index, 0)
for _, indexID := range indexIDs {
index, ok := fieldIndexes[indexID]
if !ok {
if !ok || index.IsDeleted {
continue
}
clonedIndex := model.CloneIndex(index)
clonedIndex.IsDeleted = true
indexes = append(indexes, clonedIndex)
}
if len(indexes) == 0 {
return nil
}
err := mt.alterIndexes(indexes)
if err != nil {
log.Error("IndexCoord metaTable MarkIndexAsDeleted fail", zap.Int64("collID", collID),
......@@ -718,38 +721,32 @@ func (mt *metaTable) MarkIndexAsDeleted(collID UniqueID, indexIDs []UniqueID) er
}
// MarkSegmentsIndexAsDeleted will mark the index on the segment corresponding the buildID as deleted, and recycleUnusedSegIndexes will recycle these tasks.
func (mt *metaTable) MarkSegmentsIndexAsDeleted(segIDs []UniqueID) error {
log.Info("IndexCoord metaTable MarkSegmentsIndexAsDeleted", zap.Int64s("segIDs", segIDs))
func (mt *metaTable) MarkSegmentsIndexAsDeleted(selector func(index *model.SegmentIndex) bool) error {
mt.segmentIndexLock.Lock()
defer mt.segmentIndexLock.Unlock()
buildIDs := make([]UniqueID, 0)
segIdxes := make([]*model.SegmentIndex, 0)
for _, segID := range segIDs {
if segIndexes, ok := mt.segmentIndexes[segID]; ok {
for _, segIdx := range segIndexes {
if segIdx.IsDeleted {
continue
}
clonedSegIdx := model.CloneSegmentIndex(segIdx)
clonedSegIdx.IsDeleted = true
segIdxes = append(segIdxes, clonedSegIdx)
buildIDs = append(buildIDs, segIdx.BuildID)
}
for _, segIdx := range mt.buildID2SegmentIndex {
if segIdx.IsDeleted {
continue
}
if selector(segIdx) {
clonedSegIdx := model.CloneSegmentIndex(segIdx)
clonedSegIdx.IsDeleted = true
segIdxes = append(segIdxes, clonedSegIdx)
buildIDs = append(buildIDs, segIdx.BuildID)
}
}
if len(segIdxes) == 0 {
log.Debug("IndexCoord metaTable MarkSegmentsIndexAsDeleted success, already have deleted",
zap.Int64s("segIDs", segIDs))
log.Debug("IndexCoord metaTable MarkSegmentsIndexAsDeleted success, no segment index need to mark")
return nil
}
err := mt.alterSegmentIndexes(segIdxes)
if err != nil {
log.Error("IndexCoord metaTable MarkSegmentsIndexAsDeleted fail", zap.Int64s("segIDs", segIDs), zap.Error(err))
return err
}
log.Info("IndexCoord metaTable MarkSegmentsIndexAsDeleted success", zap.Int64s("segIDs", segIDs))
return nil
}
......
......@@ -362,7 +362,9 @@ func TestMetaTable_UpdateVersion(t *testing.T) {
err = mt.UpdateVersion(newBuildID, nodeID)
assert.Error(t, err)
err = mt.MarkSegmentsIndexAsDeleted([]int64{segID})
err = mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
})
assert.NoError(t, err)
err = mt.UpdateVersion(newBuildID, nodeID)
......@@ -432,7 +434,9 @@ func TestMetaTable_BuildIndex(t *testing.T) {
err = mt.BuildIndex(buildID)
assert.Error(t, err)
err = mt.MarkSegmentsIndexAsDeleted([]int64{segID})
err = mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
})
assert.NoError(t, err)
err = mt.BuildIndex(newBuildID)
......@@ -694,13 +698,19 @@ func TestMetaTable_MarkSegmentsIndexAsDeleted(t *testing.T) {
},
})
err := mt.MarkSegmentsIndexAsDeleted([]int64{segID})
err := mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
})
assert.NoError(t, err)
err = mt.MarkSegmentsIndexAsDeleted([]int64{segID})
err = mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
})
assert.NoError(t, err)
err = mt.MarkSegmentsIndexAsDeleted([]int64{segID + 1})
err = mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID+1
})
assert.NoError(t, err)
})
......@@ -713,7 +723,9 @@ func TestMetaTable_MarkSegmentsIndexAsDeleted(t *testing.T) {
},
})
err := mt.MarkSegmentsIndexAsDeleted([]int64{segID})
err := mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
})
assert.Error(t, err)
})
}
......@@ -825,6 +837,10 @@ func TestMetaTable_IsIndexDeleted(t *testing.T) {
err := mt.MarkIndexAsDeleted(collID, []int64{indexID})
assert.NoError(t, err)
err = mt.MarkIndexAsDeleted(collID, []int64{indexID})
assert.NoError(t, err)
deleted = mt.IsIndexDeleted(collID, indexID)
assert.True(t, deleted)
......@@ -847,7 +863,9 @@ func TestMetaTable_IsSegIndexDeleted(t *testing.T) {
deleted := mt.IsSegIndexDeleted(buildID)
assert.False(t, deleted)
err := mt.MarkSegmentsIndexAsDeleted([]int64{segID})
err := mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
})
assert.NoError(t, err)
deleted = mt.IsSegIndexDeleted(buildID)
assert.True(t, deleted)
......@@ -882,7 +900,9 @@ func TestMetaTable_GetMetasByNodeID(t *testing.T) {
segIdxes := mt.GetMetasByNodeID(nodeID)
assert.Equal(t, 1, len(segIdxes))
err = mt.MarkSegmentsIndexAsDeleted([]int64{segID + 1})
err = mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID+1
})
assert.NoError(t, err)
segIdxes = mt.GetMetasByNodeID(nodeID)
......@@ -900,7 +920,9 @@ func TestMetaTable_GetAllSegIndexes(t *testing.T) {
segIdxes := mt.GetAllSegIndexes()
assert.Equal(t, 1, len(segIdxes))
err := mt.MarkSegmentsIndexAsDeleted([]int64{segID})
err := mt.MarkSegmentsIndexAsDeleted(func(segIndex *model.SegmentIndex) bool {
return segIndex.SegmentID == segID
})
assert.NoError(t, err)
segIdxes = mt.GetAllSegIndexes()
......
......@@ -150,8 +150,8 @@ message GetIndexInfoResponse {
message DropIndexRequest {
int64 collectionID = 1;
repeated int64 partitionIDs = 2;
string index_name = 3;
int64 fieldID = 2;
}
message DescribeIndexRequest {
......
......@@ -1530,6 +1530,7 @@ func (dit *dropIndexTask) Execute(ctx context.Context) error {
var err error
dit.result, err = dit.indexCoord.DropIndex(ctx, &indexpb.DropIndexRequest{
CollectionID: dit.collectionID,
PartitionIDs: nil,
IndexName: dit.IndexName,
})
if dit.result == nil {
......
......@@ -35,7 +35,7 @@ type Broker interface {
Flush(ctx context.Context, cID int64, segIDs []int64) error
Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error)
DropCollectionIndex(ctx context.Context, collID UniqueID) error
DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
GetSegmentIndexState(ctx context.Context, collID UniqueID, indexName string, segIDs []UniqueID) ([]*indexpb.SegmentIndexState, error)
}
......@@ -181,9 +181,10 @@ func (b *ServerBroker) Import(ctx context.Context, req *datapb.ImportTaskRequest
return b.s.dataCoord.Import(ctx, req)
}
func (b *ServerBroker) DropCollectionIndex(ctx context.Context, collID UniqueID) error {
func (b *ServerBroker) DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
rsp, err := b.s.indexCoord.DropIndex(ctx, &indexpb.DropIndexRequest{
CollectionID: collID,
PartitionIDs: partIDs,
IndexName: "",
})
if err != nil {
......
......@@ -216,7 +216,7 @@ func TestServerBroker_DropCollectionIndex(t *testing.T) {
c := newTestCore(withInvalidIndexCoord())
b := newServerBroker(c)
ctx := context.Background()
err := b.DropCollectionIndex(ctx, 1)
err := b.DropCollectionIndex(ctx, 1, nil)
assert.Error(t, err)
})
......@@ -224,7 +224,7 @@ func TestServerBroker_DropCollectionIndex(t *testing.T) {
c := newTestCore(withFailedIndexCoord())
b := newServerBroker(c)
ctx := context.Background()
err := b.DropCollectionIndex(ctx, 1)
err := b.DropCollectionIndex(ctx, 1, nil)
assert.Error(t, err)
})
......@@ -232,7 +232,7 @@ func TestServerBroker_DropCollectionIndex(t *testing.T) {
c := newTestCore(withValidIndexCoord())
b := newServerBroker(c)
ctx := context.Background()
err := b.DropCollectionIndex(ctx, 1)
err := b.DropCollectionIndex(ctx, 1, nil)
assert.NoError(t, err)
})
}
......
......@@ -74,6 +74,7 @@ func (t *dropCollectionTask) Execute(ctx context.Context) error {
redoTask.AddAsyncStep(&dropIndexStep{
baseStep: baseStep{core: t.core},
collID: collMeta.CollectionID,
partIDs: nil,
})
redoTask.AddAsyncStep(&deleteCollectionDataStep{
baseStep: baseStep{core: t.core},
......
......@@ -165,7 +165,7 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
}
dropIndexCalled := false
dropIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
dropIndexCalled = true
dropIndexChan <- struct{}{}
return nil
......
......@@ -68,6 +68,12 @@ func (t *dropPartitionTask) Execute(ctx context.Context) error {
ts: t.GetTs(),
})
redoTask.AddAsyncStep(&dropIndexStep{
baseStep: baseStep{core: t.core},
collID: t.collMeta.CollectionID,
partIDs: []UniqueID{partID},
})
// TODO: release partition when query coord is ready.
redoTask.AddAsyncStep(&deletePartitionDataStep{
baseStep: baseStep{core: t.core},
......
......@@ -150,7 +150,7 @@ func Test_dropPartitionTask_Execute(t *testing.T) {
return 0, nil
}
core := newTestCore(withValidProxyManager(), withMeta(meta), withGarbageCollector(gc))
core := newTestCore(withValidProxyManager(), withMeta(meta), withGarbageCollector(gc), withDropIndex())
task := &dropPartitionTask{
baseTaskV2: baseTaskV2{core: core},
......
......@@ -38,6 +38,7 @@ func (c *bgGarbageCollector) ReDropCollection(collMeta *model.Collection, ts Tim
redo.AddAsyncStep(&dropIndexStep{
baseStep: baseStep{core: c.s},
collID: collMeta.CollectionID,
partIDs: nil,
})
redo.AddAsyncStep(&deleteCollectionDataStep{
baseStep: baseStep{core: c.s},
......@@ -93,6 +94,11 @@ func (c *bgGarbageCollector) ReDropPartition(pChannels []string, partition *mode
baseStep: baseStep{core: c.s},
pChannels: pChannels,
})
redo.AddAsyncStep(&dropIndexStep{
baseStep: baseStep{core: c.s},
collID: partition.CollectionID,
partIDs: []UniqueID{partition.PartitionID},
})
redo.AddAsyncStep(&removePartitionMetaStep{
baseStep: baseStep{core: c.s},
collectionID: partition.CollectionID,
......
......@@ -31,7 +31,7 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
releaseCollectionChan <- struct{}{}
return nil
}
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
return errors.New("error mock DropCollectionIndex")
}
ticker := newTickerWithMockNormalStream()
......@@ -54,7 +54,7 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
}
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
dropCollectionIndexCalled = true
dropCollectionIndexChan <- struct{}{}
return nil
......@@ -88,7 +88,7 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
}
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
dropCollectionIndexCalled = true
dropCollectionIndexChan <- struct{}{}
return nil
......@@ -127,7 +127,7 @@ func TestGarbageCollectorCtx_ReDropCollection(t *testing.T) {
}
dropCollectionIndexCalled := false
dropCollectionIndexChan := make(chan struct{}, 1)
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID) error {
broker.DropCollectionIndexFunc = func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
dropCollectionIndexCalled = true
dropCollectionIndexChan <- struct{}{}
return nil
......@@ -230,7 +230,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator))
core := newTestCore(withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
......@@ -249,7 +249,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator))
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
......@@ -272,7 +272,7 @@ func TestGarbageCollectorCtx_ReDropPartition(t *testing.T) {
tsoAllocator.GenerateTSOF = func(count uint32) (uint64, error) {
return 100, nil
}
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator))
core := newTestCore(withMeta(meta), withTtSynchronizer(ticker), withTsoAllocator(tsoAllocator), withDropIndex())
core.ddlTsLockManager = newDdlTsLockManagerV2(core.tsoAllocator)
gc := newBgGarbageCollector(core)
core.garbageCollector = gc
......
......@@ -296,6 +296,16 @@ func withInvalidProxyManager() Opt {
}
}
func withDropIndex() Opt {
return func(c *Core) {
c.broker = &mockBroker{
DropCollectionIndexFunc: func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
return nil
},
}
}
}
func withMeta(meta IMetaTable) Opt {
return func(c *Core) {
c.meta = meta
......@@ -743,7 +753,7 @@ type mockBroker struct {
FlushFunc func(ctx context.Context, cID int64, segIDs []int64) error
ImportFunc func(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error)
DropCollectionIndexFunc func(ctx context.Context, collID UniqueID) error
DropCollectionIndexFunc func(ctx context.Context, collID UniqueID, partIDs []UniqueID) error
}
func newMockBroker() *mockBroker {
......@@ -762,8 +772,8 @@ func (b mockBroker) ReleaseCollection(ctx context.Context, collectionID UniqueID
return b.ReleaseCollectionFunc(ctx, collectionID)
}
func (b mockBroker) DropCollectionIndex(ctx context.Context, collID UniqueID) error {
return b.DropCollectionIndexFunc(ctx, collID)
func (b mockBroker) DropCollectionIndex(ctx context.Context, collID UniqueID, partIDs []UniqueID) error {
return b.DropCollectionIndexFunc(ctx, collID, partIDs)
}
func withBroker(b Broker) Opt {
......
......@@ -252,11 +252,12 @@ func (s *releaseCollectionStep) Weight() stepPriority {
type dropIndexStep struct {
baseStep
collID UniqueID
collID UniqueID
partIDs []UniqueID
}
func (s *dropIndexStep) Execute(ctx context.Context) ([]nestedStep, error) {
err := s.core.broker.DropCollectionIndex(ctx, s.collID)
err := s.core.broker.DropCollectionIndex(ctx, s.collID, s.partIDs)
return nil, err
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册