From d6f95f49e8c12458b018d49031de62f59421989b Mon Sep 17 00:00:00 2001 From: "zhenshan.cao" Date: Mon, 18 Oct 2021 21:34:47 +0800 Subject: [PATCH] Fix golint error in querycoord (#10127) Signed-off-by: zhenshan.cao --- internal/querycoord/impl.go | 16 +- internal/querycoord/impl_test.go | 4 +- internal/querycoord/query_coord.go | 8 +- internal/querycoord/task.go | 375 ++++++++++----------- internal/querycoord/task_scheduler.go | 36 +- internal/querycoord/task_scheduler_test.go | 54 +-- internal/querycoord/task_test.go | 50 +-- 7 files changed, 269 insertions(+), 274 deletions(-) diff --git a/internal/querycoord/impl.go b/internal/querycoord/impl.go index e1c900a3b..933e282a3 100644 --- a/internal/querycoord/impl.go +++ b/internal/querycoord/impl.go @@ -144,8 +144,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle } baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest) - loadCollectionTask := &LoadCollectionTask{ - BaseTask: baseTask, + loadCollectionTask := &loadCollectionTask{ + baseTask: baseTask, LoadCollectionRequest: req, rootCoord: qc.rootCoordClient, dataCoord: qc.dataCoordClient, @@ -193,8 +193,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas } baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest) - releaseCollectionTask := &ReleaseCollectionTask{ - BaseTask: baseTask, + releaseCollectionTask := &releaseCollectionTask{ + baseTask: baseTask, ReleaseCollectionRequest: req, cluster: qc.cluster, meta: qc.meta, @@ -336,8 +336,8 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti } baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest) - loadPartitionTask := &LoadPartitionTask{ - BaseTask: baseTask, + loadPartitionTask := &loadPartitionTask{ + baseTask: baseTask, LoadPartitionsRequest: req, dataCoord: qc.dataCoordClient, cluster: qc.cluster, @@ -407,8 +407,8 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas req.PartitionIDs = toReleasedPartitions baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest) - releasePartitionTask := &ReleasePartitionTask{ - BaseTask: baseTask, + releasePartitionTask := &releasePartitionTask{ + baseTask: baseTask, ReleasePartitionsRequest: req, cluster: qc.cluster, } diff --git a/internal/querycoord/impl_test.go b/internal/querycoord/impl_test.go index 9ed5405a8..aac4d69d0 100644 --- a/internal/querycoord/impl_test.go +++ b/internal/querycoord/impl_test.go @@ -452,8 +452,8 @@ func TestLoadBalanceTask(t *testing.T) { BalanceReason: querypb.TriggerCondition_nodeDown, } - loadBalanceTask := &LoadBalanceTask{ - BaseTask: &BaseTask{ + loadBalanceTask := &loadBalanceTask{ + baseTask: &baseTask{ ctx: baseCtx, Condition: NewTaskCondition(baseCtx), triggerCondition: querypb.TriggerCondition_nodeDown, diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index e8733bd30..7abe5d6c4 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -238,8 +238,8 @@ func (qc *QueryCoord) watchNodeLoop() { } baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_nodeDown) - loadBalanceTask := &LoadBalanceTask{ - BaseTask: baseTask, + loadBalanceTask := &loadBalanceTask{ + baseTask: baseTask, LoadBalanceRequest: loadBalanceSegment, rootCoord: qc.rootCoordClient, dataCoord: qc.dataCoordClient, @@ -289,8 +289,8 @@ func (qc *QueryCoord) watchNodeLoop() { } baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_nodeDown) - loadBalanceTask := &LoadBalanceTask{ - BaseTask: baseTask, + loadBalanceTask := &loadBalanceTask{ + baseTask: baseTask, LoadBalanceRequest: loadBalanceSegment, rootCoord: qc.rootCoordClient, dataCoord: qc.dataCoordClient, diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 88eb4fa87..117e9bd66 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -86,7 +86,7 @@ type task interface { updateTaskProcess() } -type BaseTask struct { +type baseTask struct { Condition ctx context.Context cancel context.CancelFunc @@ -104,11 +104,11 @@ type BaseTask struct { childTasksMu sync.RWMutex } -func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *BaseTask { +func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *baseTask { childCtx, cancel := context.WithCancel(ctx) condition := NewTaskCondition(childCtx) - baseTask := &BaseTask{ + baseTask := &baseTask{ ctx: childCtx, cancel: cancel, Condition: condition, @@ -122,52 +122,52 @@ func newBaseTask(ctx context.Context, triggerType querypb.TriggerCondition) *Bas } // getTaskID function returns the unique taskID of the trigger task -func (bt *BaseTask) getTaskID() UniqueID { +func (bt *baseTask) getTaskID() UniqueID { return bt.taskID } // setTaskID function sets the trigger task with a unique id, which is allocated by tso -func (bt *BaseTask) setTaskID(id UniqueID) { +func (bt *baseTask) setTaskID(id UniqueID) { bt.taskID = id } -func (bt *BaseTask) traceCtx() context.Context { +func (bt *baseTask) traceCtx() context.Context { return bt.ctx } -func (bt *BaseTask) getTriggerCondition() querypb.TriggerCondition { +func (bt *baseTask) getTriggerCondition() querypb.TriggerCondition { return bt.triggerCondition } -func (bt *BaseTask) taskPriority() querypb.TriggerCondition { +func (bt *baseTask) taskPriority() querypb.TriggerCondition { return bt.triggerCondition } -func (bt *BaseTask) setParentTask(t task) { +func (bt *baseTask) setParentTask(t task) { bt.parentTask = t } -func (bt *BaseTask) getParentTask() task { +func (bt *baseTask) getParentTask() task { return bt.parentTask } // GetChildTask function returns all the child tasks of the trigger task // Child task may be loadSegmentTask, watchDmChannelTask or watchQueryChannelTask -func (bt *BaseTask) getChildTask() []task { +func (bt *baseTask) getChildTask() []task { bt.childTasksMu.RLock() defer bt.childTasksMu.RUnlock() return bt.childTasks } -func (bt *BaseTask) addChildTask(t task) { +func (bt *baseTask) addChildTask(t task) { bt.childTasksMu.Lock() defer bt.childTasksMu.Unlock() bt.childTasks = append(bt.childTasks, t) } -func (bt *BaseTask) removeChildTaskByID(taskID UniqueID) { +func (bt *baseTask) removeChildTaskByID(taskID UniqueID) { bt.childTasksMu.Lock() defer bt.childTasksMu.Unlock() @@ -180,32 +180,32 @@ func (bt *BaseTask) removeChildTaskByID(taskID UniqueID) { bt.childTasks = result } -func (bt *BaseTask) isValid() bool { +func (bt *baseTask) isValid() bool { return true } -func (bt *BaseTask) reschedule(ctx context.Context) ([]task, error) { +func (bt *baseTask) reschedule(ctx context.Context) ([]task, error) { return nil, nil } // State returns the state of task, such as taskUndo, taskDoing, taskDone, taskExpired, taskFailed -func (bt *BaseTask) getState() taskState { +func (bt *baseTask) getState() taskState { bt.stateMu.RLock() defer bt.stateMu.RUnlock() return bt.state } -func (bt *BaseTask) setState(state taskState) { +func (bt *baseTask) setState(state taskState) { bt.stateMu.Lock() defer bt.stateMu.Unlock() bt.state = state } -func (bt *BaseTask) isRetryable() bool { +func (bt *baseTask) isRetryable() bool { return bt.retryCount > 0 } -func (bt *BaseTask) setResultInfo(err error) { +func (bt *baseTask) setResultInfo(err error) { bt.resultMu.Lock() defer bt.resultMu.Unlock() @@ -222,25 +222,23 @@ func (bt *BaseTask) setResultInfo(err error) { bt.result.Reason = bt.result.Reason + ", " + err.Error() } -func (bt *BaseTask) getResultInfo() *commonpb.Status { +func (bt *baseTask) getResultInfo() *commonpb.Status { bt.resultMu.RLock() defer bt.resultMu.RUnlock() return proto.Clone(bt.result).(*commonpb.Status) } -func (bt *BaseTask) updateTaskProcess() { +func (bt *baseTask) updateTaskProcess() { // TODO:: } -func (bt *BaseTask) rollBack(ctx context.Context) []task { +func (bt *baseTask) rollBack(ctx context.Context) []task { //TODO:: return nil } -//************************grpcTask***************************// -// LoadCollectionTask will load all the data of this collection to query nodes -type LoadCollectionTask struct { - *BaseTask +type loadCollectionTask struct { + *baseTask *querypb.LoadCollectionRequest rootCoord types.RootCoord dataCoord types.DataCoord @@ -248,23 +246,23 @@ type LoadCollectionTask struct { meta Meta } -func (lct *LoadCollectionTask) msgBase() *commonpb.MsgBase { +func (lct *loadCollectionTask) msgBase() *commonpb.MsgBase { return lct.Base } -func (lct *LoadCollectionTask) marshal() ([]byte, error) { +func (lct *loadCollectionTask) marshal() ([]byte, error) { return proto.Marshal(lct.LoadCollectionRequest) } -func (lct *LoadCollectionTask) msgType() commonpb.MsgType { +func (lct *loadCollectionTask) msgType() commonpb.MsgType { return lct.Base.MsgType } -func (lct *LoadCollectionTask) timestamp() Timestamp { +func (lct *loadCollectionTask) timestamp() Timestamp { return lct.Base.Timestamp } -func (lct *LoadCollectionTask) updateTaskProcess() { +func (lct *loadCollectionTask) updateTaskProcess() { collectionID := lct.CollectionID childTasks := lct.getChildTask() allDone := true @@ -282,18 +280,18 @@ func (lct *LoadCollectionTask) updateTaskProcess() { } } -func (lct *LoadCollectionTask) preExecute(ctx context.Context) error { +func (lct *loadCollectionTask) preExecute(ctx context.Context) error { collectionID := lct.CollectionID schema := lct.Schema lct.setResultInfo(nil) - log.Debug("start do LoadCollectionTask", + log.Debug("start do loadCollectionTask", zap.Int64("msgID", lct.getTaskID()), zap.Int64("collectionID", collectionID), zap.Stringer("schema", schema)) return nil } -func (lct *LoadCollectionTask) execute(ctx context.Context) error { +func (lct *loadCollectionTask) execute(ctx context.Context) error { defer func() { lct.retryCount-- }() @@ -439,23 +437,23 @@ func (lct *LoadCollectionTask) execute(ctx context.Context) error { return nil } -func (lct *LoadCollectionTask) postExecute(ctx context.Context) error { +func (lct *loadCollectionTask) postExecute(ctx context.Context) error { collectionID := lct.CollectionID if lct.result.ErrorCode != commonpb.ErrorCode_Success { lct.childTasks = []task{} err := lct.meta.releaseCollection(collectionID) if err != nil { - log.Error("LoadCollectionTask: occur error when release collection info from meta", zap.Error(err)) + log.Error("loadCollectionTask: occur error when release collection info from meta", zap.Error(err)) } } - log.Debug("LoadCollectionTask postExecute done", + log.Debug("loadCollectionTask postExecute done", zap.Int64("msgID", lct.getTaskID()), zap.Int64("collectionID", collectionID)) return nil } -func (lct *LoadCollectionTask) rollBack(ctx context.Context) []task { +func (lct *loadCollectionTask) rollBack(ctx context.Context) []task { nodes, _ := lct.cluster.onlineNodes() resultTasks := make([]task, 0) //TODO::call rootCoord.ReleaseDQLMessageStream @@ -474,8 +472,8 @@ func (lct *LoadCollectionTask) rollBack(ctx context.Context) []task { } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseTask.setParentTask(lct) - releaseCollectionTask := &ReleaseCollectionTask{ - BaseTask: baseTask, + releaseCollectionTask := &releaseCollectionTask{ + baseTask: baseTask, ReleaseCollectionRequest: req, cluster: lct.cluster, } @@ -485,41 +483,41 @@ func (lct *LoadCollectionTask) rollBack(ctx context.Context) []task { return resultTasks } -// ReleaseCollectionTask will release all the data of this collection on query nodes -type ReleaseCollectionTask struct { - *BaseTask +// releaseCollectionTask will release all the data of this collection on query nodes +type releaseCollectionTask struct { + *baseTask *querypb.ReleaseCollectionRequest cluster Cluster meta Meta rootCoord types.RootCoord } -func (rct *ReleaseCollectionTask) msgBase() *commonpb.MsgBase { +func (rct *releaseCollectionTask) msgBase() *commonpb.MsgBase { return rct.Base } -func (rct *ReleaseCollectionTask) marshal() ([]byte, error) { +func (rct *releaseCollectionTask) marshal() ([]byte, error) { return proto.Marshal(rct.ReleaseCollectionRequest) } -func (rct *ReleaseCollectionTask) msgType() commonpb.MsgType { +func (rct *releaseCollectionTask) msgType() commonpb.MsgType { return rct.Base.MsgType } -func (rct *ReleaseCollectionTask) timestamp() Timestamp { +func (rct *releaseCollectionTask) timestamp() Timestamp { return rct.Base.Timestamp } -func (rct *ReleaseCollectionTask) preExecute(context.Context) error { +func (rct *releaseCollectionTask) preExecute(context.Context) error { collectionID := rct.CollectionID rct.setResultInfo(nil) - log.Debug("start do ReleaseCollectionTask", + log.Debug("start do releaseCollectionTask", zap.Int64("msgID", rct.getTaskID()), zap.Int64("collectionID", collectionID)) return nil } -func (rct *ReleaseCollectionTask) execute(ctx context.Context) error { +func (rct *releaseCollectionTask) execute(ctx context.Context) error { defer func() { rct.retryCount-- }() @@ -540,7 +538,7 @@ func (rct *ReleaseCollectionTask) execute(ctx context.Context) error { } res, err := rct.rootCoord.ReleaseDQLMessageStream(rct.ctx, releaseDQLMessageStreamReq) if res.ErrorCode != commonpb.ErrorCode_Success || err != nil { - log.Warn("ReleaseCollectionTask: release collection end, releaseDQLMessageStream occur error", zap.Int64("collectionID", rct.CollectionID)) + log.Warn("releaseCollectionTask: release collection end, releaseDQLMessageStream occur error", zap.Int64("collectionID", rct.CollectionID)) err = errors.New("rootCoord releaseDQLMessageStream failed") rct.setResultInfo(err) return err @@ -555,54 +553,54 @@ func (rct *ReleaseCollectionTask) execute(ctx context.Context) error { req.NodeID = nodeID baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseTask.setParentTask(rct) - releaseCollectionTask := &ReleaseCollectionTask{ - BaseTask: baseTask, + releaseCollectionTask := &releaseCollectionTask{ + baseTask: baseTask, ReleaseCollectionRequest: req, cluster: rct.cluster, } rct.addChildTask(releaseCollectionTask) - log.Debug("ReleaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask)) + log.Debug("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask)) } } else { err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest) if err != nil { - log.Warn("ReleaseCollectionTask: release collection end, node occur error", zap.Int64("nodeID", rct.NodeID)) + log.Warn("releaseCollectionTask: release collection end, node occur error", zap.Int64("nodeID", rct.NodeID)) rct.setResultInfo(err) return err } } - log.Debug("ReleaseCollectionTask Execute done", + log.Debug("releaseCollectionTask Execute done", zap.Int64("msgID", rct.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID)) return nil } -func (rct *ReleaseCollectionTask) postExecute(context.Context) error { +func (rct *releaseCollectionTask) postExecute(context.Context) error { collectionID := rct.CollectionID if rct.result.ErrorCode != commonpb.ErrorCode_Success { rct.childTasks = []task{} } - log.Debug("ReleaseCollectionTask postExecute done", + log.Debug("releaseCollectionTask postExecute done", zap.Int64("msgID", rct.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID)) return nil } -func (rct *ReleaseCollectionTask) rollBack(ctx context.Context) []task { +func (rct *releaseCollectionTask) rollBack(ctx context.Context) []task { //TODO:: //if taskID == 0, recovery meta //if taskID != 0, recovery collection on queryNode return nil } -// LoadPartitionTask will load all the data of this partition to query nodes -type LoadPartitionTask struct { - *BaseTask +// loadPartitionTask will load all the data of this partition to query nodes +type loadPartitionTask struct { + *baseTask *querypb.LoadPartitionsRequest dataCoord types.DataCoord cluster Cluster @@ -610,23 +608,23 @@ type LoadPartitionTask struct { addCol bool } -func (lpt *LoadPartitionTask) msgBase() *commonpb.MsgBase { +func (lpt *loadPartitionTask) msgBase() *commonpb.MsgBase { return lpt.Base } -func (lpt *LoadPartitionTask) marshal() ([]byte, error) { +func (lpt *loadPartitionTask) marshal() ([]byte, error) { return proto.Marshal(lpt.LoadPartitionsRequest) } -func (lpt *LoadPartitionTask) msgType() commonpb.MsgType { +func (lpt *loadPartitionTask) msgType() commonpb.MsgType { return lpt.Base.MsgType } -func (lpt *LoadPartitionTask) timestamp() Timestamp { +func (lpt *loadPartitionTask) timestamp() Timestamp { return lpt.Base.Timestamp } -func (lpt *LoadPartitionTask) updateTaskProcess() { +func (lpt *loadPartitionTask) updateTaskProcess() { collectionID := lpt.CollectionID partitionIDs := lpt.PartitionIDs childTasks := lpt.getChildTask() @@ -647,16 +645,16 @@ func (lpt *LoadPartitionTask) updateTaskProcess() { } } -func (lpt *LoadPartitionTask) preExecute(context.Context) error { +func (lpt *loadPartitionTask) preExecute(context.Context) error { collectionID := lpt.CollectionID lpt.setResultInfo(nil) - log.Debug("start do LoadPartitionTask", + log.Debug("start do loadPartitionTask", zap.Int64("msgID", lpt.getTaskID()), zap.Int64("collectionID", collectionID)) return nil } -func (lpt *LoadPartitionTask) execute(ctx context.Context) error { +func (lpt *loadPartitionTask) execute(ctx context.Context) error { defer func() { lpt.retryCount-- }() @@ -722,25 +720,25 @@ func (lpt *LoadPartitionTask) execute(ctx context.Context) error { } channelsToWatch = append(channelsToWatch, channel) watchDmReqs = append(watchDmReqs, watchDmRequest) - log.Debug("LoadPartitionTask: set watchDmChannelsRequests", zap.Any("request", watchDmRequest), zap.Int64("collectionID", collectionID)) + log.Debug("loadPartitionTask: set watchDmChannelsRequests", zap.Any("request", watchDmRequest), zap.Int64("collectionID", collectionID)) } } err := assignInternalTask(ctx, collectionID, lpt, lpt.meta, lpt.cluster, loadSegmentReqs, watchDmReqs, false) if err != nil { - log.Warn("LoadPartitionTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) + log.Warn("loadPartitionTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) lpt.setResultInfo(err) return err } - log.Debug("LoadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) + log.Debug("loadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) - log.Debug("LoadPartitionTask Execute done", + log.Debug("loadPartitionTask Execute done", zap.Int64("msgID", lpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) return nil } -func (lpt *LoadPartitionTask) postExecute(ctx context.Context) error { +func (lpt *loadPartitionTask) postExecute(ctx context.Context) error { collectionID := lpt.CollectionID partitionIDs := lpt.PartitionIDs if lpt.result.ErrorCode != commonpb.ErrorCode_Success { @@ -748,26 +746,26 @@ func (lpt *LoadPartitionTask) postExecute(ctx context.Context) error { if lpt.addCol { err := lpt.meta.releaseCollection(collectionID) if err != nil { - log.Error("LoadPartitionTask: occur error when release collection info from meta", zap.Error(err)) + log.Error("loadPartitionTask: occur error when release collection info from meta", zap.Error(err)) } } else { for _, partitionID := range partitionIDs { err := lpt.meta.releasePartition(collectionID, partitionID) if err != nil { - log.Error("LoadPartitionTask: occur error when release partition info from meta", zap.Error(err)) + log.Error("loadPartitionTask: occur error when release partition info from meta", zap.Error(err)) } } } } - log.Debug("LoadPartitionTask postExecute done", + log.Debug("loadPartitionTask postExecute done", zap.Int64("msgID", lpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs)) return nil } -func (lpt *LoadPartitionTask) rollBack(ctx context.Context) []task { +func (lpt *loadPartitionTask) rollBack(ctx context.Context) []task { partitionIDs := lpt.PartitionIDs resultTasks := make([]task, 0) //brute force rollBack, should optimize @@ -787,8 +785,8 @@ func (lpt *LoadPartitionTask) rollBack(ctx context.Context) []task { } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseTask.setParentTask(lpt) - releaseCollectionTask := &ReleaseCollectionTask{ - BaseTask: baseTask, + releaseCollectionTask := &releaseCollectionTask{ + baseTask: baseTask, ReleaseCollectionRequest: req, cluster: lpt.cluster, } @@ -812,8 +810,8 @@ func (lpt *LoadPartitionTask) rollBack(ctx context.Context) []task { baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseTask.setParentTask(lpt) - releasePartitionTask := &ReleasePartitionTask{ - BaseTask: baseTask, + releasePartitionTask := &releasePartitionTask{ + baseTask: baseTask, ReleasePartitionsRequest: req, cluster: lpt.cluster, } @@ -824,30 +822,30 @@ func (lpt *LoadPartitionTask) rollBack(ctx context.Context) []task { return resultTasks } -// ReleasePartitionTask will release all the data of this partition on query nodes -type ReleasePartitionTask struct { - *BaseTask +// releasePartitionTask will release all the data of this partition on query nodes +type releasePartitionTask struct { + *baseTask *querypb.ReleasePartitionsRequest cluster Cluster } -func (rpt *ReleasePartitionTask) msgBase() *commonpb.MsgBase { +func (rpt *releasePartitionTask) msgBase() *commonpb.MsgBase { return rpt.Base } -func (rpt *ReleasePartitionTask) marshal() ([]byte, error) { +func (rpt *releasePartitionTask) marshal() ([]byte, error) { return proto.Marshal(rpt.ReleasePartitionsRequest) } -func (rpt *ReleasePartitionTask) msgType() commonpb.MsgType { +func (rpt *releasePartitionTask) msgType() commonpb.MsgType { return rpt.Base.MsgType } -func (rpt *ReleasePartitionTask) timestamp() Timestamp { +func (rpt *releasePartitionTask) timestamp() Timestamp { return rpt.Base.Timestamp } -func (rpt *ReleasePartitionTask) preExecute(context.Context) error { +func (rpt *releasePartitionTask) preExecute(context.Context) error { collectionID := rpt.CollectionID rpt.setResultInfo(nil) log.Debug("start do releasePartitionTask", @@ -856,7 +854,7 @@ func (rpt *ReleasePartitionTask) preExecute(context.Context) error { return nil } -func (rpt *ReleasePartitionTask) execute(ctx context.Context) error { +func (rpt *releasePartitionTask) execute(ctx context.Context) error { defer func() { rpt.retryCount-- }() @@ -874,13 +872,13 @@ func (rpt *ReleasePartitionTask) execute(ctx context.Context) error { req.NodeID = nodeID baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseTask.setParentTask(rpt) - releasePartitionTask := &ReleasePartitionTask{ - BaseTask: baseTask, + releasePartitionTask := &releasePartitionTask{ + baseTask: baseTask, ReleasePartitionsRequest: req, cluster: rpt.cluster, } rpt.addChildTask(releasePartitionTask) - log.Debug("ReleasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Any("task", releasePartitionTask)) + log.Debug("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Any("task", releasePartitionTask)) } } else { err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest) @@ -891,7 +889,7 @@ func (rpt *ReleasePartitionTask) execute(ctx context.Context) error { } } - log.Debug("ReleasePartitionTask Execute done", + log.Debug("releasePartitionTask Execute done", zap.Int64("msgID", rpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -899,14 +897,14 @@ func (rpt *ReleasePartitionTask) execute(ctx context.Context) error { return nil } -func (rpt *ReleasePartitionTask) postExecute(context.Context) error { +func (rpt *releasePartitionTask) postExecute(context.Context) error { collectionID := rpt.CollectionID partitionIDs := rpt.PartitionIDs if rpt.result.ErrorCode != commonpb.ErrorCode_Success { rpt.childTasks = []task{} } - log.Debug("ReleasePartitionTask postExecute done", + log.Debug("releasePartitionTask postExecute done", zap.Int64("msgID", rpt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), @@ -914,31 +912,30 @@ func (rpt *ReleasePartitionTask) postExecute(context.Context) error { return nil } -func (rpt *ReleasePartitionTask) rollBack(ctx context.Context) []task { +func (rpt *releasePartitionTask) rollBack(ctx context.Context) []task { //TODO:: //if taskID == 0, recovery meta //if taskID != 0, recovery partition on queryNode return nil } -//****************************internal task*******************************// -type LoadSegmentTask struct { - *BaseTask +type loadSegmentTask struct { + *baseTask *querypb.LoadSegmentsRequest meta Meta cluster Cluster excludeNodeIDs []int64 } -func (lst *LoadSegmentTask) msgBase() *commonpb.MsgBase { +func (lst *loadSegmentTask) msgBase() *commonpb.MsgBase { return lst.Base } -func (lst *LoadSegmentTask) marshal() ([]byte, error) { +func (lst *loadSegmentTask) marshal() ([]byte, error) { return proto.Marshal(lst.LoadSegmentsRequest) } -func (lst *LoadSegmentTask) isValid() bool { +func (lst *loadSegmentTask) isValid() bool { online, err := lst.cluster.isOnline(lst.NodeID) if err != nil { return false @@ -947,24 +944,24 @@ func (lst *LoadSegmentTask) isValid() bool { return lst.ctx != nil && online } -func (lst *LoadSegmentTask) msgType() commonpb.MsgType { +func (lst *loadSegmentTask) msgType() commonpb.MsgType { return lst.Base.MsgType } -func (lst *LoadSegmentTask) timestamp() Timestamp { +func (lst *loadSegmentTask) timestamp() Timestamp { return lst.Base.Timestamp } -func (lst *LoadSegmentTask) updateTaskProcess() { +func (lst *loadSegmentTask) updateTaskProcess() { parentTask := lst.getParentTask() if parentTask == nil { - log.Warn("LoadSegmentTask: parentTask should not be nil") + log.Warn("loadSegmentTask: parentTask should not be nil") return } parentTask.updateTaskProcess() } -func (lst *LoadSegmentTask) preExecute(context.Context) error { +func (lst *loadSegmentTask) preExecute(context.Context) error { segmentIDs := make([]UniqueID, 0) for _, info := range lst.Infos { segmentIDs = append(segmentIDs, info.SegmentID) @@ -977,14 +974,14 @@ func (lst *LoadSegmentTask) preExecute(context.Context) error { return nil } -func (lst *LoadSegmentTask) execute(ctx context.Context) error { +func (lst *loadSegmentTask) execute(ctx context.Context) error { defer func() { lst.retryCount-- }() err := lst.cluster.loadSegments(ctx, lst.NodeID, lst.LoadSegmentsRequest) if err != nil { - log.Warn("LoadSegmentTask: loadSegment occur error", zap.Int64("taskID", lst.getTaskID())) + log.Warn("loadSegmentTask: loadSegment occur error", zap.Int64("taskID", lst.getTaskID())) lst.setResultInfo(err) return err } @@ -994,13 +991,13 @@ func (lst *LoadSegmentTask) execute(ctx context.Context) error { return nil } -func (lst *LoadSegmentTask) postExecute(context.Context) error { +func (lst *loadSegmentTask) postExecute(context.Context) error { log.Debug("loadSegmentTask postExecute done", zap.Int64("taskID", lst.getTaskID())) return nil } -func (lst *LoadSegmentTask) reschedule(ctx context.Context) ([]task, error) { +func (lst *loadSegmentTask) reschedule(ctx context.Context) ([]task, error) { segmentIDs := make([]UniqueID, 0) collectionID := lst.Infos[0].CollectionID reScheduledTask := make([]task, 0) @@ -1025,8 +1022,8 @@ func (lst *LoadSegmentTask) reschedule(ctx context.Context) ([]task, error) { for nodeID, infos := range node2segmentInfos { loadSegmentBaseTask := newBaseTask(ctx, lst.getTriggerCondition()) loadSegmentBaseTask.setParentTask(lst.getParentTask()) - loadSegmentTask := &LoadSegmentTask{ - BaseTask: loadSegmentBaseTask, + loadSegmentTask := &loadSegmentTask{ + baseTask: loadSegmentBaseTask, LoadSegmentsRequest: &querypb.LoadSegmentsRequest{ Base: lst.Base, NodeID: nodeID, @@ -1039,7 +1036,7 @@ func (lst *LoadSegmentTask) reschedule(ctx context.Context) ([]task, error) { excludeNodeIDs: lst.excludeNodeIDs, } reScheduledTask = append(reScheduledTask, loadSegmentTask) - log.Debug("LoadSegmentTask: add a loadSegmentTask to RescheduleTasks", zap.Any("task", loadSegmentTask)) + log.Debug("loadSegmentTask: add a loadSegmentTask to RescheduleTasks", zap.Any("task", loadSegmentTask)) hasWatchQueryChannel := lst.cluster.hasWatchedQueryChannel(lst.ctx, nodeID, collectionID) if !hasWatchQueryChannel { @@ -1059,34 +1056,34 @@ func (lst *LoadSegmentTask) reschedule(ctx context.Context) ([]task, error) { } watchQueryChannelBaseTask := newBaseTask(ctx, lst.getTriggerCondition()) watchQueryChannelBaseTask.setParentTask(lst.getParentTask()) - watchQueryChannelTask := &WatchQueryChannelTask{ - BaseTask: watchQueryChannelBaseTask, + watchQueryChannelTask := &watchQueryChannelTask{ + baseTask: watchQueryChannelBaseTask, AddQueryChannelRequest: addQueryChannelRequest, cluster: lst.cluster, } reScheduledTask = append(reScheduledTask, watchQueryChannelTask) - log.Debug("LoadSegmentTask: add a watchQueryChannelTask to RescheduleTasks", zap.Any("task", watchQueryChannelTask)) + log.Debug("loadSegmentTask: add a watchQueryChannelTask to RescheduleTasks", zap.Any("task", watchQueryChannelTask)) } } return reScheduledTask, nil } -type ReleaseSegmentTask struct { - *BaseTask +type releaseSegmentTask struct { + *baseTask *querypb.ReleaseSegmentsRequest cluster Cluster } -func (rst *ReleaseSegmentTask) msgBase() *commonpb.MsgBase { +func (rst *releaseSegmentTask) msgBase() *commonpb.MsgBase { return rst.Base } -func (rst *ReleaseSegmentTask) marshal() ([]byte, error) { +func (rst *releaseSegmentTask) marshal() ([]byte, error) { return proto.Marshal(rst.ReleaseSegmentsRequest) } -func (rst *ReleaseSegmentTask) isValid() bool { +func (rst *releaseSegmentTask) isValid() bool { online, err := rst.cluster.isOnline(rst.NodeID) if err != nil { return false @@ -1094,15 +1091,15 @@ func (rst *ReleaseSegmentTask) isValid() bool { return rst.ctx != nil && online } -func (rst *ReleaseSegmentTask) msgType() commonpb.MsgType { +func (rst *releaseSegmentTask) msgType() commonpb.MsgType { return rst.Base.MsgType } -func (rst *ReleaseSegmentTask) timestamp() Timestamp { +func (rst *releaseSegmentTask) timestamp() Timestamp { return rst.Base.Timestamp } -func (rst *ReleaseSegmentTask) preExecute(context.Context) error { +func (rst *releaseSegmentTask) preExecute(context.Context) error { segmentIDs := rst.SegmentIDs rst.setResultInfo(nil) log.Debug("start do releaseSegmentTask", @@ -1112,14 +1109,14 @@ func (rst *ReleaseSegmentTask) preExecute(context.Context) error { return nil } -func (rst *ReleaseSegmentTask) execute(ctx context.Context) error { +func (rst *releaseSegmentTask) execute(ctx context.Context) error { defer func() { rst.retryCount-- }() err := rst.cluster.releaseSegments(rst.ctx, rst.NodeID, rst.ReleaseSegmentsRequest) if err != nil { - log.Warn("ReleaseSegmentTask: releaseSegment occur error", zap.Int64("taskID", rst.getTaskID())) + log.Warn("releaseSegmentTask: releaseSegment occur error", zap.Int64("taskID", rst.getTaskID())) rst.setResultInfo(err) return err } @@ -1130,7 +1127,7 @@ func (rst *ReleaseSegmentTask) execute(ctx context.Context) error { return nil } -func (rst *ReleaseSegmentTask) postExecute(context.Context) error { +func (rst *releaseSegmentTask) postExecute(context.Context) error { segmentIDs := rst.SegmentIDs log.Debug("releaseSegmentTask postExecute done", zap.Int64s("segmentIDs", segmentIDs), @@ -1138,23 +1135,23 @@ func (rst *ReleaseSegmentTask) postExecute(context.Context) error { return nil } -type WatchDmChannelTask struct { - *BaseTask +type watchDmChannelTask struct { + *baseTask *querypb.WatchDmChannelsRequest meta Meta cluster Cluster excludeNodeIDs []int64 } -func (wdt *WatchDmChannelTask) msgBase() *commonpb.MsgBase { +func (wdt *watchDmChannelTask) msgBase() *commonpb.MsgBase { return wdt.Base } -func (wdt *WatchDmChannelTask) marshal() ([]byte, error) { +func (wdt *watchDmChannelTask) marshal() ([]byte, error) { return proto.Marshal(wdt.WatchDmChannelsRequest) } -func (wdt *WatchDmChannelTask) isValid() bool { +func (wdt *watchDmChannelTask) isValid() bool { online, err := wdt.cluster.isOnline(wdt.NodeID) if err != nil { return false @@ -1162,24 +1159,24 @@ func (wdt *WatchDmChannelTask) isValid() bool { return wdt.ctx != nil && online } -func (wdt *WatchDmChannelTask) msgType() commonpb.MsgType { +func (wdt *watchDmChannelTask) msgType() commonpb.MsgType { return wdt.Base.MsgType } -func (wdt *WatchDmChannelTask) timestamp() Timestamp { +func (wdt *watchDmChannelTask) timestamp() Timestamp { return wdt.Base.Timestamp } -func (wdt *WatchDmChannelTask) updateTaskProcess() { +func (wdt *watchDmChannelTask) updateTaskProcess() { parentTask := wdt.getParentTask() if parentTask == nil { - log.Warn("WatchDmChannelTask: parentTask should not be nil") + log.Warn("watchDmChannelTask: parentTask should not be nil") return } parentTask.updateTaskProcess() } -func (wdt *WatchDmChannelTask) preExecute(context.Context) error { +func (wdt *watchDmChannelTask) preExecute(context.Context) error { channelInfos := wdt.Infos channels := make([]string, 0) for _, info := range channelInfos { @@ -1193,14 +1190,14 @@ func (wdt *WatchDmChannelTask) preExecute(context.Context) error { return nil } -func (wdt *WatchDmChannelTask) execute(ctx context.Context) error { +func (wdt *watchDmChannelTask) execute(ctx context.Context) error { defer func() { wdt.retryCount-- }() err := wdt.cluster.watchDmChannels(wdt.ctx, wdt.NodeID, wdt.WatchDmChannelsRequest) if err != nil { - log.Warn("WatchDmChannelTask: watchDmChannel occur error", zap.Int64("taskID", wdt.getTaskID())) + log.Warn("watchDmChannelTask: watchDmChannel occur error", zap.Int64("taskID", wdt.getTaskID())) wdt.setResultInfo(err) return err } @@ -1210,13 +1207,13 @@ func (wdt *WatchDmChannelTask) execute(ctx context.Context) error { return nil } -func (wdt *WatchDmChannelTask) postExecute(context.Context) error { +func (wdt *watchDmChannelTask) postExecute(context.Context) error { log.Debug("watchDmChannelTask postExecute done", zap.Int64("taskID", wdt.getTaskID())) return nil } -func (wdt *WatchDmChannelTask) reschedule(ctx context.Context) ([]task, error) { +func (wdt *watchDmChannelTask) reschedule(ctx context.Context) ([]task, error) { collectionID := wdt.CollectionID channelIDs := make([]string, 0) reScheduledTask := make([]task, 0) @@ -1242,8 +1239,8 @@ func (wdt *WatchDmChannelTask) reschedule(ctx context.Context) ([]task, error) { for nodeID, infos := range node2channelInfos { watchDmChannelBaseTask := newBaseTask(ctx, wdt.getTriggerCondition()) watchDmChannelBaseTask.setParentTask(wdt.getParentTask()) - watchDmChannelTask := &WatchDmChannelTask{ - BaseTask: watchDmChannelBaseTask, + watchDmChannelTask := &watchDmChannelTask{ + baseTask: watchDmChannelBaseTask, WatchDmChannelsRequest: &querypb.WatchDmChannelsRequest{ Base: wdt.Base, NodeID: nodeID, @@ -1258,7 +1255,7 @@ func (wdt *WatchDmChannelTask) reschedule(ctx context.Context) ([]task, error) { excludeNodeIDs: wdt.excludeNodeIDs, } reScheduledTask = append(reScheduledTask, watchDmChannelTask) - log.Debug("WatchDmChannelTask: add a watchDmChannelTask to RescheduleTasks", zap.Any("task", watchDmChannelTask)) + log.Debug("watchDmChannelTask: add a watchDmChannelTask to RescheduleTasks", zap.Any("task", watchDmChannelTask)) hasWatchQueryChannel := wdt.cluster.hasWatchedQueryChannel(wdt.ctx, nodeID, collectionID) if !hasWatchQueryChannel { @@ -1278,34 +1275,34 @@ func (wdt *WatchDmChannelTask) reschedule(ctx context.Context) ([]task, error) { } watchQueryChannelBaseTask := newBaseTask(ctx, wdt.getTriggerCondition()) watchQueryChannelBaseTask.setParentTask(wdt.getParentTask()) - watchQueryChannelTask := &WatchQueryChannelTask{ - BaseTask: watchQueryChannelBaseTask, + watchQueryChannelTask := &watchQueryChannelTask{ + baseTask: watchQueryChannelBaseTask, AddQueryChannelRequest: addQueryChannelRequest, cluster: wdt.cluster, } reScheduledTask = append(reScheduledTask, watchQueryChannelTask) - log.Debug("WatchDmChannelTask: add a watchQueryChannelTask to RescheduleTasks", zap.Any("task", watchQueryChannelTask)) + log.Debug("watchDmChannelTask: add a watchQueryChannelTask to RescheduleTasks", zap.Any("task", watchQueryChannelTask)) } } return reScheduledTask, nil } -type WatchQueryChannelTask struct { - *BaseTask +type watchQueryChannelTask struct { + *baseTask *querypb.AddQueryChannelRequest cluster Cluster } -func (wqt *WatchQueryChannelTask) msgBase() *commonpb.MsgBase { +func (wqt *watchQueryChannelTask) msgBase() *commonpb.MsgBase { return wqt.Base } -func (wqt *WatchQueryChannelTask) marshal() ([]byte, error) { +func (wqt *watchQueryChannelTask) marshal() ([]byte, error) { return proto.Marshal(wqt.AddQueryChannelRequest) } -func (wqt *WatchQueryChannelTask) isValid() bool { +func (wqt *watchQueryChannelTask) isValid() bool { online, err := wqt.cluster.isOnline(wqt.NodeID) if err != nil { return false @@ -1314,26 +1311,26 @@ func (wqt *WatchQueryChannelTask) isValid() bool { return wqt.ctx != nil && online } -func (wqt *WatchQueryChannelTask) msgType() commonpb.MsgType { +func (wqt *watchQueryChannelTask) msgType() commonpb.MsgType { return wqt.Base.MsgType } -func (wqt *WatchQueryChannelTask) timestamp() Timestamp { +func (wqt *watchQueryChannelTask) timestamp() Timestamp { return wqt.Base.Timestamp } -func (wqt *WatchQueryChannelTask) updateTaskProcess() { +func (wqt *watchQueryChannelTask) updateTaskProcess() { parentTask := wqt.getParentTask() if parentTask == nil { - log.Warn("WatchQueryChannelTask: parentTask should not be nil") + log.Warn("watchQueryChannelTask: parentTask should not be nil") return } parentTask.updateTaskProcess() } -func (wqt *WatchQueryChannelTask) preExecute(context.Context) error { +func (wqt *watchQueryChannelTask) preExecute(context.Context) error { wqt.setResultInfo(nil) - log.Debug("start do WatchQueryChannelTask", + log.Debug("start do watchQueryChannelTask", zap.Int64("collectionID", wqt.CollectionID), zap.String("queryChannel", wqt.RequestChannelID), zap.String("queryResultChannel", wqt.ResultChannelID), @@ -1342,14 +1339,14 @@ func (wqt *WatchQueryChannelTask) preExecute(context.Context) error { return nil } -func (wqt *WatchQueryChannelTask) execute(ctx context.Context) error { +func (wqt *watchQueryChannelTask) execute(ctx context.Context) error { defer func() { wqt.retryCount-- }() err := wqt.cluster.addQueryChannel(wqt.ctx, wqt.NodeID, wqt.AddQueryChannelRequest) if err != nil { - log.Warn("WatchQueryChannelTask: watchQueryChannel occur error", zap.Int64("taskID", wqt.getTaskID())) + log.Warn("watchQueryChannelTask: watchQueryChannel occur error", zap.Int64("taskID", wqt.getTaskID())) wqt.setResultInfo(err) return err } @@ -1362,8 +1359,8 @@ func (wqt *WatchQueryChannelTask) execute(ctx context.Context) error { return nil } -func (wqt *WatchQueryChannelTask) postExecute(context.Context) error { - log.Debug("WatchQueryChannelTask postExecute done", +func (wqt *watchQueryChannelTask) postExecute(context.Context) error { + log.Debug("watchQueryChannelTask postExecute done", zap.Int64("collectionID", wqt.CollectionID), zap.String("queryChannel", wqt.RequestChannelID), zap.String("queryResultChannel", wqt.ResultChannelID), @@ -1371,13 +1368,11 @@ func (wqt *WatchQueryChannelTask) postExecute(context.Context) error { return nil } -//****************************handoff task********************************// -type HandoffTask struct { +type handoffTask struct { } -//*********************** ***load balance task*** ************************// -type LoadBalanceTask struct { - *BaseTask +type loadBalanceTask struct { + *baseTask *querypb.LoadBalanceRequest rootCoord types.RootCoord dataCoord types.DataCoord @@ -1385,32 +1380,32 @@ type LoadBalanceTask struct { meta Meta } -func (lbt *LoadBalanceTask) msgBase() *commonpb.MsgBase { +func (lbt *loadBalanceTask) msgBase() *commonpb.MsgBase { return lbt.Base } -func (lbt *LoadBalanceTask) marshal() ([]byte, error) { +func (lbt *loadBalanceTask) marshal() ([]byte, error) { return proto.Marshal(lbt.LoadBalanceRequest) } -func (lbt *LoadBalanceTask) msgType() commonpb.MsgType { +func (lbt *loadBalanceTask) msgType() commonpb.MsgType { return lbt.Base.MsgType } -func (lbt *LoadBalanceTask) timestamp() Timestamp { +func (lbt *loadBalanceTask) timestamp() Timestamp { return lbt.Base.Timestamp } -func (lbt *LoadBalanceTask) preExecute(context.Context) error { +func (lbt *loadBalanceTask) preExecute(context.Context) error { lbt.setResultInfo(nil) - log.Debug("start do LoadBalanceTask", + log.Debug("start do loadBalanceTask", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), zap.Int64("taskID", lbt.getTaskID())) return nil } -func (lbt *LoadBalanceTask) execute(ctx context.Context) error { +func (lbt *loadBalanceTask) execute(ctx context.Context) error { defer func() { lbt.retryCount-- }() @@ -1422,7 +1417,7 @@ func (lbt *LoadBalanceTask) execute(ctx context.Context) error { collectionID := info.CollectionID metaInfo, err := lbt.meta.getCollectionInfoByID(collectionID) if err != nil { - log.Warn("LoadBalanceTask: getCollectionInfoByID occur error", zap.String("error", err.Error())) + log.Warn("loadBalanceTask: getCollectionInfoByID occur error", zap.String("error", err.Error())) lbt.setResultInfo(err) return err } @@ -1538,30 +1533,30 @@ func (lbt *LoadBalanceTask) execute(ctx context.Context) error { // return nil //} - log.Debug("LoadBalanceTask Execute done", + log.Debug("loadBalanceTask Execute done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), zap.Int64("taskID", lbt.getTaskID())) return nil } -func (lbt *LoadBalanceTask) postExecute(context.Context) error { +func (lbt *loadBalanceTask) postExecute(context.Context) error { if lbt.result.ErrorCode == commonpb.ErrorCode_Success { for _, id := range lbt.SourceNodeIDs { err := lbt.cluster.removeNodeInfo(id) if err != nil { - log.Error("LoadBalanceTask: occur error when removing node info from cluster", zap.Int64("nodeID", id)) + log.Error("loadBalanceTask: occur error when removing node info from cluster", zap.Int64("nodeID", id)) } err = lbt.meta.deleteSegmentInfoByNodeID(id) if err != nil { - log.Error("LoadBalanceTask: occur error when removing node info from meta", zap.Int64("nodeID", id)) + log.Error("loadBalanceTask: occur error when removing node info from meta", zap.Int64("nodeID", id)) } } } else { lbt.childTasks = []task{} } - log.Debug("LoadBalanceTask postExecute done", + log.Debug("loadBalanceTask postExecute done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), zap.Int64("taskID", lbt.getTaskID())) @@ -1792,8 +1787,8 @@ func assignInternalTask(ctx context.Context, loadSegmentsReq.NodeID = nodeID baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) - loadSegmentTask := &LoadSegmentTask{ - BaseTask: baseTask, + loadSegmentTask := &loadSegmentTask{ + baseTask: baseTask, LoadSegmentsRequest: loadSegmentsReq, meta: meta, cluster: cluster, @@ -1809,8 +1804,8 @@ func assignInternalTask(ctx context.Context, watchDmChannelReq.NodeID = nodeID baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) - watchDmChannelTask := &WatchDmChannelTask{ - BaseTask: baseTask, + watchDmChannelTask := &watchDmChannelTask{ + baseTask: baseTask, WatchDmChannelsRequest: watchDmChannelReq, meta: meta, cluster: cluster, @@ -1839,8 +1834,8 @@ func assignInternalTask(ctx context.Context, } baseTask := newBaseTask(ctx, parentTask.getTriggerCondition()) baseTask.setParentTask(parentTask) - watchQueryChannelTask := &WatchQueryChannelTask{ - BaseTask: baseTask, + watchQueryChannelTask := &watchQueryChannelTask{ + baseTask: baseTask, AddQueryChannelRequest: addQueryChannelRequest, cluster: cluster, diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 25d7cefea..12a1ef3b5 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -272,8 +272,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - loadCollectionTask := &LoadCollectionTask{ - BaseTask: baseTask, + loadCollectionTask := &loadCollectionTask{ + baseTask: baseTask, LoadCollectionRequest: &loadReq, rootCoord: scheduler.rootCoord, dataCoord: scheduler.dataCoord, @@ -287,8 +287,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - loadPartitionTask := &LoadPartitionTask{ - BaseTask: baseTask, + loadPartitionTask := &loadPartitionTask{ + baseTask: baseTask, LoadPartitionsRequest: &loadReq, dataCoord: scheduler.dataCoord, cluster: scheduler.cluster, @@ -301,8 +301,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - releaseCollectionTask := &ReleaseCollectionTask{ - BaseTask: baseTask, + releaseCollectionTask := &releaseCollectionTask{ + baseTask: baseTask, ReleaseCollectionRequest: &loadReq, cluster: scheduler.cluster, meta: scheduler.meta, @@ -315,8 +315,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - releasePartitionTask := &ReleasePartitionTask{ - BaseTask: baseTask, + releasePartitionTask := &releasePartitionTask{ + baseTask: baseTask, ReleasePartitionsRequest: &loadReq, cluster: scheduler.cluster, } @@ -328,8 +328,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - loadSegmentTask := &LoadSegmentTask{ - BaseTask: baseTask, + loadSegmentTask := &loadSegmentTask{ + baseTask: baseTask, LoadSegmentsRequest: &loadReq, cluster: scheduler.cluster, meta: scheduler.meta, @@ -343,8 +343,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - releaseSegmentTask := &ReleaseSegmentTask{ - BaseTask: baseTask, + releaseSegmentTask := &releaseSegmentTask{ + baseTask: baseTask, ReleaseSegmentsRequest: &loadReq, cluster: scheduler.cluster, } @@ -356,8 +356,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - watchDmChannelTask := &WatchDmChannelTask{ - BaseTask: baseTask, + watchDmChannelTask := &watchDmChannelTask{ + baseTask: baseTask, WatchDmChannelsRequest: &loadReq, cluster: scheduler.cluster, meta: scheduler.meta, @@ -371,8 +371,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - watchQueryChannelTask := &WatchQueryChannelTask{ - BaseTask: baseTask, + watchQueryChannelTask := &watchQueryChannelTask{ + baseTask: baseTask, AddQueryChannelRequest: &loadReq, cluster: scheduler.cluster, } @@ -384,8 +384,8 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task, if err != nil { return nil, err } - loadBalanceTask := &LoadBalanceTask{ - BaseTask: baseTask, + loadBalanceTask := &loadBalanceTask{ + baseTask: baseTask, LoadBalanceRequest: &loadReq, rootCoord: scheduler.rootCoord, dataCoord: scheduler.dataCoord, diff --git a/internal/querycoord/task_scheduler_test.go b/internal/querycoord/task_scheduler_test.go index 4cbf4c708..51ca57566 100644 --- a/internal/querycoord/task_scheduler_test.go +++ b/internal/querycoord/task_scheduler_test.go @@ -25,7 +25,7 @@ import ( ) type testTask struct { - BaseTask + baseTask baseMsg *commonpb.MsgBase cluster Cluster meta Meta @@ -59,8 +59,8 @@ func (tt *testTask) execute(ctx context.Context) error { switch tt.baseMsg.MsgType { case commonpb.MsgType_LoadSegments: - childTask := &LoadSegmentTask{ - BaseTask: &BaseTask{ + childTask := &loadSegmentTask{ + baseTask: &baseTask{ ctx: tt.ctx, Condition: NewTaskCondition(tt.ctx), triggerCondition: tt.triggerCondition, @@ -77,8 +77,8 @@ func (tt *testTask) execute(ctx context.Context) error { } tt.addChildTask(childTask) case commonpb.MsgType_WatchDmChannels: - childTask := &WatchDmChannelTask{ - BaseTask: &BaseTask{ + childTask := &watchDmChannelTask{ + baseTask: &baseTask{ ctx: tt.ctx, Condition: NewTaskCondition(tt.ctx), triggerCondition: tt.triggerCondition, @@ -95,8 +95,8 @@ func (tt *testTask) execute(ctx context.Context) error { } tt.addChildTask(childTask) case commonpb.MsgType_WatchQueryChannels: - childTask := &WatchQueryChannelTask{ - BaseTask: &BaseTask{ + childTask := &watchQueryChannelTask{ + baseTask: &baseTask{ ctx: tt.ctx, Condition: NewTaskCondition(tt.ctx), triggerCondition: tt.triggerCondition, @@ -134,7 +134,7 @@ func TestWatchQueryChannel_ClearEtcdInfoAfterAssignedNodeDown(t *testing.T) { nodeID := queryNode.queryNodeID waitQueryNodeOnline(queryCoord.cluster, nodeID) testTask := &testTask{ - BaseTask: BaseTask{ + baseTask: baseTask{ ctx: baseCtx, Condition: NewTaskCondition(baseCtx), triggerCondition: querypb.TriggerCondition_grpcRequest, @@ -173,8 +173,8 @@ func TestUnMarshalTask(t *testing.T) { cancel: cancel, } - t.Run("Test LoadCollectionTask", func(t *testing.T) { - loadTask := &LoadCollectionTask{ + t.Run("Test loadCollectionTask", func(t *testing.T) { + loadTask := &loadCollectionTask{ LoadCollectionRequest: &querypb.LoadCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadCollection, @@ -195,7 +195,7 @@ func TestUnMarshalTask(t *testing.T) { }) t.Run("Test LoadPartitionsTask", func(t *testing.T) { - loadTask := &LoadPartitionTask{ + loadTask := &loadPartitionTask{ LoadPartitionsRequest: &querypb.LoadPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadPartitions, @@ -215,8 +215,8 @@ func TestUnMarshalTask(t *testing.T) { assert.Equal(t, task.msgType(), commonpb.MsgType_LoadPartitions) }) - t.Run("Test ReleaseCollectionTask", func(t *testing.T) { - releaseTask := &ReleaseCollectionTask{ + t.Run("Test releaseCollectionTask", func(t *testing.T) { + releaseTask := &releaseCollectionTask{ ReleaseCollectionRequest: &querypb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseCollection, @@ -236,8 +236,8 @@ func TestUnMarshalTask(t *testing.T) { assert.Equal(t, task.msgType(), commonpb.MsgType_ReleaseCollection) }) - t.Run("Test ReleasePartitionTask", func(t *testing.T) { - releaseTask := &ReleasePartitionTask{ + t.Run("Test releasePartitionTask", func(t *testing.T) { + releaseTask := &releasePartitionTask{ ReleasePartitionsRequest: &querypb.ReleasePartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleasePartitions, @@ -257,8 +257,8 @@ func TestUnMarshalTask(t *testing.T) { assert.Equal(t, task.msgType(), commonpb.MsgType_ReleasePartitions) }) - t.Run("Test LoadSegmentTask", func(t *testing.T) { - loadTask := &LoadSegmentTask{ + t.Run("Test loadSegmentTask", func(t *testing.T) { + loadTask := &loadSegmentTask{ LoadSegmentsRequest: &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadSegments, @@ -278,8 +278,8 @@ func TestUnMarshalTask(t *testing.T) { assert.Equal(t, task.msgType(), commonpb.MsgType_LoadSegments) }) - t.Run("Test ReleaseSegmentTask", func(t *testing.T) { - releaseTask := &ReleaseSegmentTask{ + t.Run("Test releaseSegmentTask", func(t *testing.T) { + releaseTask := &releaseSegmentTask{ ReleaseSegmentsRequest: &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseSegments, @@ -299,8 +299,8 @@ func TestUnMarshalTask(t *testing.T) { assert.Equal(t, task.msgType(), commonpb.MsgType_ReleaseSegments) }) - t.Run("Test WatchDmChannelTask", func(t *testing.T) { - watchTask := &WatchDmChannelTask{ + t.Run("Test watchDmChannelTask", func(t *testing.T) { + watchTask := &watchDmChannelTask{ WatchDmChannelsRequest: &querypb.WatchDmChannelsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchDmChannels, @@ -320,8 +320,8 @@ func TestUnMarshalTask(t *testing.T) { assert.Equal(t, task.msgType(), commonpb.MsgType_WatchDmChannels) }) - t.Run("Test WatchQueryChannelTask", func(t *testing.T) { - watchTask := &WatchQueryChannelTask{ + t.Run("Test watchQueryChannelTask", func(t *testing.T) { + watchTask := &watchQueryChannelTask{ AddQueryChannelRequest: &querypb.AddQueryChannelRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_WatchQueryChannels, @@ -341,8 +341,8 @@ func TestUnMarshalTask(t *testing.T) { assert.Equal(t, task.msgType(), commonpb.MsgType_WatchQueryChannels) }) - t.Run("Test LoadBalanceTask", func(t *testing.T) { - loadBalanceTask := &LoadBalanceTask{ + t.Run("Test loadBalanceTask", func(t *testing.T) { + loadBalanceTask := &loadBalanceTask{ LoadBalanceRequest: &querypb.LoadBalanceRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadBalanceSegments, @@ -379,7 +379,7 @@ func TestReloadTaskFromKV(t *testing.T) { } kvs := make(map[string]string) - triggerTask := &LoadCollectionTask{ + triggerTask := &loadCollectionTask{ LoadCollectionRequest: &querypb.LoadCollectionRequest{ Base: &commonpb.MsgBase{ Timestamp: 1, @@ -392,7 +392,7 @@ func TestReloadTaskFromKV(t *testing.T) { triggerTaskKey := fmt.Sprintf("%s/%d", triggerTaskPrefix, 100) kvs[triggerTaskKey] = string(triggerBlobs) - activeTask := &LoadSegmentTask{ + activeTask := &loadSegmentTask{ LoadSegmentsRequest: &querypb.LoadSegmentsRequest{ Base: &commonpb.MsgBase{ Timestamp: 2, diff --git a/internal/querycoord/task_test.go b/internal/querycoord/task_test.go index b449304bf..9399a1de3 100644 --- a/internal/querycoord/task_test.go +++ b/internal/querycoord/task_test.go @@ -21,7 +21,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" ) -func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *LoadCollectionTask { +func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *loadCollectionTask { req := &querypb.LoadCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadCollection, @@ -30,8 +30,8 @@ func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *LoadCol Schema: genCollectionSchema(defaultCollectionID, false), } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) - loadCollectionTask := &LoadCollectionTask{ - BaseTask: baseTask, + loadCollectionTask := &loadCollectionTask{ + baseTask: baseTask, LoadCollectionRequest: req, rootCoord: queryCoord.rootCoordClient, dataCoord: queryCoord.dataCoordClient, @@ -41,7 +41,7 @@ func genLoadCollectionTask(ctx context.Context, queryCoord *QueryCoord) *LoadCol return loadCollectionTask } -func genLoadPartitionTask(ctx context.Context, queryCoord *QueryCoord) *LoadPartitionTask { +func genLoadPartitionTask(ctx context.Context, queryCoord *QueryCoord) *loadPartitionTask { req := &querypb.LoadPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadPartitions, @@ -50,8 +50,8 @@ func genLoadPartitionTask(ctx context.Context, queryCoord *QueryCoord) *LoadPart PartitionIDs: []UniqueID{defaultPartitionID}, } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) - loadPartitionTask := &LoadPartitionTask{ - BaseTask: baseTask, + loadPartitionTask := &loadPartitionTask{ + baseTask: baseTask, LoadPartitionsRequest: req, dataCoord: queryCoord.dataCoordClient, cluster: queryCoord.cluster, @@ -60,7 +60,7 @@ func genLoadPartitionTask(ctx context.Context, queryCoord *QueryCoord) *LoadPart return loadPartitionTask } -func genReleaseCollectionTask(ctx context.Context, queryCoord *QueryCoord) *ReleaseCollectionTask { +func genReleaseCollectionTask(ctx context.Context, queryCoord *QueryCoord) *releaseCollectionTask { req := &querypb.ReleaseCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseCollection, @@ -68,8 +68,8 @@ func genReleaseCollectionTask(ctx context.Context, queryCoord *QueryCoord) *Rele CollectionID: defaultCollectionID, } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) - releaseCollectionTask := &ReleaseCollectionTask{ - BaseTask: baseTask, + releaseCollectionTask := &releaseCollectionTask{ + baseTask: baseTask, ReleaseCollectionRequest: req, rootCoord: queryCoord.rootCoordClient, cluster: queryCoord.cluster, @@ -79,7 +79,7 @@ func genReleaseCollectionTask(ctx context.Context, queryCoord *QueryCoord) *Rele return releaseCollectionTask } -func genReleasePartitionTask(ctx context.Context, queryCoord *QueryCoord) *ReleasePartitionTask { +func genReleasePartitionTask(ctx context.Context, queryCoord *QueryCoord) *releasePartitionTask { req := &querypb.ReleasePartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleasePartitions, @@ -88,8 +88,8 @@ func genReleasePartitionTask(ctx context.Context, queryCoord *QueryCoord) *Relea PartitionIDs: []UniqueID{defaultPartitionID}, } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) - releasePartitionTask := &ReleasePartitionTask{ - BaseTask: baseTask, + releasePartitionTask := &releasePartitionTask{ + baseTask: baseTask, ReleasePartitionsRequest: req, cluster: queryCoord.cluster, } @@ -97,7 +97,7 @@ func genReleasePartitionTask(ctx context.Context, queryCoord *QueryCoord) *Relea return releasePartitionTask } -func genReleaseSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int64) *ReleaseSegmentTask { +func genReleaseSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int64) *releaseSegmentTask { req := &querypb.ReleaseSegmentsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_ReleaseSegments, @@ -108,15 +108,15 @@ func genReleaseSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID i SegmentIDs: []UniqueID{defaultSegmentID}, } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) - releaseSegmentTask := &ReleaseSegmentTask{ - BaseTask: baseTask, + releaseSegmentTask := &releaseSegmentTask{ + baseTask: baseTask, ReleaseSegmentsRequest: req, cluster: queryCoord.cluster, } return releaseSegmentTask } -func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID int64) *WatchDmChannelTask { +func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID int64) *watchDmChannelTask { schema := genCollectionSchema(defaultCollectionID, false) vChannelInfo := &datapb.VchannelInfo{ CollectionID: defaultCollectionID, @@ -134,8 +134,8 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseTask.taskID = 100 - watchDmChannelTask := &WatchDmChannelTask{ - BaseTask: baseTask, + watchDmChannelTask := &watchDmChannelTask{ + baseTask: baseTask, WatchDmChannelsRequest: req, cluster: queryCoord.cluster, meta: queryCoord.meta, @@ -151,8 +151,8 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i } baseParentTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseParentTask.taskID = 10 - parentTask := &LoadCollectionTask{ - BaseTask: baseParentTask, + parentTask := &loadCollectionTask{ + baseTask: baseParentTask, LoadCollectionRequest: parentReq, rootCoord: queryCoord.rootCoordClient, dataCoord: queryCoord.dataCoordClient, @@ -167,7 +167,7 @@ func genWatchDmChannelTask(ctx context.Context, queryCoord *QueryCoord, nodeID i queryCoord.meta.addCollection(defaultCollectionID, schema) return watchDmChannelTask } -func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int64) *LoadSegmentTask { +func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int64) *loadSegmentTask { schema := genCollectionSchema(defaultCollectionID, false) segmentInfo := &querypb.SegmentLoadInfo{ SegmentID: defaultSegmentID, @@ -184,8 +184,8 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6 } baseTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseTask.taskID = 100 - loadSegmentTask := &LoadSegmentTask{ - BaseTask: baseTask, + loadSegmentTask := &loadSegmentTask{ + baseTask: baseTask, LoadSegmentsRequest: req, cluster: queryCoord.cluster, meta: queryCoord.meta, @@ -201,8 +201,8 @@ func genLoadSegmentTask(ctx context.Context, queryCoord *QueryCoord, nodeID int6 } baseParentTask := newBaseTask(ctx, querypb.TriggerCondition_grpcRequest) baseParentTask.taskID = 10 - parentTask := &LoadCollectionTask{ - BaseTask: baseParentTask, + parentTask := &loadCollectionTask{ + baseTask: baseParentTask, LoadCollectionRequest: parentReq, rootCoord: queryCoord.rootCoordClient, dataCoord: queryCoord.dataCoordClient, -- GitLab