未验证 提交 20936f78 编写于 作者: Y yah01 提交者: GitHub

Fix LoadBalance failed to sync segments to shard leader when the leader is offline (#16770)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 333b439a
......@@ -474,6 +474,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
log.Error("unable to allcoate node", zap.Int64("nodeID", serverID), zap.Error(err))
}
qc.metricsCacheManager.InvalidateSystemInfoMetrics()
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
log.Info("get a del event after QueryNode down", zap.Int64("nodeID", serverID))
......
......@@ -78,6 +78,7 @@ type task interface {
preExecute(ctx context.Context) error
execute(ctx context.Context) error
postExecute(ctx context.Context) error
globalPostExecute(ctx context.Context) error // execute after all child task completed
reschedule(ctx context.Context) ([]task, error)
rollBack(ctx context.Context) []task
waitToFinish() error
......@@ -273,6 +274,10 @@ func (bt *baseTask) getResultInfo() *commonpb.Status {
return proto.Clone(bt.result).(*commonpb.Status)
}
func (bt *baseTask) globalPostExecute(ctx context.Context) error {
return nil
}
func (bt *baseTask) updateTaskProcess() {
// TODO::
}
......@@ -2222,13 +2227,6 @@ func (lbt *loadBalanceTask) getReplica(nodeID, collectionID int64) (*milvuspb.Re
}
func (lbt *loadBalanceTask) postExecute(context.Context) error {
err := syncReplicaSegments(lbt.ctx, lbt.cluster, lbt.getChildTask())
if err != nil {
log.Error("loadBalanceTask: failed to sync replica segments to shard leaders",
zap.Int64("taskID", lbt.getTaskID()),
zap.Error(err))
}
if lbt.getResultInfo().ErrorCode != commonpb.ErrorCode_Success {
lbt.clearChildTasks()
}
......@@ -2299,6 +2297,13 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
return nil
}
func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
if len(lbt.getChildTask()) > 0 {
return syncReplicaSegments(ctx, lbt.cluster, lbt.getChildTask())
}
return nil
}
func assignInternalTask(ctx context.Context,
parentTask task, meta Meta, cluster Cluster,
loadSegmentRequests []*querypb.LoadSegmentsRequest,
......
......@@ -684,6 +684,14 @@ func (scheduler *TaskScheduler) scheduleLoop() {
}
}
err = triggerTask.globalPostExecute(triggerTask.traceCtx())
if err != nil {
log.Error("scheduleLoop: failed to execute globalPostExecute() of task",
zap.Int64("taskID", triggerTask.getTaskID()),
zap.Error(err))
triggerTask.setResultInfo(err)
}
err = removeTaskFromKVFn(triggerTask)
if err != nil {
log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册