From 20936f78daada43b17ecab64bc896445e04f2c13 Mon Sep 17 00:00:00 2001 From: yah01 Date: Thu, 5 May 2022 16:25:50 +0800 Subject: [PATCH] Fix LoadBalance failed to sync segments to shard leader when the leader is offline (#16770) Signed-off-by: yah01 --- internal/querycoord/query_coord.go | 1 + internal/querycoord/task.go | 19 ++++++++++++------- internal/querycoord/task_scheduler.go | 8 ++++++++ 3 files changed, 21 insertions(+), 7 deletions(-) diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 031644bc3..7c12be21b 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -474,6 +474,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) { log.Error("unable to allcoate node", zap.Int64("nodeID", serverID), zap.Error(err)) } qc.metricsCacheManager.InvalidateSystemInfoMetrics() + case sessionutil.SessionDelEvent: serverID := event.Session.ServerID log.Info("get a del event after QueryNode down", zap.Int64("nodeID", serverID)) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 1c5a2fbcd..6810c6b8b 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -78,6 +78,7 @@ type task interface { preExecute(ctx context.Context) error execute(ctx context.Context) error postExecute(ctx context.Context) error + globalPostExecute(ctx context.Context) error // execute after all child task completed reschedule(ctx context.Context) ([]task, error) rollBack(ctx context.Context) []task waitToFinish() error @@ -273,6 +274,10 @@ func (bt *baseTask) getResultInfo() *commonpb.Status { return proto.Clone(bt.result).(*commonpb.Status) } +func (bt *baseTask) globalPostExecute(ctx context.Context) error { + return nil +} + func (bt *baseTask) updateTaskProcess() { // TODO:: } @@ -2222,13 +2227,6 @@ func (lbt *loadBalanceTask) getReplica(nodeID, collectionID int64) (*milvuspb.Re } func (lbt *loadBalanceTask) postExecute(context.Context) error { - err := syncReplicaSegments(lbt.ctx, lbt.cluster, lbt.getChildTask()) - if err != nil { - log.Error("loadBalanceTask: failed to sync replica segments to shard leaders", - zap.Int64("taskID", lbt.getTaskID()), - zap.Error(err)) - } - if lbt.getResultInfo().ErrorCode != commonpb.ErrorCode_Success { lbt.clearChildTasks() } @@ -2299,6 +2297,13 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error { return nil } +func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error { + if len(lbt.getChildTask()) > 0 { + return syncReplicaSegments(ctx, lbt.cluster, lbt.getChildTask()) + } + return nil +} + func assignInternalTask(ctx context.Context, parentTask task, meta Meta, cluster Cluster, loadSegmentRequests []*querypb.LoadSegmentsRequest, diff --git a/internal/querycoord/task_scheduler.go b/internal/querycoord/task_scheduler.go index 3a7a2e5fe..5bdd6972e 100644 --- a/internal/querycoord/task_scheduler.go +++ b/internal/querycoord/task_scheduler.go @@ -684,6 +684,14 @@ func (scheduler *TaskScheduler) scheduleLoop() { } } + err = triggerTask.globalPostExecute(triggerTask.traceCtx()) + if err != nil { + log.Error("scheduleLoop: failed to execute globalPostExecute() of task", + zap.Int64("taskID", triggerTask.getTaskID()), + zap.Error(err)) + triggerTask.setResultInfo(err) + } + err = removeTaskFromKVFn(triggerTask) if err != nil { log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err)) -- GitLab