diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index 90d1356a776a5cf43a440f7315aa94b54f7101d6..fb693340459ff054ec2fcac5eb539b17e2f0acd4 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -26,6 +26,7 @@ import ( oplog "github.com/opentracing/opentracing-go/log" ) +// TaskQueue is a queue used to store tasks. type TaskQueue interface { utChan() <-chan int utEmpty() bool @@ -39,6 +40,7 @@ type TaskQueue interface { //tryToRemoveUselessIndexBuildTask(indexID UniqueID) []UniqueID } +// BaseTaskQueue is a basic instance of TaskQueue. type BaseTaskQueue struct { unissuedTasks *list.List activeTasks map[UniqueID]task @@ -89,6 +91,7 @@ func (queue *BaseTaskQueue) addUnissuedTask(t task) error { // return queue.unissuedTasks.Front().Value.(task) //} +// PopUnissuedTask pops a task from tasks queue. func (queue *BaseTaskQueue) PopUnissuedTask() task { queue.utLock.Lock() defer queue.utLock.Unlock() @@ -103,6 +106,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task { return ft.Value.(task) } +// AddActiveTask adds a task to activeTasks. func (queue *BaseTaskQueue) AddActiveTask(t task) { queue.atLock.Lock() defer queue.atLock.Unlock() @@ -116,6 +120,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) { queue.activeTasks[tID] = t } +// PopActiveTask tasks out a task from activateTask and the task will be executed. func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task { queue.atLock.Lock() defer queue.atLock.Unlock() @@ -150,6 +155,7 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task { // return indexBuildIDs //} +// Enqueue adds a task to TaskQueue. func (queue *BaseTaskQueue) Enqueue(t task) error { err := t.OnEnqueue() if err != nil { @@ -158,10 +164,12 @@ func (queue *BaseTaskQueue) Enqueue(t task) error { return queue.addUnissuedTask(t) } +// IndexBuildTaskQueue is a task queue used to store building index tasks. type IndexBuildTaskQueue struct { BaseTaskQueue } +// NewIndexBuildTaskQueue creates a new IndexBuildTaskQueue. func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexBuildTaskQueue { return &IndexBuildTaskQueue{ BaseTaskQueue: BaseTaskQueue{ @@ -174,6 +182,7 @@ func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexBuildTaskQueue { } } +// TaskScheduler is a scheduler of indexing tasks. type TaskScheduler struct { IndexBuildQueue TaskQueue @@ -184,6 +193,7 @@ type TaskScheduler struct { cancel context.CancelFunc } +// NewTaskScheduler creates a new task scheduler of indexing tasks. func NewTaskScheduler(ctx context.Context, kv kv.BaseKV) (*TaskScheduler, error) { ctx1, cancel := context.WithCancel(ctx) @@ -280,6 +290,7 @@ func (sched *TaskScheduler) indexBuildLoop() { } } +// Start stats the task scheduler of indexing tasks. func (sched *TaskScheduler) Start() error { sched.wg.Add(1) @@ -287,6 +298,7 @@ func (sched *TaskScheduler) Start() error { return nil } +// Close closes the task scheduler of indexing tasks. func (sched *TaskScheduler) Close() { sched.cancel() sched.wg.Wait()