Unverified commit 2bd0a952 authored by Ten Thousand Leaves, committed by GitHub

Implement task expiring logic and make all deadlines configurable. (#16355)

/kind feature

issue: #15604
Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
Parent: be8d9a8b
......@@ -69,6 +69,22 @@ rootCoord:
maxPartitionNum: 4096 # Maximum number of partitions in a collection
minSegmentSizeToEnableIndex: 1024 # It's a threshold. When the segment size is less than this value, the segment will not be indexed
# (in seconds) Duration after which an import task will expire (be killed). Default 3600 seconds (1 hour).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importTaskExpiration: 3600
# (in seconds) Milvus will keep the record of import tasks for at least `importTaskRetention` seconds. Default 86400
# seconds (24 hours).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importTaskRetention: 86400
# (in seconds) During index building phase of an import task, Milvus will check the building status of a task's
# segments' indices every `importIndexCheckInterval` seconds. Default 300 seconds (5 minutes).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importIndexCheckInterval: 300
# (in seconds) Maximum time to wait before pushing flushed segments online (make them searchable) during importing.
# Default 1200 seconds (20 minutes).
# Note: If default value is to be changed, change also the default in: internal/util/paramtable/component_param.go
importIndexWaitLimit: 1200
# Related configuration of proxy, used to validate client requests and reduce the returned results.
proxy:
port: 19530
......
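These four settings are plain second counts. RootCoord parses them as `float64` values and turns them into `time.Duration` values with the `time.Duration(sec*1000) * time.Millisecond` pattern used throughout this change. A minimal, self-contained sketch of that conversion; the `rootCoordCfg` struct here is a simplified stand-in for the real `rootCoordConfig` in `internal/util/paramtable/component_param.go`, not the actual type:

```go
package main

import (
	"fmt"
	"time"
)

// rootCoordCfg is a stand-in for the real rootCoordConfig; only the
// import-deadline fields from this change are shown.
type rootCoordCfg struct {
	ImportTaskExpiration     float64 // seconds
	ImportTaskRetention      float64 // seconds
	ImportIndexCheckInterval float64 // seconds
	ImportIndexWaitLimit     float64 // seconds
}

// asDuration converts a (possibly fractional) seconds value into a
// time.Duration, mirroring the time.Duration(sec*1000)*time.Millisecond
// pattern used by the import manager and RootCoord loops.
func asDuration(sec float64) time.Duration {
	return time.Duration(sec*1000) * time.Millisecond
}

func main() {
	cfg := rootCoordCfg{
		ImportTaskExpiration:     3600,
		ImportTaskRetention:      86400,
		ImportIndexCheckInterval: 300,
		ImportIndexWaitLimit:     1200,
	}
	fmt.Println(asDuration(cfg.ImportTaskExpiration))     // 1h0m0s
	fmt.Println(asDuration(cfg.ImportIndexCheckInterval)) // 5m0s
}
```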
......@@ -35,10 +35,11 @@ import (
)
const (
Bucket = "bucket"
FailedReason = "failed_reason"
MaxPendingCount = 32
delimiter = "/"
Bucket = "bucket"
FailedReason = "failed_reason"
MaxPendingCount = 32
delimiter = "/"
taskExpiredMsgPrefix = "task has expired after "
)
// import task state
......@@ -52,10 +53,9 @@ type importTaskState struct {
// importManager manager for import tasks
type importManager struct {
ctx context.Context // reserved
cancel context.CancelFunc // reserved
taskStore kv.MetaKv // Persistent task info storage.
busyNodes map[int64]bool // Set of all current working DataNodes.
ctx context.Context // reserved
taskStore kv.MetaKv // Persistent task info storage.
busyNodes map[int64]bool // Set of all current working DataNodes.
// TODO: Make pendingTask a map to improve look up performance.
pendingTasks []*datapb.ImportTaskInfo // pending tasks
......@@ -66,15 +66,16 @@ type importManager struct {
nextTaskID int64 // for generating next import task ID
lastReqID int64 // for generating a unique ID for import request
startOnce sync.Once
callImportService func(ctx context.Context, req *datapb.ImportTaskRequest) *datapb.ImportTaskResponse
}
// newImportManager helper function to create a importManager
func newImportManager(ctx context.Context, client kv.MetaKv, importService func(ctx context.Context, req *datapb.ImportTaskRequest) *datapb.ImportTaskResponse) *importManager {
ctx, cancel := context.WithCancel(ctx)
func newImportManager(ctx context.Context, client kv.MetaKv,
importService func(ctx context.Context, req *datapb.ImportTaskRequest) *datapb.ImportTaskResponse) *importManager {
mgr := &importManager{
ctx: ctx,
cancel: cancel,
taskStore: client,
pendingTasks: make([]*datapb.ImportTaskInfo, 0, MaxPendingCount), // currently task queue max size is 32
workingTasks: make(map[int64]*datapb.ImportTaskInfo),
......@@ -86,16 +87,16 @@ func newImportManager(ctx context.Context, client kv.MetaKv, importService func(
lastReqID: 0,
callImportService: importService,
}
return mgr
}
func (m *importManager) init(ctx context.Context) error {
// Read tasks from etcd and save them as pendingTasks or workingTasks.
m.load()
m.sendOutTasks(ctx)
return nil
func (m *importManager) init(ctx context.Context) {
m.startOnce.Do(func() {
// Read tasks from Etcd and save them as pending tasks or working tasks.
m.loadFromTaskStore()
// Send out tasks to dataCoord.
m.sendOutTasks(ctx)
})
}
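`init` is wrapped in `sync.Once`, so no matter how many times it is called, the task-store load and the initial dispatch to DataCoord run exactly once. A small illustration of that guarantee, using a toy `manager` type rather than the real `importManager`:

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a toy stand-in for importManager; only the startOnce field
// matters for this illustration.
type manager struct {
	startOnce sync.Once
}

// init mirrors importManager.init: however many times callers invoke it,
// the load-and-dispatch body runs exactly once.
func (m *manager) init() {
	m.startOnce.Do(func() {
		fmt.Println("loading tasks from the task store and sending them to DataCoord")
	})
}

func main() {
	m := &manager{}
	m.init()
	m.init() // no-op: the body above already ran
}
```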
// sendOutTasks pushes all pending tasks to DataCoord, gets DataCoord response and re-add these tasks as working tasks.
......@@ -108,6 +109,10 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
// Trigger Import() action to DataCoord.
for len(m.pendingTasks) > 0 {
task := m.pendingTasks[0]
// Skip failed (most likely expired) tasks.
if task.GetState().GetStateCode() == commonpb.ImportState_ImportFailed {
continue
}
it := &datapb.ImportTask{
CollectionId: task.GetCollectionId(),
PartitionId: task.GetPartitionId(),
......@@ -155,7 +160,7 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
log.Debug("import task added as working task", zap.Int64("task ID", it.TaskId))
task.State.StateCode = commonpb.ImportState_ImportPending
m.workingTasks[task.GetId()] = task
m.updateImportTask(task)
m.updateImportTaskStore(task)
}()
}
......@@ -259,7 +264,7 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
m.nextTaskID++
log.Info("new task created as pending task", zap.Int64("task ID", newTask.GetId()))
m.pendingTasks = append(m.pendingTasks, newTask)
m.saveImportTask(newTask)
m.storeImportTask(newTask)
}
log.Info("row-based import request processed", zap.Int64("reqID", reqID), zap.Any("taskIDs", taskList))
} else {
......@@ -280,7 +285,7 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
m.nextTaskID++
log.Info("new task created as pending task", zap.Int64("task ID", newTask.GetId()))
m.pendingTasks = append(m.pendingTasks, newTask)
m.saveImportTask(newTask)
m.storeImportTask(newTask)
log.Info("column-based import request processed", zap.Int64("reqID", reqID), zap.Int64("taskID", newTask.GetId()))
}
}()
......@@ -288,8 +293,8 @@ func (m *importManager) importJob(ctx context.Context, req *milvuspb.ImportReque
return resp
}
// updateTaskState updates the task's state in task store given ImportResult result, and returns the ImportTaskInfo of
// the given task.
// updateTaskState updates the task's state both in the in-memory working task list and in the task store, given an
// ImportResult. It returns the ImportTaskInfo of the given task.
func (m *importManager) updateTaskState(ir *rootcoordpb.ImportResult) (*datapb.ImportTaskInfo, error) {
if ir == nil {
return nil, errors.New("import result is nil")
......@@ -298,26 +303,29 @@ func (m *importManager) updateTaskState(ir *rootcoordpb.ImportResult) (*datapb.I
found := false
var v *datapb.ImportTaskInfo
func() {
m.workingLock.Lock()
defer m.workingLock.Unlock()
ok := false
if v, ok = m.workingTasks[ir.TaskId]; ok {
found = true
v.State.StateCode = ir.GetState()
v.State.Segments = ir.GetSegments()
v.State.RowCount = ir.GetRowCount()
for _, kv := range ir.GetInfos() {
if kv.GetKey() == FailedReason {
v.State.ErrorMessage = kv.GetValue()
break
}
m.workingLock.Lock()
defer m.workingLock.Unlock()
ok := false
if v, ok = m.workingTasks[ir.TaskId]; ok {
// If the task has already been marked failed, prevent further state updates and return an error.
if v.GetState().GetStateCode() == commonpb.ImportState_ImportFailed {
log.Warn("trying to update an already failed task which will end up being a no-op")
return nil, errors.New("trying to update an already failed task " + strconv.FormatInt(ir.GetTaskId(), 10))
}
found = true
v.State.StateCode = ir.GetState()
v.State.Segments = ir.GetSegments()
v.State.RowCount = ir.GetRowCount()
for _, kv := range ir.GetInfos() {
if kv.GetKey() == FailedReason {
v.State.ErrorMessage = kv.GetValue()
break
}
// Update task in task store.
m.updateImportTask(v)
}
m.updateImportTask(v)
}()
// Update task in task store.
m.updateImportTaskStore(v)
}
m.updateImportTaskStore(v)
if !found {
log.Debug("import manager update task import result failed", zap.Int64("taskID", ir.GetTaskId()))
......@@ -379,8 +387,8 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse
return resp
}
// load Loads task info from task store when RootCoord (re)starts.
func (m *importManager) load() error {
// loadFromTaskStore loads task info from task store when RootCoord (re)starts.
func (m *importManager) loadFromTaskStore() error {
log.Info("import manager starts loading from Etcd")
_, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath)
if err != nil {
......@@ -410,22 +418,21 @@ func (m *importManager) load() error {
return nil
}
// saveImportTask signs a lease and saves import task info into Etcd with this lease.
func (m *importManager) saveImportTask(task *datapb.ImportTaskInfo) error {
log.Debug("saving import task to Etcd", zap.Int64("Task ID", task.GetId()))
// TODO: Change default lease time and read it into config, once we figure out a proper value.
// Sign a lease.
leaseID, err := m.taskStore.Grant(10800) /*3 hours*/
// storeImportTask signs a lease and saves import task info into Etcd with this lease.
func (m *importManager) storeImportTask(task *datapb.ImportTaskInfo) error {
log.Debug("saving import task to Etcd", zap.Int64("task ID", task.GetId()))
// Sign a lease. Tasks will be stored for at least `ImportTaskRetention` seconds.
leaseID, err := m.taskStore.Grant(int64(Params.RootCoordCfg.ImportTaskRetention))
if err != nil {
log.Error("failed to grant lease from Etcd for data import",
zap.Int64("Task ID", task.GetId()),
zap.Int64("task ID", task.GetId()),
zap.Error(err))
return err
}
log.Debug("lease granted for task", zap.Int64("Task ID", task.GetId()))
log.Debug("lease granted for task", zap.Int64("task ID", task.GetId()))
var taskInfo []byte
if taskInfo, err = proto.Marshal(task); err != nil {
log.Error("failed to marshall task proto", zap.Int64("Task ID", task.GetId()), zap.Error(err))
log.Error("failed to marshall task proto", zap.Int64("task ID", task.GetId()), zap.Error(err))
return err
} else if err = m.taskStore.SaveWithLease(BuildImportTaskKey(task.GetId()), string(taskInfo), leaseID); err != nil {
log.Error("failed to save import task info into Etcd",
......@@ -433,12 +440,12 @@ func (m *importManager) saveImportTask(task *datapb.ImportTaskInfo) error {
zap.Error(err))
return err
}
log.Debug("task info successfully saved", zap.Int64("Task ID", task.GetId()))
log.Debug("task info successfully saved", zap.Int64("task ID", task.GetId()))
return nil
}
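`storeImportTask` keeps task records around for at least `ImportTaskRetention` seconds by granting a lease and saving the key under it through the `kv.MetaKv` wrapper (`Grant` plus `SaveWithLease`). The sketch below expresses the same lease-based retention directly against etcd's `clientv3` API; the endpoint, key name, and the v3.5 import path are illustrative assumptions rather than the wrapper Milvus itself goes through:

```go
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// storeWithRetention sketches the lease-based retention used above:
// grant a lease of retentionSec seconds and attach it to the key, so
// Etcd drops the record automatically once the retention window passes.
func storeWithRetention(cli *clientv3.Client, key, val string, retentionSec int64) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	lease, err := cli.Grant(ctx, retentionSec)
	if err != nil {
		return err
	}
	_, err = cli.Put(ctx, key, val, clientv3.WithLease(lease.ID))
	return err
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	// Keep the record for at least 86400 seconds (24 hours), matching
	// the importTaskRetention default.
	if err := storeWithRetention(cli, "importtask/1", "serialized-task-info", 86400); err != nil {
		log.Fatal(err)
	}
}
```

With this scheme there is no explicit cleanup job for stale records: the lease TTL doubles as the retention policy.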
// updateImportTask updates the task info in Etcd according to task ID. It won't change the lease on the key.
func (m *importManager) updateImportTask(ti *datapb.ImportTaskInfo) error {
// updateImportTaskStore updates the task info in Etcd according to task ID. It won't change the lease on the key.
func (m *importManager) updateImportTaskStore(ti *datapb.ImportTaskInfo) error {
log.Debug("updating import task info in Etcd", zap.Int64("Task ID", ti.GetId()))
if taskInfo, err := proto.Marshal(ti); err != nil {
log.Error("failed to marshall task info proto", zap.Int64("Task ID", ti.GetId()), zap.Error(err))
......@@ -457,7 +464,63 @@ func (m *importManager) bringSegmentsOnline(ti *datapb.ImportTaskInfo) {
// TODO: Implement it.
}
// expireOldTasksLoop starts a loop that checks and expires old tasks every `ImportTaskExpiration` seconds.
func (m *importManager) expireOldTasksLoop(wg *sync.WaitGroup) {
defer wg.Done()
ticker := time.NewTicker(time.Duration(Params.RootCoordCfg.ImportTaskExpiration*1000) * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-m.ctx.Done():
log.Info("(in loop) import manager context done, exit expireOldTasksLoop")
return
case <-ticker.C:
log.Info("(in loop) starting expiring old tasks...",
zap.Any("cleaning up interval", time.Duration(Params.RootCoordCfg.ImportTaskExpiration)))
m.expireOldTasks()
}
}
}
// expireOldTasks marks expired tasks as failed.
func (m *importManager) expireOldTasks() {
// Expire old pending tasks, if any.
func() {
m.pendingLock.Lock()
defer m.pendingLock.Unlock()
for _, t := range m.pendingTasks {
if taskExpired(t) {
log.Info("a pending task has expired", zap.Any("task info", t))
t.State.StateCode = commonpb.ImportState_ImportFailed
t.State.ErrorMessage = taskExpiredMsgPrefix +
(time.Duration(Params.RootCoordCfg.ImportTaskExpiration*1000) * time.Millisecond).String()
m.updateImportTaskStore(t)
}
}
}()
// Expire old working tasks.
func() {
m.workingLock.Lock()
defer m.workingLock.Unlock()
for _, v := range m.workingTasks {
// Mark this expired task as failed.
if taskExpired(v) {
log.Info("a working task has expired", zap.Any("task info", v))
v.State.StateCode = commonpb.ImportState_ImportFailed
v.State.ErrorMessage = taskExpiredMsgPrefix +
(time.Duration(Params.RootCoordCfg.ImportTaskExpiration*1000) * time.Millisecond).String()
m.updateImportTaskStore(v)
}
}
}()
}
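Both halves of `expireOldTasks` (and the busy-node cleanup in `ReportImport` further down) wrap their critical sections in immediately invoked functions, so the deferred `Unlock` fires at the end of that section instead of at the end of the enclosing method. A minimal illustration of that scoping choice, using a plain mutex and a slice in place of the real task structures:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.Mutex
	pending := []string{"task-1", "task-2"}

	// Critical section scoped by an immediately invoked func: the
	// deferred Unlock runs when this closure returns, not at the end
	// of main.
	func() {
		mu.Lock()
		defer mu.Unlock()
		fmt.Println("pending tasks:", pending)
	}()

	// The lock is already free here, so later work can take it again.
	mu.Lock()
	pending = append(pending, "task-3")
	mu.Unlock()
	fmt.Println("after the scoped section:", pending)
}
```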
// BuildImportTaskKey constructs and returns an Etcd key with given task ID.
func BuildImportTaskKey(taskID int64) string {
return fmt.Sprintf("%s%s%d", Params.RootCoordCfg.ImportTaskSubPath, delimiter, taskID)
}
// taskExpired returns true if the task has already expired.
func taskExpired(ti *datapb.ImportTaskInfo) bool {
return Params.RootCoordCfg.ImportTaskExpiration <= float64(time.Now().Unix()-ti.GetCreateTs())
}
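`taskExpired` treats a task as expired once its age in whole seconds reaches `ImportTaskExpiration`. A small worked example of the same predicate, with a hypothetical `expired` helper and hard-coded creation timestamps:

```go
package main

import (
	"fmt"
	"time"
)

// expired mirrors taskExpired above: a task is expired once its age in
// seconds reaches the configured expiration.
func expired(createTs int64, expirationSec float64) bool {
	return expirationSec <= float64(time.Now().Unix()-createTs)
}

func main() {
	const expiration = 3600.0 // seconds, the importTaskExpiration default

	fresh := time.Now().Unix() - 10   // created 10 seconds ago
	stale := time.Now().Unix() - 7200 // created 2 hours ago

	fmt.Println(expired(fresh, expiration)) // false
	fmt.Println(expired(stale, expiration)) // true
}
```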
......@@ -18,7 +18,9 @@ package rootcoord
import (
"context"
"sync"
"testing"
"time"
"github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/kv"
......@@ -35,6 +37,7 @@ type customKV struct {
func TestImportManager_NewImportManager(t *testing.T) {
Params.RootCoordCfg.ImportTaskSubPath = "test_import_task"
Params.RootCoordCfg.ImportTaskExpiration = 1
mockKv := &kv.MockMetaKV{}
mockKv.InMemKv = make(map[string]string)
ti1 := &datapb.ImportTaskInfo{
......@@ -63,9 +66,60 @@ func TestImportManager_NewImportManager(t *testing.T) {
},
}
}
mgr := newImportManager(context.TODO(), mockKv, fn)
assert.NotNil(t, mgr)
mgr.init(context.TODO())
time.Sleep(1 * time.Second)
var wg sync.WaitGroup
wg.Add(1)
t.Run("working task expired", func(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
mgr := newImportManager(ctx, mockKv, fn)
assert.NotNil(t, mgr)
mgr.init(ctx)
var wgLoop sync.WaitGroup
wgLoop.Add(1)
mgr.expireOldTasksLoop(&wgLoop)
wgLoop.Wait()
})
wg.Add(1)
t.Run("context done", func(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
mgr := newImportManager(ctx, mockKv, fn)
assert.NotNil(t, mgr)
mgr.init(context.TODO())
var wgLoop sync.WaitGroup
wgLoop.Add(1)
mgr.expireOldTasksLoop(&wgLoop)
wgLoop.Wait()
})
wg.Add(1)
t.Run("pending task expired", func(t *testing.T) {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
mgr := newImportManager(ctx, mockKv, fn)
assert.NotNil(t, mgr)
mgr.pendingTasks = append(mgr.pendingTasks, &datapb.ImportTaskInfo{
Id: 300,
State: &datapb.ImportTaskState{
StateCode: commonpb.ImportState_ImportPending,
},
CreateTs: time.Now().Unix() - 10,
})
mgr.loadFromTaskStore()
var wgLoop sync.WaitGroup
wgLoop.Add(1)
mgr.expireOldTasksLoop(&wgLoop)
wgLoop.Wait()
})
wg.Wait()
}
func TestImportManager_ImportJob(t *testing.T) {
......
......@@ -84,11 +84,7 @@ func metricProxy(v int64) string {
return fmt.Sprintf("client_%d", v)
}
var (
Params paramtable.ComponentParam
CheckCompleteIndexInterval = 3 * time.Minute
TaskTimeLimit = 3 * time.Hour
)
var Params paramtable.ComponentParam
// Core root coordinator core
type Core struct {
......@@ -1132,7 +1128,6 @@ func (c *Core) Init() error {
c.impTaskKv,
c.CallImportService,
)
c.importManager.init(c.ctx)
})
if initError != nil {
log.Debug("RootCoord init error", zap.Error(initError))
......@@ -1261,7 +1256,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
return c.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
}
// Start start rootcoord
// Start starts RootCoord.
func (c *Core) Start() error {
if err := c.checkInit(); err != nil {
log.Debug("RootCoord Start checkInit failed", zap.Error(err))
......@@ -1281,11 +1276,12 @@ func (c *Core) Start() error {
log.Fatal("RootCoord Start reSendDdMsg failed", zap.Error(err))
panic(err)
}
c.wg.Add(4)
c.wg.Add(5)
go c.startTimeTickLoop()
go c.tsLoop()
go c.chanTimeTick.startWatch(&c.wg)
go c.checkFlushedSegmentsLoop()
go c.importManager.expireOldTasksLoop(&c.wg)
Params.RootCoordCfg.CreatedTime = time.Now()
Params.RootCoordCfg.UpdatedTime = time.Now()
})
......@@ -1293,7 +1289,7 @@ func (c *Core) Start() error {
return nil
}
// Stop stop rootcoord
// Stop stops rootCoord.
func (c *Core) Stop() error {
c.UpdateStateCode(internalpb.StateCode_Abnormal)
......@@ -2260,8 +2256,7 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
return resp, nil
}
// TODO: Implement this.
// Check import task state from datanode
// GetImportState returns the current state of an import task.
func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
if code, ok := c.checkHealthy(); !ok {
return &milvuspb.GetImportStateResponse{
......@@ -2269,23 +2264,15 @@ func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateR
}, nil
}
log.Info("receive get import state request")
resp := &milvuspb.GetImportStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}
return resp, nil
return c.importManager.getTaskState(req.Task), nil
}
// ReportImport reports import task state to RootCoord.
func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (*commonpb.Status, error) {
log.Info("receive import state report", zap.Any("import result", ir))
if code, ok := c.checkHealthy(); !ok {
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil
}
log.Info("receive import state report", zap.Any("import result", ir.String()))
// Upon receiving ReportImport request, update the related task's state in task store.
ti, err := c.importManager.updateTaskState(ir)
if err != nil {
......@@ -2294,6 +2281,16 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
Reason: err.Error(),
}, nil
}
// That's all for reporting if the task hasn't reached persisted or completed state yet.
if ti.GetState().GetStateCode() != commonpb.ImportState_ImportPersisted &&
ti.GetState().GetStateCode() != commonpb.ImportState_ImportCompleted {
log.Debug("transitional import state received, return immediately", zap.Any("import result", ir))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
// Reverse look up collection name on collection ID.
var colName string
for k, v := range c.MetaTable.collName2ID {
......@@ -2309,17 +2306,15 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
}, nil
}
// Start a loop to check segments' index states periodically.
c.wg.Add(1)
go c.checkCompleteIndexLoop(ctx, ti, colName, ir.Segments)
// When DataNode has done its thing, remove it from the busy node list.
c.importManager.busyNodesLock.Lock()
defer c.importManager.busyNodesLock.Unlock()
delete(c.importManager.busyNodes, ir.GetDatanodeId())
log.Info("dataNode is no longer busy",
zap.Int64("dataNode ID", ir.GetDatanodeId()),
zap.Int64("task ID", ir.GetTaskId()))
func() {
c.importManager.busyNodesLock.Lock()
defer c.importManager.busyNodesLock.Unlock()
delete(c.importManager.busyNodes, ir.GetDatanodeId())
log.Info("dataNode is no longer busy",
zap.Int64("dataNode ID", ir.GetDatanodeId()),
zap.Int64("task ID", ir.GetTaskId()))
}()
// Start a loop to check segments' index states periodically.
c.wg.Add(1)
......@@ -2394,7 +2389,9 @@ func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, co
log.Debug("proxy GetIndexState", zap.Int("# of IndexBuildIDs", len(getIndexStatesRequest.IndexBuildIDs)), zap.Error(err))
if len(getIndexStatesRequest.IndexBuildIDs) == 0 {
log.Info("empty index build IDs returned", zap.String("collection name", collectionName), zap.Int64("collection ID", collectionID))
log.Info("empty index build IDs returned",
zap.String("collection name", collectionName),
zap.Int64("collection ID", collectionID))
return 0, nil
}
states, err := c.CallGetIndexStatesService(ctx, getIndexStatesRequest.IndexBuildIDs)
......@@ -2421,20 +2418,18 @@ func (c *Core) CountCompleteIndex(ctx context.Context, collectionName string, co
// checkCompleteIndexLoop checks index build states for an import task's segments and bring these segments online when
// the criteria are met. checkCompleteIndexLoop does the check every CheckCompleteIndexInterval and exits if:
// (1) a certain percent of indices are built, (2) when context is done or (3) when the task is expired.
// (1) a certain percent of indices are built, (2) when context is done or (3) when `ImportIndexWaitLimit` has passed.
func (c *Core) checkCompleteIndexLoop(ctx context.Context, ti *datapb.ImportTaskInfo, colName string, segIDs []UniqueID) {
defer c.wg.Done()
ticker := time.NewTicker(CheckCompleteIndexInterval)
spent := time.Duration(time.Unix(time.Now().Unix()-ti.GetCreateTs(), 0).Nanosecond())
log.Info("reporting task time left",
zap.Int64("task ID", ti.GetId()),
zap.Int64("minutes remaining", int64((TaskTimeLimit-spent).Minutes())))
// TODO: Replace with real task time limit.
expireTicker := time.NewTicker(TaskTimeLimit - spent)
ticker := time.NewTicker(time.Duration(Params.RootCoordCfg.ImportIndexCheckInterval*1000) * time.Millisecond)
defer ticker.Stop()
expireTicker := time.NewTicker(time.Duration(Params.RootCoordCfg.ImportIndexWaitLimit*1000) * time.Millisecond)
defer expireTicker.Stop()
for {
select {
case <-c.ctx.Done():
log.Info("(in loop)context done, exiting checkCompleteIndexLoop", zap.Int64("task ID", ti.GetId()))
log.Info("(in loop)context done, exiting checkCompleteIndexLoop",
zap.Int64("task ID", ti.GetId()))
return
case <-ticker.C:
log.Info("(in loop)check segments' index states", zap.Int64("task ID", ti.GetId()))
......@@ -2448,10 +2443,11 @@ func (c *Core) checkCompleteIndexLoop(ctx context.Context, ti *datapb.ImportTask
return
}
case <-expireTicker.C:
log.Info("(in loop)task has expired, stop waiting for segment results", zap.Int64("task ID", ti.GetId()))
log.Info("(in loop)waited for sufficiently long time, bring segments online",
zap.Int64("task ID", ti.GetId()))
c.importManager.bringSegmentsOnline(ti)
return
}
}
}
......
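`checkCompleteIndexLoop` now races two tickers: one at `ImportIndexCheckInterval` for polling index build state, and one at `ImportIndexWaitLimit` as an upper bound after which the segments are brought online regardless. A condensed sketch of that pattern, with a caller-supplied `done` callback standing in for the real `CountCompleteIndex` check:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForIndexOrDeadline sketches the two-ticker pattern: poll done()
// every checkEvery, but stop waiting once waitLimit has passed, at
// which point segments would be brought online anyway.
func waitForIndexOrDeadline(ctx context.Context, checkEvery, waitLimit time.Duration, done func() bool) {
	ticker := time.NewTicker(checkEvery)
	defer ticker.Stop()
	deadline := time.NewTicker(waitLimit)
	defer deadline.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if done() {
				fmt.Println("indices complete, bringing segments online")
				return
			}
		case <-deadline.C:
			fmt.Println("wait limit reached, bringing segments online anyway")
			return
		}
	}
}

func main() {
	checks := 0
	waitForIndexOrDeadline(context.Background(), 50*time.Millisecond, 300*time.Millisecond,
		func() bool {
			checks++
			return checks >= 3 // pretend all indices are built on the third check
		})
}
```

A one-shot `time.After` would serve equally well for the deadline; the second ticker simply keeps the shape of the surrounding select loop.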
......@@ -607,7 +607,6 @@ func TestRootCoordInit(t *testing.T) {
core.session.TriggerKill = false
err = core.Register()
assert.Nil(t, err)
}
func TestRootCoord_Base(t *testing.T) {
......@@ -623,12 +622,11 @@ func TestRootCoord_Base(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
CheckCompleteIndexInterval = 100 * time.Millisecond
TaskTimeLimit = 200 * time.Millisecond
coreFactory := msgstream.NewPmsFactory()
Params.Init()
Params.RootCoordCfg.DmlChannelNum = TestDMLChannelNum
Params.RootCoordCfg.ImportIndexCheckInterval = 0.1
Params.RootCoordCfg.ImportIndexWaitLimit = 0.2
core, err := NewCore(ctx, coreFactory)
assert.Nil(t, err)
randVal := rand.Int()
......@@ -1339,6 +1337,7 @@ func TestRootCoord_Base(t *testing.T) {
TaskId: 1,
RowCount: 100,
Segments: []int64{1003, 1004, 1005},
State: commonpb.ImportState_ImportCompleted,
}
for _, segmentID := range []int64{1003, 1004, 1005} {
......@@ -1389,13 +1388,28 @@ func TestRootCoord_Base(t *testing.T) {
TaskId: 3,
RowCount: 100,
Segments: []int64{1003, 1004, 1005},
State: commonpb.ImportState_ImportCompleted,
}
// Case where report import request is nil.
resp, err := core.ReportImport(context.WithValue(ctx, ctxKey{}, ""), reqIR)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_CollectionNameNotFound, resp.ErrorCode)
})
wg.Add(1)
t.Run("report import with transitional state", func(t *testing.T) {
defer wg.Done()
req := &rootcoordpb.ImportResult{
TaskId: 0,
RowCount: 100,
Segments: []int64{1000, 1001, 1002},
State: commonpb.ImportState_ImportDownloaded,
}
resp, err := core.ReportImport(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
time.Sleep(500 * time.Millisecond)
})
wg.Add(1)
t.Run("report import segments online ready", func(t *testing.T) {
defer wg.Done()
......@@ -1403,6 +1417,7 @@ func TestRootCoord_Base(t *testing.T) {
TaskId: 0,
RowCount: 100,
Segments: []int64{1000, 1001, 1002},
State: commonpb.ImportState_ImportCompleted,
}
resp, err := core.ReportImport(context.WithValue(ctx, ctxKey{}, ""), req)
assert.NoError(t, err)
......@@ -1410,6 +1425,29 @@ func TestRootCoord_Base(t *testing.T) {
time.Sleep(500 * time.Millisecond)
})
wg.Add(1)
t.Run("report import segments update already failed task", func(t *testing.T) {
defer wg.Done()
// Mark task 0 as failed.
core.importManager.updateTaskState(
&rootcoordpb.ImportResult{
TaskId: 0,
RowCount: 100,
State: commonpb.ImportState_ImportFailed,
Segments: []int64{1000, 1001, 1002},
})
// Now try to update this task with a complete status.
resp, err := core.ReportImport(context.WithValue(ctx, ctxKey{}, ""),
&rootcoordpb.ImportResult{
TaskId: 0,
RowCount: 100,
State: commonpb.ImportState_ImportCompleted,
Segments: []int64{1000, 1001, 1002},
})
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UpdateImportTaskFailure, resp.ErrorCode)
})
wg.Add(1)
t.Run("over ride index", func(t *testing.T) {
defer wg.Done()
......
......@@ -335,6 +335,10 @@ type rootCoordConfig struct {
DmlChannelNum int64
MaxPartitionNum int64
MinSegmentSizeToEnableIndex int64
ImportTaskExpiration float64
ImportTaskRetention float64
ImportIndexCheckInterval float64
ImportIndexWaitLimit float64
// --- ETCD Path ---
ImportTaskSubPath string
......@@ -348,6 +352,10 @@ func (p *rootCoordConfig) init(base *BaseTable) {
p.DmlChannelNum = p.Base.ParseInt64WithDefault("rootCoord.dmlChannelNum", 256)
p.MaxPartitionNum = p.Base.ParseInt64WithDefault("rootCoord.maxPartitionNum", 4096)
p.MinSegmentSizeToEnableIndex = p.Base.ParseInt64WithDefault("rootCoord.minSegmentSizeToEnableIndex", 1024)
p.ImportTaskExpiration = p.Base.ParseFloatWithDefault("rootCoord.importTaskExpiration", 3600)
p.ImportTaskRetention = p.Base.ParseFloatWithDefault("rootCoord.importTaskRetention", 3600*24)
p.ImportIndexCheckInterval = p.Base.ParseFloatWithDefault("rootCoord.importIndexCheckInterval", 60*5)
p.ImportIndexWaitLimit = p.Base.ParseFloatWithDefault("rootCoord.importIndexWaitLimit", 60*20)
p.ImportTaskSubPath = "importtask"
}
......
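`ParseFloatWithDefault` falls back to the compiled-in default when the key is absent, which is why the comments in milvus.yaml ask that the two defaults be kept in sync. A rough stand-in for that lookup behavior; the real implementation lives in `BaseTable` and is not shown in this diff:

```go
package main

import (
	"fmt"
	"strconv"
)

// parseFloatWithDefault is a simplified stand-in for BaseTable's
// ParseFloatWithDefault: use the configured value if it exists and
// parses, otherwise fall back to the compiled-in default.
func parseFloatWithDefault(raw map[string]string, key string, def float64) float64 {
	s, ok := raw[key]
	if !ok {
		return def
	}
	v, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return def
	}
	return v
}

func main() {
	raw := map[string]string{"rootCoord.importTaskExpiration": "3600"}
	fmt.Println(parseFloatWithDefault(raw, "rootCoord.importTaskExpiration", 3600))  // 3600 (from config)
	fmt.Println(parseFloatWithDefault(raw, "rootCoord.importTaskRetention", 86400)) // 86400 (default)
}
```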
......@@ -100,9 +100,16 @@ func TestComponentParam(t *testing.T) {
assert.NotEqual(t, Params.MaxPartitionNum, 0)
t.Logf("master MaxPartitionNum = %d", Params.MaxPartitionNum)
assert.NotEqual(t, Params.MinSegmentSizeToEnableIndex, 0)
t.Logf("master MinSegmentSizeToEnableIndex = %d", Params.MinSegmentSizeToEnableIndex)
assert.NotEqual(t, Params.ImportTaskExpiration, 0)
t.Logf("master ImportTaskExpiration = %f", Params.ImportTaskExpiration)
assert.NotEqual(t, Params.ImportTaskRetention, 0)
t.Logf("master ImportTaskRetention = %f", Params.ImportTaskRetention)
assert.NotEqual(t, Params.ImportIndexCheckInterval, 0)
t.Logf("master ImportIndexCheckInterval = %f", Params.ImportIndexCheckInterval)
assert.NotEqual(t, Params.ImportIndexWaitLimit, 0)
t.Logf("master ImportIndexWaitLimit = %f", Params.ImportIndexWaitLimit)
Params.CreatedTime = time.Now()
Params.UpdatedTime = time.Now()
......