Unverified commit fc3309b8, authored by cai.zhang, committed by GitHub

All indexes segments both write handoff (#19230)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>

Parent commit: 42d371fd
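Overview of the change: handoff writing moves out of flushedSegmentWatcher into a dedicated handoff component inside IndexCoord. The watcher now only enqueues segments and waits for the handoff task to finish; the handoff component tracks pending segments, writes the handoff event to etcd once the index is built, and marks the segment index with WriteHandoff. The stand-alone Go sketch below only models the task-set bookkeeping the new component relies on (enqueue, deleteTask, taskDone, allParentsDone); the type and helper names are illustrative stand-ins, not the Milvus implementation.

package main

import (
    "fmt"
    "sync"
)

// handoffSet is a stand-alone sketch of the bookkeeping used by the new
// handoff component: segments are enqueued when index tasks are created and
// removed once their handoff event has been written.
type handoffSet struct {
    mu    sync.RWMutex
    tasks map[int64]struct{}
}

func newHandoffSet() *handoffSet {
    return &handoffSet{tasks: make(map[int64]struct{})}
}

// enqueue registers a segment that still needs a handoff event.
func (h *handoffSet) enqueue(segID int64) {
    h.mu.Lock()
    defer h.mu.Unlock()
    h.tasks[segID] = struct{}{}
}

// deleteTask removes a segment after its handoff event was written (or dropped).
func (h *handoffSet) deleteTask(segID int64) {
    h.mu.Lock()
    defer h.mu.Unlock()
    delete(h.tasks, segID)
}

// taskDone reports whether the segment no longer has a pending handoff task.
func (h *handoffSet) taskDone(segID int64) bool {
    h.mu.RLock()
    defer h.mu.RUnlock()
    _, ok := h.tasks[segID]
    return !ok
}

// allParentsDone reports whether none of the compacted-from segments still has
// a pending handoff task, mirroring handoff.allParentsDone in the diff below.
func (h *handoffSet) allParentsDone(segIDs []int64) bool {
    h.mu.RLock()
    defer h.mu.RUnlock()
    for _, segID := range segIDs {
        if _, ok := h.tasks[segID]; ok {
            return false
        }
    }
    return true
}

func main() {
    h := newHandoffSet()
    h.enqueue(100) // parent segment, index not yet built
    h.enqueue(101) // compacted segment waiting on parent 100

    fmt.Println(h.taskDone(100))                // false
    fmt.Println(h.allParentsDone([]int64{100})) // false

    h.deleteTask(100) // parent handoff written
    fmt.Println(h.allParentsDone([]int64{100})) // true
}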
@@ -25,16 +25,13 @@ import (
 	"sync"
 	"time"

-	"github.com/golang/protobuf/proto"
 	clientv3 "go.etcd.io/etcd/client/v3"
 	"go.uber.org/zap"

-	"github.com/milvus-io/milvus/api/commonpb"
 	"github.com/milvus-io/milvus/internal/kv"
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
-	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/internal/util/logutil"
 )
@@ -56,6 +53,7 @@ type flushedSegmentWatcher struct {
 	meta    *metaTable
 	builder *indexBuilder
 	ic      *IndexCoord
+	handoff *handoff

 	internalTasks map[UniqueID]*internalTask
 }
@@ -65,7 +63,8 @@ type internalTask struct {
 	segmentInfo *datapb.SegmentInfo
 }

-func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, builder *indexBuilder, ic *IndexCoord) (*flushedSegmentWatcher, error) {
+func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable, builder *indexBuilder,
+	handoff *handoff, ic *IndexCoord) (*flushedSegmentWatcher, error) {
 	ctx, cancel := context.WithCancel(ctx)
 	fsw := &flushedSegmentWatcher{
 		ctx: ctx,
@@ -77,6 +76,7 @@ func newFlushSegmentWatcher(ctx context.Context, kv kv.MetaKv, meta *metaTable,
 		internalNotify: make(chan struct{}, 1),
 		meta:           meta,
 		builder:        builder,
+		handoff:        handoff,
 		ic:             ic,
 	}
 	err := fsw.reloadFromKV()
@@ -128,9 +128,8 @@ func (fsw *flushedSegmentWatcher) enqueueInternalTask(segmentID UniqueID) {
 			state:       indexTaskPrepare,
 			segmentInfo: nil,
 		}
-		return
 	}
-	logutil.Logger(fsw.ctx).Info("flushedSegmentWatcher already have the task", zap.Int64("segmentID", segmentID))
+	logutil.Logger(fsw.ctx).Info("flushedSegmentWatcher enqueueInternalTask success", zap.Int64("segmentID", segmentID))
 }

 func (fsw *flushedSegmentWatcher) internalScheduler() {
@@ -179,6 +178,13 @@ func (fsw *flushedSegmentWatcher) internalNotifyFunc() {
 	}
 }

+func (fsw *flushedSegmentWatcher) Len() int {
+	fsw.internalTaskMutex.RLock()
+	defer fsw.internalTaskMutex.RUnlock()
+
+	return len(fsw.internalTasks)
+}
+
 func (fsw *flushedSegmentWatcher) updateInternalTaskState(segID UniqueID, state indexTaskState) {
 	fsw.internalTaskMutex.Lock()
 	defer fsw.internalTaskMutex.Unlock()
@@ -216,19 +222,6 @@ func (fsw *flushedSegmentWatcher) setInternalTaskSegmentInfo(segID UniqueID, seg
 	log.Debug("flushedSegmentWatcher set internal task segment info success", zap.Int64("segID", segID))
 }

-func (fsw *flushedSegmentWatcher) allParentsDone(segIDs []UniqueID) bool {
-	fsw.internalTaskMutex.RLock()
-	defer fsw.internalTaskMutex.RUnlock()
-	done := true
-	for _, segID := range segIDs {
-		if _, ok := fsw.internalTasks[segID]; ok {
-			done = false
-			break
-		}
-	}
-	return done
-}
-
 func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
 	t := fsw.getInternalTask(segID)
 	log.Debug("IndexCoord flushedSegmentWatcher process internal task", zap.Int64("segID", segID),
@@ -249,52 +242,11 @@ func (fsw *flushedSegmentWatcher) internalProcess(segID UniqueID) {
 		fsw.updateInternalTaskState(segID, indexTaskInProgress)
 		fsw.internalNotifyFunc()
 	case indexTaskInProgress:
-		state := fsw.meta.GetSegmentIndexState(segID)
-		if state.state == commonpb.IndexState_Finished || state.state == commonpb.IndexState_Failed || state.state == commonpb.IndexState_IndexStateNone {
-			log.Debug("all tasks are finished", zap.Int64("segID", segID), zap.String("state", state.state.String()))
+		if fsw.handoff.taskDone(segID) {
 			fsw.updateInternalTaskState(segID, indexTaskDone)
 			fsw.internalNotifyFunc()
 		}
 	case indexTaskDone:
-		if !fsw.allParentsDone(t.segmentInfo.CompactionFrom) {
-			log.Debug("flushed segment create index done, but there are still parent task that haven't written handoff event",
-				zap.Int64("segID", segID), zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom))
-			return
-		}
-		indexInfos := fsw.meta.GetSegmentIndexes(segID)
-		enableIndex := len(indexInfos) > 0
-		handoffTask := &querypb.SegmentInfo{
-			SegmentID:           segID,
-			CollectionID:        t.segmentInfo.CollectionID,
-			PartitionID:         t.segmentInfo.PartitionID,
-			NumRows:             t.segmentInfo.NumOfRows,
-			DmChannel:           t.segmentInfo.GetInsertChannel(),
-			IndexName:           "",
-			IndexID:             0,
-			CompactionFrom:      t.segmentInfo.CompactionFrom,
-			CreatedByCompaction: t.segmentInfo.CreatedByCompaction,
-			SegmentState:        t.segmentInfo.State,
-			IndexInfos:          make([]*querypb.FieldIndexInfo, 0),
-			EnableIndex:         enableIndex,
-		}
-		for _, indexInfo := range indexInfos {
-			handoffTask.IndexInfos = append(handoffTask.IndexInfos, &querypb.FieldIndexInfo{
-				FieldID:     fsw.meta.GetFieldIDByIndexID(t.segmentInfo.CollectionID, indexInfo.IndexID),
-				EnableIndex: true,
-				IndexName:   fsw.meta.GetIndexNameByID(t.segmentInfo.CollectionID, indexInfo.IndexID),
-				IndexID:     indexInfo.IndexID,
-				BuildID:     indexInfo.BuildID,
-				IndexParams: fsw.meta.GetIndexParams(t.segmentInfo.CollectionID, indexInfo.IndexID),
-				//IndexFilePaths: nil,
-				//IndexSize:      0,
-			})
-		}
-		if err := fsw.writeHandoffSegment(handoffTask); err != nil {
-			log.Error("IndexCoord flushSegmentWatcher writeHandoffSegment with no index info fail",
-				zap.Int64("segID", segID), zap.Error(err))
-			return
-		}
 		if err := fsw.removeFlushedSegment(t); err != nil {
 			log.Error("IndexCoord flushSegmentWatcher removeFlushedSegment fail",
 				zap.Int64("segID", segID), zap.Error(err))
@@ -341,31 +293,12 @@ func (fsw *flushedSegmentWatcher) constructTask(t *internalTask) error {
 			fsw.builder.enqueue(buildID)
 		}
 	}
+	fsw.handoff.enqueue(t.segmentInfo.ID)
 	log.Debug("flushedSegmentWatcher construct children task success", zap.Int64("segID", t.segmentInfo.ID),
 		zap.Int("tasks num", len(fieldIndexes)))
 	return nil
 }

-func (fsw *flushedSegmentWatcher) writeHandoffSegment(t *querypb.SegmentInfo) error {
-	key := fmt.Sprintf("%s/%d/%d/%d", util.HandoffSegmentPrefix, t.CollectionID, t.PartitionID, t.SegmentID)
-	value, err := proto.Marshal(t)
-	if err != nil {
-		log.Error("IndexCoord marshal handoff task fail", zap.Int64("collID", t.CollectionID),
-			zap.Int64("partID", t.PartitionID), zap.Int64("segID", t.SegmentID), zap.Error(err))
-		return err
-	}
-	err = fsw.kvClient.Save(key, string(value))
-	if err != nil {
-		log.Error("IndexCoord save handoff task fail", zap.Int64("collID", t.CollectionID),
-			zap.Int64("partID", t.PartitionID), zap.Int64("segID", t.SegmentID), zap.Error(err))
-		return err
-	}
-	log.Info("IndexCoord write handoff task success", zap.Int64("collID", t.CollectionID),
-		zap.Int64("partID", t.PartitionID), zap.Int64("segID", t.SegmentID))
-	return nil
-}
-
 func (fsw *flushedSegmentWatcher) removeFlushedSegment(t *internalTask) error {
 	deletedKeys := fmt.Sprintf("%s/%d/%d/%d", util.FlushedSegmentPrefix, t.segmentInfo.CollectionID, t.segmentInfo.PartitionID, t.segmentInfo.ID)
 	err := fsw.kvClient.RemoveWithPrefix(deletedKeys)
@@ -379,42 +312,15 @@ func (fsw *flushedSegmentWatcher) removeFlushedSegment(t *internalTask) error {
 	return nil
 }

-func (fsw *flushedSegmentWatcher) pullSegmentInfo(segmentID UniqueID) error {
-	t := fsw.getInternalTask(segmentID)
+func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) error {
+	defer fsw.internalNotifyFunc()
+	log.Debug("prepare flushed segment task", zap.Int64("segID", segID))
+	t := fsw.getInternalTask(segID)
 	if t.segmentInfo != nil {
 		return nil
 	}
-	resp, err := fsw.ic.dataCoordClient.GetSegmentInfo(fsw.ctx, &datapb.GetSegmentInfoRequest{
-		SegmentIDs:       []int64{segmentID},
-		IncludeUnHealthy: true,
-	})
+	info, err := fsw.ic.pullSegmentInfo(fsw.ctx, segID)
 	if err != nil {
-		log.Error("flushedSegmentWatcher get segment info fail", zap.Int64("segID", segmentID), zap.Error(err))
-		return err
-	}
-	if resp.Status.GetErrorCode() != commonpb.ErrorCode_Success {
-		log.Error("flushedSegmentWatcher get segment info fail", zap.Int64("segID", segmentID),
-			zap.String("fail reason", resp.Status.GetReason()))
-		if resp.Status.GetReason() == msgSegmentNotFound(segmentID) {
-			return errSegmentNotFound(segmentID)
-		}
-		return errors.New(resp.Status.GetReason())
-	}
-	for _, info := range resp.Infos {
-		if info.ID == segmentID {
-			fsw.setInternalTaskSegmentInfo(segmentID, info)
-			return nil
-		}
-	}
-	errMsg := fmt.Sprintf("flushedSegmentWatcher get segment info fail, the segment is not include in the response with ID: %d", segmentID)
-	log.Error(errMsg)
-	return errors.New(errMsg)
-}
-
-func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) error {
-	defer fsw.internalNotifyFunc()
-	log.Debug("prepare flushed segment task", zap.Int64("segID", segID))
-	if err := fsw.pullSegmentInfo(segID); err != nil {
 		log.Error("flushedSegmentWatcher get segment info fail", zap.Int64("segID", segID),
 			zap.Error(err))
 		if errors.Is(err, ErrSegmentNotFound) {
@@ -423,28 +329,6 @@ func (fsw *flushedSegmentWatcher) prepare(segID UniqueID) error {
 		}
 		return err
 	}
-	//t := fsw.getInternalTask(segID)
-	//if t.segmentInfo.CreatedByCompaction {
-	//	if err := fsw.removeCompactedTasks(t); err != nil {
-	//		return err
-	//	}
-	//}
+	fsw.setInternalTaskSegmentInfo(segID, info)
 	return nil
 }
-
-//func (fsw *flushedSegmentWatcher) removeCompactedTasks(t *internalTask) error {
-//	log.Debug("IndexCoord flushedSegmentWatcher mark task as deleted which is compacted", zap.Int64("segID", t.segmentInfo.ID),
-//		zap.Int64s("compactionFrom", t.segmentInfo.CompactionFrom))
-//	if err := fsw.builder.markTasksAsDeleted(fsw.meta.GetBuildIDsFromSegIDs(t.segmentInfo.CompactionFrom)); err != nil {
-//		log.Error("mark index meta fail, try again", zap.Int64s("compacted segIDs", t.segmentInfo.CompactionFrom),
-//			zap.Error(err))
-//		return err
-//	}
-//	for _, segID := range t.segmentInfo.CompactionFrom {
-//		fsw.deleteChildrenTask(segID)
-//		if _, ok := fsw.internalTasks[segID]; ok {
-//			fsw.updateInternalTaskState(segID, indexTaskDeleted)
-//		}
-//	}
-//	return nil
-//}
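After this change, flushedSegmentWatcher.internalProcess no longer builds or writes querypb.SegmentInfo handoff events itself; it only advances a per-segment state once the handoff component reports the segment done. A stand-alone sketch of that reduced state flow follows (the state names mirror the diff, but the step function and its handoffDone flag are illustrative, not the real scheduler):

package main

import "fmt"

type indexTaskState int

const (
    indexTaskPrepare indexTaskState = iota
    indexTaskInProgress
    indexTaskDone
)

// step models the simplified per-segment state machine left in
// flushedSegmentWatcher.internalProcess: prepare -> in-progress -> done,
// with the in-progress state gated only on the handoff component.
func step(state indexTaskState, handoffDone bool) indexTaskState {
    switch state {
    case indexTaskPrepare:
        // prepare: fetch segment info and create index-build tasks.
        return indexTaskInProgress
    case indexTaskInProgress:
        // in-progress: wait until the handoff component has written
        // (or discarded) the handoff event for this segment.
        if handoffDone {
            return indexTaskDone
        }
        return indexTaskInProgress
    case indexTaskDone:
        // done: remove the flushed-segment key and drop the task.
        return indexTaskDone
    default:
        return state
    }
}

func main() {
    s := indexTaskPrepare
    s = step(s, false)
    fmt.Println(s == indexTaskInProgress) // true
    s = step(s, true)
    fmt.Println(s == indexTaskDone) // true
}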
@@ -21,8 +21,7 @@ import (
 	"errors"
 	"sync"
 	"testing"
+	"time"

-	"github.com/milvus-io/milvus/internal/proto/indexpb"
-
 	"github.com/stretchr/testify/assert"
@@ -36,22 +35,43 @@ import (
 func Test_flushSegmentWatcher(t *testing.T) {
 	ctx := context.Background()

-	watcher, err := newFlushSegmentWatcher(ctx,
+	fsw, err := newFlushSegmentWatcher(ctx,
 		&mockETCDKV{
 			loadWithRevision: func(key string) ([]string, []string, int64, error) {
-				return []string{"seg1"}, []string{"12345"}, 1, nil
+				return []string{"1", "2", "3"}, []string{"1", "2", "3"}, 1, nil
+			},
+			removeWithPrefix: func(key string) error {
+				return nil
 			},
 		},
 		&metaTable{
 			catalog: &indexcoord.Catalog{
 				Txn: NewMockEtcdKV(),
 			},
+			indexLock:            sync.RWMutex{},
+			segmentIndexLock:     sync.RWMutex{},
+			collectionIndexes:    map[UniqueID]map[UniqueID]*model.Index{},
+			segmentIndexes:       map[UniqueID]map[UniqueID]*model.SegmentIndex{},
+			buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{},
 		},
-		&indexBuilder{}, &IndexCoord{
+		&indexBuilder{}, &handoff{}, &IndexCoord{
 			dataCoordClient: NewDataCoordMock(),
 		})
 	assert.NoError(t, err)
-	assert.NotNil(t, watcher)
+	assert.NotNil(t, fsw)
+
+	fsw.enqueueInternalTask(1)
+	fsw.Start()
+
+	// hold ticker.C
+	time.Sleep(time.Second * 2)
+	for fsw.Len() != 0 {
+		time.Sleep(time.Second)
+	}
+
+	fsw.Stop()
 }

 func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
@@ -61,7 +81,7 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
 			loadWithRevision: func(key string) ([]string, []string, int64, error) {
 				return []string{"segID1"}, []string{"12345"}, 1, nil
 			},
-		}, &metaTable{}, &indexBuilder{}, &IndexCoord{
+		}, &metaTable{}, &indexBuilder{}, &handoff{}, &IndexCoord{
 			dataCoordClient: NewDataCoordMock(),
 		})
 		assert.NoError(t, err)
@@ -74,7 +94,7 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
 			loadWithRevision: func(key string) ([]string, []string, int64, error) {
 				return []string{"segID1"}, []string{"12345"}, 1, errors.New("error")
 			},
-		}, &metaTable{}, &indexBuilder{}, &IndexCoord{
+		}, &metaTable{}, &indexBuilder{}, &handoff{}, &IndexCoord{
 			dataCoordClient: NewDataCoordMock(),
 		})
 		assert.Error(t, err)
@@ -87,7 +107,7 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
 			loadWithRevision: func(key string) ([]string, []string, int64, error) {
 				return []string{"segID1"}, []string{"segID"}, 1, nil
 			},
-		}, &metaTable{}, &indexBuilder{}, &IndexCoord{
+		}, &metaTable{}, &indexBuilder{}, &handoff{}, &IndexCoord{
 			dataCoordClient: NewDataCoordMock(),
 		})
 		assert.Error(t, err)
@@ -95,6 +115,48 @@ func Test_flushSegmentWatcher_newFlushSegmentWatcher(t *testing.T) {
 	})
 }
func Test_flushedSegmentWatcher_internalRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fsw := &flushedSegmentWatcher{
ctx: ctx,
cancel: cancel,
kvClient: NewMockEtcdKV(),
wg: sync.WaitGroup{},
scheduleDuration: time.Second,
internalTaskMutex: sync.RWMutex{},
internalNotify: make(chan struct{}, 1),
etcdRevision: 0,
watchChan: nil,
meta: nil,
builder: nil,
ic: &IndexCoord{
dataCoordClient: NewDataCoordMock(),
},
handoff: nil,
internalTasks: map[UniqueID]*internalTask{
segID: {
state: indexTaskPrepare,
segmentInfo: &datapb.SegmentInfo{
CollectionID: collID,
PartitionID: partID,
ID: segID,
},
},
segID + 1: {
state: indexTaskPrepare,
segmentInfo: nil,
},
segID - 1: {
state: indexTaskPrepare,
segmentInfo: nil,
},
},
}
fsw.internalRun()
assert.Equal(t, 3, fsw.Len())
}
 func Test_flushSegmentWatcher_internalProcess_success(t *testing.T) {
 	meta := &metaTable{
 		segmentIndexLock: sync.RWMutex{},
@@ -150,6 +212,15 @@ func Test_flushSegmentWatcher_internalProcess_success(t *testing.T) {
 	}

 	fsw := &flushedSegmentWatcher{
+		handoff: &handoff{
+			tasks:            map[UniqueID]struct{}{},
+			taskMutex:        sync.RWMutex{},
+			wg:               sync.WaitGroup{},
+			meta:             meta,
+			notifyChan:       make(chan struct{}, 1),
+			scheduleDuration: time.Second,
+			kvClient:         nil,
+		},
 		ic: &IndexCoord{
 			dataCoordClient: &DataCoordMock{
 				CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
@@ -217,14 +288,7 @@ func Test_flushSegmentWatcher_internalProcess_success(t *testing.T) {
 		fsw.internalTaskMutex.RUnlock()
 	})

-	err := fsw.meta.FinishTask(&indexpb.IndexTaskInfo{
-		BuildID:        buildID,
-		State:          commonpb.IndexState_Finished,
-		IndexFiles:     []string{"file1", "file2"},
-		SerializedSize: 100,
-		FailReason:     "",
-	})
-	assert.NoError(t, err)
+	fsw.handoff.deleteTask(segID)

 	t.Run("inProgress", func(t *testing.T) {
 		fsw.internalProcess(segID)
@@ -304,9 +368,9 @@ func Test_flushSegmentWatcher_internalProcess_error(t *testing.T) {
 		fsw.internalTaskMutex.RUnlock()
 	})

-	t.Run("write handoff event fail", func(t *testing.T) {
+	t.Run("remove flushed segment fail", func(t *testing.T) {
 		fsw.kvClient = &mockETCDKV{
-			save: func(s string, s2 string) error {
+			removeWithPrefix: func(key string) error {
 				return errors.New("error")
 			},
 		}
@@ -339,35 +403,87 @@ func Test_flushSegmentWatcher_internalProcess_error(t *testing.T) {
 		fsw.internalTaskMutex.RUnlock()
 	})

-	t.Run("remove flushed segment fail", func(t *testing.T) {
-		fsw.kvClient = &mockETCDKV{
-			save: func(s string, s2 string) error {
-				return nil
-			},
-			removeWithPrefix: func(key string) error {
-				return errors.New("error")
-			},
-		}
-		fsw.internalProcess(segID)
-		fsw.internalTaskMutex.RLock()
-		assert.Equal(t, indexTaskDone, fsw.internalTasks[segID].state)
-		fsw.internalTaskMutex.RUnlock()
-	})
-
-	t.Run("index is not zero", func(t *testing.T) {
-		fsw.internalProcess(segID)
-		fsw.internalTaskMutex.RLock()
-		assert.Equal(t, indexTaskDone, fsw.internalTasks[segID].state)
-		fsw.internalTaskMutex.RUnlock()
-	})
-
 	t.Run("invalid state", func(t *testing.T) {
 		fsw.internalTasks = map[UniqueID]*internalTask{
 			segID: {
 				state:       indexTaskDeleted,
 				segmentInfo: nil,
 			},
 		}
 		fsw.internalProcess(segID)
 	})
 }
+
+func Test_flushSegmentWatcher_prepare_error(t *testing.T) {
+	t.Run("segmentInfo already exist", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		fsw := &flushedSegmentWatcher{
+			ctx:               ctx,
+			cancel:            cancel,
+			kvClient:          NewMockEtcdKV(),
+			wg:                sync.WaitGroup{},
+			scheduleDuration:  time.Second,
+			internalTaskMutex: sync.RWMutex{},
+			internalNotify:    make(chan struct{}, 1),
+			etcdRevision:      0,
+			watchChan:         nil,
+			meta:              nil,
+			builder:           nil,
+			ic: &IndexCoord{
+				dataCoordClient: NewDataCoordMock(),
+			},
+			handoff: nil,
+			internalTasks: map[UniqueID]*internalTask{
+				segID: {
+					state: indexTaskPrepare,
+					segmentInfo: &datapb.SegmentInfo{
+						CollectionID: collID,
+						PartitionID:  partID,
+						ID:           segID,
+					},
+				},
+			},
+		}
+
+		err := fsw.prepare(segID)
+		assert.NoError(t, err)
+	})
+
+	t.Run("segment is not exist", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		fsw := &flushedSegmentWatcher{
+			ctx:               ctx,
+			cancel:            cancel,
+			kvClient:          NewMockEtcdKV(),
+			wg:                sync.WaitGroup{},
+			scheduleDuration:  time.Second,
+			internalTaskMutex: sync.RWMutex{},
+			internalNotify:    make(chan struct{}, 1),
+			etcdRevision:      0,
+			watchChan:         nil,
+			meta:              nil,
+			builder:           nil,
+			ic: &IndexCoord{
+				dataCoordClient: &DataCoordMock{
+					CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
+						return &datapb.GetSegmentInfoResponse{
+							Status: &commonpb.Status{
+								ErrorCode: commonpb.ErrorCode_Success,
+							},
+							Infos: nil,
+						}, nil
+					},
+				},
+			},
+			handoff: nil,
+			internalTasks: map[UniqueID]*internalTask{
+				segID: {
+					state:       indexTaskPrepare,
+					segmentInfo: nil,
+				},
+			},
+		}
+
+		err := fsw.prepare(segID)
+		assert.ErrorIs(t, err, ErrSegmentNotFound)
+	})
+}
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexcoord
import (
"context"
"errors"
"sort"
"sync"
"time"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
type handoff struct {
ctx context.Context
cancel context.CancelFunc
tasks map[UniqueID]struct{}
taskMutex sync.RWMutex
wg sync.WaitGroup
meta *metaTable
notifyChan chan struct{}
scheduleDuration time.Duration
kvClient kv.MetaKv
ic *IndexCoord
}
func newHandoff(ctx context.Context, metaTable *metaTable, kvClient kv.MetaKv, ic *IndexCoord) *handoff {
ctx, cancel := context.WithCancel(ctx)
hd := &handoff{
ctx: ctx,
cancel: cancel,
tasks: make(map[UniqueID]struct{}),
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: metaTable,
notifyChan: make(chan struct{}, 1),
scheduleDuration: time.Second,
kvClient: kvClient,
ic: ic,
}
hd.recoveryFromMeta()
log.Ctx(ctx).Info("new handoff success")
return hd
}
func (hd *handoff) recoveryFromMeta() {
allSegIndexes := hd.meta.GetAllSegIndexes()
hd.taskMutex.Lock()
defer hd.taskMutex.Unlock()
hd.tasks = make(map[UniqueID]struct{}, 0)
for segID, segIdx := range allSegIndexes {
if segIdx.IsDeleted {
continue
}
if segIdx.WriteHandoff {
continue
}
hd.tasks[segID] = struct{}{}
}
log.Ctx(hd.ctx).Info("recovery from meta success", zap.Int("task num", len(hd.tasks)))
}
func (hd *handoff) enqueue(segID UniqueID) {
defer hd.Notify()
hd.taskMutex.Lock()
defer hd.taskMutex.Unlock()
// note: don't reset state if the task contains state
hd.tasks[segID] = struct{}{}
log.Ctx(hd.ctx).Info("segment need to write handoff", zap.Int64("segID", segID))
}
func (hd *handoff) Start() {
hd.wg.Add(1)
go hd.scheduler()
}
func (hd *handoff) Stop() {
hd.cancel()
hd.wg.Wait()
}
func (hd *handoff) Notify() {
select {
case hd.notifyChan <- struct{}{}:
default:
}
}
func (hd *handoff) scheduler() {
log.Ctx(hd.ctx).Info("IndexCoord handoff start...")
defer hd.wg.Done()
ticker := time.NewTicker(hd.scheduleDuration)
defer ticker.Stop()
for {
select {
case <-hd.ctx.Done():
log.Info("IndexCoord handoff context done, exit...")
return
case <-ticker.C:
hd.run()
case <-hd.notifyChan:
hd.run()
}
}
}
func (hd *handoff) run() {
hd.taskMutex.RLock()
segIDs := make([]UniqueID, 0, len(hd.tasks))
for segID := range hd.tasks {
segIDs = append(segIDs, segID)
}
hd.taskMutex.RUnlock()
sort.Slice(segIDs, func(i, j int) bool {
return segIDs[i] < segIDs[j]
})
if len(segIDs) > 0 {
log.Ctx(hd.ctx).Debug("handoff process...", zap.Int("task num", len(segIDs)))
}
for i, segID := range segIDs {
hd.process(segID, i == 0)
}
}
func (hd *handoff) process(segID UniqueID, front bool) {
state := hd.meta.GetSegmentIndexState(segID)
log.Ctx(hd.ctx).RatedDebug(30, "handoff task is process", zap.Int64("segID", segID),
zap.Bool("front", front), zap.String("state", state.state.String()))
if state.state == commonpb.IndexState_Failed {
log.Ctx(hd.ctx).Error("build index failed, may be need manual intervention", zap.Int64("segID", segID),
zap.String("fail reason", state.failReason))
hd.deleteTask(segID)
// TODO @xiaocai2333: need write handoff event?
return
}
if state.state == commonpb.IndexState_Finished {
log.Ctx(hd.ctx).Debug("build index for segment success, write handoff event...", zap.Int64("segID", segID))
info, err := hd.ic.pullSegmentInfo(hd.ctx, segID)
if err != nil {
if errors.Is(err, ErrSegmentNotFound) {
log.Ctx(hd.ctx).Error("handoff get segment fail", zap.Error(err))
hd.deleteTask(segID)
return
}
log.Ctx(hd.ctx).Warn("handoff get segment fail, need to retry", zap.Error(err))
return
}
if info.IsImporting {
log.Debug("segment is importing, can't write handoff event", zap.Int64("segID", segID))
return
}
if front || hd.allParentsDone(info.CompactionFrom) {
log.Ctx(hd.ctx).Debug("segment can write handoff event", zap.Int64("segID", segID), zap.Bool("front", front),
zap.Int64s("compactionFrom", info.CompactionFrom))
indexInfos := hd.meta.GetSegmentIndexes(segID)
if len(indexInfos) == 0 {
log.Ctx(hd.ctx).Warn("ready to write handoff, but there is no index, may be dropped", zap.Int64("segID", segID))
hd.deleteTask(segID)
return
}
handoffTask := &querypb.SegmentInfo{
SegmentID: segID,
CollectionID: info.CollectionID,
PartitionID: info.PartitionID,
NumRows: info.NumOfRows,
DmChannel: info.GetInsertChannel(),
CompactionFrom: info.CompactionFrom,
CreatedByCompaction: info.CreatedByCompaction,
SegmentState: info.State,
IndexInfos: make([]*querypb.FieldIndexInfo, 0),
EnableIndex: true,
}
for _, indexInfo := range indexInfos {
handoffTask.IndexInfos = append(handoffTask.IndexInfos, &querypb.FieldIndexInfo{
FieldID: hd.meta.GetFieldIDByIndexID(info.CollectionID, indexInfo.IndexID),
EnableIndex: true,
IndexName: hd.meta.GetIndexNameByID(info.CollectionID, indexInfo.IndexID),
IndexID: indexInfo.IndexID,
BuildID: indexInfo.BuildID,
IndexParams: hd.meta.GetIndexParams(info.CollectionID, indexInfo.IndexID),
//IndexFilePaths: nil,
//IndexSize: 0,
})
}
if err := hd.writeHandoffSegment(handoffTask); err != nil {
log.Ctx(hd.ctx).Warn("write handoff task fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
}
log.Ctx(hd.ctx).Info("write handoff task success", zap.Int64("segID", segID))
if err := hd.meta.MarkSegmentWriteHandoff(segID); err != nil {
log.Ctx(hd.ctx).Warn("mark segment as write handoff fail, need to retry", zap.Int64("segID", segID), zap.Error(err))
return
}
log.Ctx(hd.ctx).Info("mark segment as write handoff success, remove task", zap.Int64("segID", segID))
hd.deleteTask(segID)
return
}
}
}
func (hd *handoff) Len() int {
hd.taskMutex.RLock()
defer hd.taskMutex.RUnlock()
return len(hd.tasks)
}
func (hd *handoff) deleteTask(segID UniqueID) {
hd.taskMutex.Lock()
defer hd.taskMutex.Unlock()
delete(hd.tasks, segID)
}
func (hd *handoff) taskDone(segID UniqueID) bool {
hd.taskMutex.RLock()
defer hd.taskMutex.RUnlock()
_, ok := hd.tasks[segID]
return !ok
}
func (hd *handoff) allParentsDone(segIDs []UniqueID) bool {
hd.taskMutex.RLock()
defer hd.taskMutex.RUnlock()
for _, segID := range segIDs {
if _, ok := hd.tasks[segID]; ok {
return false
}
}
return true
}
func (hd *handoff) writeHandoffSegment(info *querypb.SegmentInfo) error {
key := buildHandoffKey(info.CollectionID, info.PartitionID, info.SegmentID)
value, err := proto.Marshal(info)
if err != nil {
log.Error("IndexCoord marshal handoff task fail", zap.Int64("collID", info.CollectionID),
zap.Int64("partID", info.PartitionID), zap.Int64("segID", info.SegmentID), zap.Error(err))
return err
}
err = hd.kvClient.Save(key, string(value))
if err != nil {
log.Error("IndexCoord save handoff task fail", zap.Int64("collID", info.CollectionID),
zap.Int64("partID", info.PartitionID), zap.Int64("segID", info.SegmentID), zap.Error(err))
return err
}
log.Info("IndexCoord write handoff task success", zap.Int64("collID", info.CollectionID),
zap.Int64("partID", info.PartitionID), zap.Int64("segID", info.SegmentID))
return nil
}
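The scheduler above combines a ticker with a non-blocking notification channel, so pending handoff tasks are processed at least once per scheduleDuration and immediately after an enqueue. A stand-alone sketch of that pattern follows (only the select/notify structure mirrors handoff.scheduler and handoff.Notify; the function names and durations below are illustrative):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// notifyLoop runs work either on a periodic ticker or when a non-blocking
// notification arrives, and stops when the context is cancelled.
func notifyLoop(ctx context.Context, wg *sync.WaitGroup, notify <-chan struct{}, run func()) {
    defer wg.Done()
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            run()
        case <-notify:
            run()
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    notify := make(chan struct{}, 1)
    var wg sync.WaitGroup
    var mu sync.Mutex
    runs := 0

    wg.Add(1)
    go notifyLoop(ctx, &wg, notify, func() {
        mu.Lock()
        runs++
        mu.Unlock()
    })

    // Non-blocking notify, like handoff.Notify: drop the signal if one is
    // already pending instead of blocking the caller.
    select {
    case notify <- struct{}{}:
    default:
    }

    time.Sleep(300 * time.Millisecond)
    cancel()
    wg.Wait()

    mu.Lock()
    fmt.Println(runs > 0) // true
    mu.Unlock()
}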
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexcoord
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/indexcoord"
"github.com/milvus-io/milvus/internal/metastore/model"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
)
func createMetaForHandoff(catalog metastore.IndexCoordCatalog) *metaTable {
return &metaTable{
catalog: catalog,
segmentIndexLock: sync.RWMutex{},
indexLock: sync.RWMutex{},
collectionIndexes: map[UniqueID]map[UniqueID]*model.Index{
collID: {
indexID: {
TenantID: "",
CollectionID: collID,
FieldID: fieldID,
IndexID: indexID,
IndexName: indexName,
IsDeleted: false,
CreateTime: 0,
TypeParams: nil,
IndexParams: nil,
},
},
},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID,
IndexState: 1,
IsDeleted: false,
WriteHandoff: false,
},
},
segID + 1: {
indexID: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID + 1,
IndexState: 1,
IsDeleted: true,
WriteHandoff: false,
},
},
segID + 2: {
indexID: {
SegmentID: segID + 2,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID + 2,
IndexState: 1,
IsDeleted: false,
WriteHandoff: true,
},
},
},
buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{
buildID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID,
IndexID: indexID,
IndexState: 1,
IsDeleted: false,
WriteHandoff: false,
},
buildID + 1: {
SegmentID: segID + 1,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID + 1,
IndexID: indexID,
IndexState: 1,
IsDeleted: true,
WriteHandoff: false,
},
buildID + 2: {
SegmentID: segID + 2,
CollectionID: collID,
PartitionID: partID,
BuildID: buildID + 2,
IndexID: indexID,
IndexState: 1,
IsDeleted: false,
WriteHandoff: true,
},
},
}
}
func Test_newHandoff(t *testing.T) {
ctx := context.Background()
hd := newHandoff(ctx, createMetaForHandoff(&indexcoord.Catalog{Txn: NewMockEtcdKV()}), NewMockEtcdKV(), &IndexCoord{dataCoordClient: NewDataCoordMock()})
assert.NotNil(t, hd)
assert.Equal(t, 1, len(hd.tasks))
hd.enqueue(segID)
assert.Equal(t, 1, len(hd.tasks))
err := hd.meta.AddIndex(&model.SegmentIndex{
SegmentID: segID + 3,
CollectionID: collID,
PartitionID: partID,
NumRows: 0,
IndexID: indexID,
BuildID: buildID + 3,
})
assert.NoError(t, err)
hd.enqueue(segID + 3)
assert.Equal(t, 2, len(hd.tasks))
hd.Start()
err = hd.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID,
State: commonpb.IndexState_Finished,
IndexFiles: []string{"file1", "file2"},
SerializedSize: 100,
FailReason: "",
})
assert.NoError(t, err)
err = hd.meta.FinishTask(&indexpb.IndexTaskInfo{
BuildID: buildID + 3,
State: commonpb.IndexState_Failed,
IndexFiles: nil,
SerializedSize: 0,
FailReason: "failed",
})
assert.NoError(t, err)
// handle ticker
time.Sleep(time.Second * 2)
for hd.Len() != 0 {
time.Sleep(500 * time.Millisecond)
}
assert.True(t, hd.taskDone(segID))
assert.True(t, hd.taskDone(segID+3))
hd.Stop()
}
func Test_handoff_error(t *testing.T) {
t.Run("pullSegmentInfo fail", func(t *testing.T) {
hd := &handoff{
tasks: map[UniqueID]struct{}{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: nil,
ic: &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return nil, errors.New("error")
},
},
},
}
hd.process(segID, true)
assert.Equal(t, 1, hd.Len())
hd.ic.dataCoordClient = &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return nil, errSegmentNotFound(segID)
},
}
hd.process(segID, true)
assert.Equal(t, 0, hd.Len())
})
t.Run("is importing", func(t *testing.T) {
hd := &handoff{
tasks: map[UniqueID]struct{}{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: nil,
ic: &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return &datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Infos: []*datapb.SegmentInfo{
{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1024,
State: commonpb.SegmentState_Flushed,
IsImporting: true,
},
},
}, nil
},
},
},
}
hd.process(segID, true)
assert.Equal(t, 1, hd.Len())
})
t.Run("get index info fail", func(t *testing.T) {
hd := &handoff{
tasks: map[UniqueID]struct{}{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: true,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: nil,
ic: &IndexCoord{
dataCoordClient: NewDataCoordMock(),
},
}
hd.process(segID, true)
assert.Equal(t, 0, hd.Len())
})
t.Run("write handoff fail", func(t *testing.T) {
hd := &handoff{
tasks: map[UniqueID]struct{}{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
catalog: &indexcoord.Catalog{Txn: NewMockEtcdKV()},
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: &mockETCDKV{
save: func(s string, s2 string) error {
return errors.New("error")
},
},
ic: &IndexCoord{
dataCoordClient: NewDataCoordMock(),
},
}
hd.process(segID, true)
assert.Equal(t, 1, hd.Len())
})
t.Run("mark meta as write handoff fail", func(t *testing.T) {
hd := &handoff{
tasks: map[UniqueID]struct{}{
segID: {},
},
taskMutex: sync.RWMutex{},
wg: sync.WaitGroup{},
meta: &metaTable{
catalog: &indexcoord.Catalog{Txn: &mockETCDKV{
multiSave: func(m map[string]string) error {
return errors.New("error")
},
}},
segmentIndexLock: sync.RWMutex{},
segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{
segID: {
indexID: {
SegmentID: segID,
CollectionID: collID,
PartitionID: partID,
IndexID: indexID,
BuildID: buildID,
IndexState: commonpb.IndexState_Finished,
FailReason: "",
IsDeleted: false,
WriteHandoff: false,
},
},
},
},
notifyChan: make(chan struct{}, 1),
scheduleDuration: 0,
kvClient: NewMockEtcdKV(),
ic: &IndexCoord{
dataCoordClient: NewDataCoordMock(),
},
}
hd.process(segID, true)
assert.Equal(t, 1, hd.Len())
})
}
func Test_handoff_allParentsDone(t *testing.T) {
t.Run("done", func(t *testing.T) {
hd := &handoff{
tasks: map[UniqueID]struct{}{
segID: {},
},
taskMutex: sync.RWMutex{},
}
done := hd.allParentsDone([]UniqueID{segID + 1, segID + 2, segID + 3})
assert.True(t, done)
})
t.Run("not done", func(t *testing.T) {
hd := &handoff{
tasks: map[UniqueID]struct{}{
segID: {},
segID + 1: {},
},
taskMutex: sync.RWMutex{},
}
done := hd.allParentsDone([]UniqueID{segID + 1, segID + 2, segID + 3})
assert.False(t, done)
})
}
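The recovery path exercised by Test_newHandoff rebuilds the pending-task set from meta: recoveryFromMeta skips segment indexes that are deleted or already handed off and keeps the rest. A stand-alone sketch of that filter follows (the segmentIndex struct below is a trimmed, hypothetical stand-in for model.SegmentIndex, not the real model):

package main

import "fmt"

// segmentIndex keeps only the fields that matter for recovery.
type segmentIndex struct {
    SegmentID    int64
    IsDeleted    bool
    WriteHandoff bool
}

// pendingHandoffTasks sketches handoff.recoveryFromMeta: rebuild the task set
// after an IndexCoord restart, skipping deleted or already-handed-off indexes.
func pendingHandoffTasks(segIndexes map[int64]segmentIndex) map[int64]struct{} {
    tasks := make(map[int64]struct{})
    for segID, segIdx := range segIndexes {
        if segIdx.IsDeleted || segIdx.WriteHandoff {
            continue
        }
        tasks[segID] = struct{}{}
    }
    return tasks
}

func main() {
    tasks := pendingHandoffTasks(map[int64]segmentIndex{
        1: {SegmentID: 1},                     // still needs handoff
        2: {SegmentID: 2, IsDeleted: true},    // index dropped, skip
        3: {SegmentID: 3, WriteHandoff: true}, // already handed off, skip
    })
    fmt.Println(len(tasks)) // 1
}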
@@ -87,6 +87,7 @@ type IndexCoord struct {
 	indexBuilder          *indexBuilder
 	garbageCollector      *garbageCollector
 	flushedSegmentWatcher *flushedSegmentWatcher
+	handoff               *handoff

 	metricsCacheManager *metricsinfo.MetricsCacheManager
@@ -218,7 +219,8 @@ func (i *IndexCoord) Init() error {
 		i.chunkManager = chunkManager

 		i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i)
-		i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i)
+		i.handoff = newHandoff(i.loopCtx, i.metaTable, i.etcdKV, i)
+		i.flushedSegmentWatcher, err = newFlushSegmentWatcher(i.loopCtx, i.etcdKV, i.metaTable, i.indexBuilder, i.handoff, i)
 		if err != nil {
 			initErr = err
 			return
@@ -254,6 +256,7 @@ func (i *IndexCoord) Start() error {
 		i.indexBuilder.Start()
 		i.garbageCollector.Start()
+		i.handoff.Start()
 		i.flushedSegmentWatcher.Start()

 		i.UpdateStateCode(internalpb.StateCode_Healthy)
@@ -1051,3 +1054,30 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
 		}
 	}
 }
func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*datapb.SegmentInfo, error) {
resp, err := i.dataCoordClient.GetSegmentInfo(ctx, &datapb.GetSegmentInfoRequest{
SegmentIDs: []int64{segmentID},
IncludeUnHealthy: false,
})
if err != nil {
log.Error("IndexCoord get segment info fail", zap.Int64("segID", segmentID), zap.Error(err))
return nil, err
}
if resp.Status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Error("IndexCoord get segment info fail", zap.Int64("segID", segmentID),
zap.String("fail reason", resp.Status.GetReason()))
if resp.Status.GetReason() == msgSegmentNotFound(segmentID) {
return nil, errSegmentNotFound(segmentID)
}
return nil, errors.New(resp.Status.GetReason())
}
for _, info := range resp.Infos {
if info.ID == segmentID {
return info, nil
}
}
errMsg := msgSegmentNotFound(segmentID)
log.Error(errMsg)
return nil, errSegmentNotFound(segmentID)
}
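Callers of the new IndexCoord.pullSegmentInfo treat a missing segment differently from transient failures: ErrSegmentNotFound means the segment was dropped or compacted away and the pending task can be discarded (as handoff.process does), while other errors are retried on the next schedule. A stand-alone sketch of that branching follows (errSegmentNotFound below is a local stand-in, not the package's error value):

package main

import (
    "errors"
    "fmt"
)

// errSegmentNotFound stands in for the error that pullSegmentInfo returns when
// DataCoord reports the segment as missing; only the branching is illustrated.
var errSegmentNotFound = errors.New("segment not found")

// handlePullError sketches how callers in this patch react to
// pullSegmentInfo failures.
func handlePullError(err error) string {
    if err == nil {
        return "proceed"
    }
    if errors.Is(err, errSegmentNotFound) {
        return "drop task" // segment was compacted or dropped, no handoff needed
    }
    return "retry later" // transient RPC or DataCoord error
}

func main() {
    fmt.Println(handlePullError(nil))                           // proceed
    fmt.Println(handlePullError(errSegmentNotFound))            // drop task
    fmt.Println(handlePullError(errors.New("rpc unavailable"))) // retry later
}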
@@ -18,6 +18,7 @@ package indexcoord
 import (
 	"context"
+	"errors"
 	"math/rand"
 	"path"
 	"strconv"
@@ -683,3 +684,82 @@ func TestIndexCoord_UnHealthy(t *testing.T) {
 //	assert.Nil(t, err)
 //	assert.Equal(t, commonpb.ErrorCode_Success, status.GetErrorCode())
 //}
func TestIndexCoord_pullSegmentInfo(t *testing.T) {
t.Run("success", func(t *testing.T) {
ic := &IndexCoord{
dataCoordClient: NewDataCoordMock(),
}
info, err := ic.pullSegmentInfo(context.Background(), segID)
assert.NoError(t, err)
assert.NotNil(t, info)
})
t.Run("fail", func(t *testing.T) {
ic := &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return nil, errors.New("error")
},
},
}
info, err := ic.pullSegmentInfo(context.Background(), segID)
assert.Error(t, err)
assert.Nil(t, info)
})
t.Run("not success", func(t *testing.T) {
ic := &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return &datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "fail reason",
},
}, nil
},
},
}
info, err := ic.pullSegmentInfo(context.Background(), segID)
assert.Error(t, err)
assert.Nil(t, info)
})
t.Run("failed to get segment", func(t *testing.T) {
ic := &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return &datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgSegmentNotFound(segID),
},
}, nil
},
},
}
info, err := ic.pullSegmentInfo(context.Background(), segID)
assert.Error(t, err)
assert.Nil(t, info)
})
t.Run("seg not exist", func(t *testing.T) {
ic := &IndexCoord{
dataCoordClient: &DataCoordMock{
CallGetSegmentInfo: func(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) {
return &datapb.GetSegmentInfoResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Infos: []*datapb.SegmentInfo{},
}, nil
},
},
}
info, err := ic.pullSegmentInfo(context.Background(), segID)
assert.ErrorIs(t, err, ErrSegmentNotFound)
assert.Nil(t, info)
})
}
@@ -1032,3 +1032,18 @@ func (mt *metaTable) MarkSegmentsIndexAsDeletedByBuildID(buildIDs []UniqueID) er
 	log.Info("IndexCoord metaTable MarkSegmentsIndexAsDeletedByBuildID success", zap.Int64s("buildIDs", buildIDs))
 	return nil
 }
func (mt *metaTable) MarkSegmentWriteHandoff(segID UniqueID) error {
mt.segmentIndexLock.Lock()
defer mt.segmentIndexLock.Unlock()
segIdxes := make([]*model.SegmentIndex, 0)
if segIndexes, ok := mt.segmentIndexes[segID]; ok {
for _, segIdx := range segIndexes {
clonedSegIdx := model.CloneSegmentIndex(segIdx)
clonedSegIdx.WriteHandoff = true
segIdxes = append(segIdxes, clonedSegIdx)
}
}
return mt.alterSegmentIndexes(segIdxes)
}
@@ -178,6 +178,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
 	}

 	buildIDs := make([]UniqueID, 0)
+	segIDs := make([]UniqueID, 0)
 	for _, segmentInfo := range segmentsInfo.Infos {
 		if segmentInfo.State != commonpb.SegmentState_Flushed {
 			continue
@@ -201,6 +202,7 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
 		if have || buildID == 0 {
 			continue
 		}
+		segIDs = append(segIDs, segmentInfo.ID)
 		buildIDs = append(buildIDs, buildID)
 	}

@@ -213,6 +215,9 @@ func (cit *CreateIndexTask) Execute(ctx context.Context) error {
 	for _, buildID := range buildIDs {
 		cit.indexCoordClient.indexBuilder.enqueue(buildID)
 	}
+	for _, segID := range segIDs {
+		cit.indexCoordClient.handoff.enqueue(segID)
+	}
 	return nil
 }
......
@@ -18,12 +18,14 @@ package indexcoord
 import (
 	"errors"
+	"fmt"
 	"strconv"
 	"strings"

 	"github.com/milvus-io/milvus/api/schemapb"

 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
+	"github.com/milvus-io/milvus/internal/util"
 )

 // getDimension gets the dimension of data from building index request.
@@ -66,3 +68,7 @@ func parseBuildIDFromFilePath(key string) (UniqueID, error) {
 	}
 	return strconv.ParseInt(ss[len(ss)-1], 10, 64)
 }
func buildHandoffKey(collID, partID, segID UniqueID) string {
return fmt.Sprintf("%s/%d/%d/%d", util.HandoffSegmentPrefix, collID, partID, segID)
}
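buildHandoffKey (added just above) fixes the etcd key layout for handoff events: the handoff prefix followed by the collection, partition, and segment IDs. A stand-alone sketch of the layout follows (the prefix constant below is a placeholder; the real value comes from util.HandoffSegmentPrefix and is not reproduced here):

package main

import "fmt"

// handoffSegmentPrefix is a placeholder for util.HandoffSegmentPrefix.
const handoffSegmentPrefix = "<handoff-prefix>"

// buildHandoffKey mirrors the helper added in this patch: prefix/collID/partID/segID.
func buildHandoffKey(collID, partID, segID int64) string {
    return fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, collID, partID, segID)
}

func main() {
    fmt.Println(buildHandoffKey(1, 2, 3)) // <handoff-prefix>/1/2/3
}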
@@ -20,6 +20,7 @@ type SegmentIndex struct {
 	CreateTime     uint64
 	IndexFilePaths []string
 	IndexSize      uint64
+	WriteHandoff   bool
 }

 func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
@@ -42,6 +43,7 @@ func UnmarshalSegmentIndexModel(segIndex *indexpb.SegmentIndex) *SegmentIndex {
 		CreateTime:     segIndex.CreateTime,
 		IndexFilePaths: segIndex.IndexFilesPaths,
 		IndexSize:      segIndex.SerializeSize,
+		WriteHandoff:   segIndex.WriteHandoff,
 	}
 }
@@ -65,6 +67,7 @@ func MarshalSegmentIndexModel(segIdx *SegmentIndex) *indexpb.SegmentIndex {
 		Deleted:       segIdx.IsDeleted,
 		CreateTime:    segIdx.CreateTime,
 		SerializeSize: segIdx.IndexSize,
+		WriteHandoff:  segIdx.WriteHandoff,
 	}
 }
@@ -84,5 +87,6 @@ func CloneSegmentIndex(segIndex *SegmentIndex) *SegmentIndex {
 		CreateTime:     segIndex.CreateTime,
 		IndexFilePaths: segIndex.IndexFilePaths,
 		IndexSize:      segIndex.IndexSize,
+		WriteHandoff:   segIndex.WriteHandoff,
 	}
 }
@@ -260,6 +260,10 @@ message SegmentInfo {
   bool createdByCompaction = 14;
   repeated int64 compactionFrom = 15;
   uint64 dropped_at = 16; // timestamp when segment marked drop
+  // A flag indicating if:
+  // (1) this segment is created by bulk load, and
+  // (2) the bulk load task that creates this segment has not yet reached `ImportCompleted` state.
+  bool is_importing = 17;
 }

 message SegmentStartPosition {
......
@@ -67,6 +67,7 @@ message SegmentIndex {
   bool deleted = 12;
   uint64 create_time = 13;
   uint64 serialize_size = 14;
+  bool write_handoff = 15;
 }

 message RegisterNodeRequest {
......
@@ -241,6 +241,7 @@ func (ob *HandoffObserver) handoff(segment *querypb.SegmentInfo) {
 		ID:                  segment.SegmentID,
 		CollectionID:        segment.CollectionID,
 		PartitionID:         segment.PartitionID,
+		NumOfRows:           segment.NumRows,
 		InsertChannel:       segment.GetDmChannel(),
 		State:               segment.GetSegmentState(),
 		CreatedByCompaction: segment.GetCreatedByCompaction(),
......