From f973456c106731192ce72d69b48f0276127df34c Mon Sep 17 00:00:00 2001 From: xige-16 Date: Sun, 27 Jun 2021 12:16:09 +0800 Subject: [PATCH] filter delEvent when querynode register fail (#6150) Signed-off-by: xige-16 --- internal/querycoord/query_coord.go | 64 +++++++++++++++--------------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/internal/querycoord/query_coord.go b/internal/querycoord/query_coord.go index 31d1bb74b..93f046ef4 100644 --- a/internal/querycoord/query_coord.go +++ b/internal/querycoord/query_coord.go @@ -239,39 +239,41 @@ func (qc *QueryCoord) watchNodeLoop() { }() case sessionutil.SessionDelEvent: serverID := event.Session.ServerID - log.Debug("query coordinator", zap.Any("The QueryNode crashed with ID", serverID)) - qc.cluster.nodes[serverID].setNodeState(false) - qc.cluster.nodes[serverID].client.Stop() - loadBalanceSegment := &querypb.LoadBalanceRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_LoadBalanceSegments, - SourceID: qc.session.ServerID, - }, - SourceNodeIDs: []int64{serverID}, - BalanceReason: querypb.TriggerCondition_nodeDown, - } + if _, ok := qc.cluster.nodes[serverID]; ok { + log.Debug("query coordinator", zap.Any("The QueryNode crashed with ID", serverID)) + qc.cluster.nodes[serverID].setNodeState(false) + qc.cluster.nodes[serverID].client.Stop() + loadBalanceSegment := &querypb.LoadBalanceRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_LoadBalanceSegments, + SourceID: qc.session.ServerID, + }, + SourceNodeIDs: []int64{serverID}, + BalanceReason: querypb.TriggerCondition_nodeDown, + } - loadBalanceTask := &LoadBalanceTask{ - BaseTask: BaseTask{ - ctx: qc.loopCtx, - Condition: NewTaskCondition(qc.loopCtx), - triggerCondition: querypb.TriggerCondition_nodeDown, - }, - LoadBalanceRequest: loadBalanceSegment, - rootCoord: qc.rootCoordClient, - dataCoord: qc.dataCoordClient, - cluster: qc.cluster, - meta: qc.meta, - } - qc.scheduler.Enqueue([]task{loadBalanceTask}) - go func() { - err := loadBalanceTask.WaitToFinish() - if err != nil { - log.Error(err.Error()) + loadBalanceTask := &LoadBalanceTask{ + BaseTask: BaseTask{ + ctx: qc.loopCtx, + Condition: NewTaskCondition(qc.loopCtx), + triggerCondition: querypb.TriggerCondition_nodeDown, + }, + LoadBalanceRequest: loadBalanceSegment, + rootCoord: qc.rootCoordClient, + dataCoord: qc.dataCoordClient, + cluster: qc.cluster, + meta: qc.meta, } - log.Debug("load balance done after queryNode down", zap.Int64s("nodeIDs", loadBalanceTask.SourceNodeIDs)) - //TODO::remove nodeInfo and clear etcd - }() + qc.scheduler.Enqueue([]task{loadBalanceTask}) + go func() { + err := loadBalanceTask.WaitToFinish() + if err != nil { + log.Error(err.Error()) + } + log.Debug("load balance done after queryNode down", zap.Int64s("nodeIDs", loadBalanceTask.SourceNodeIDs)) + //TODO::remove nodeInfo and clear etcd + }() + } } } } -- GitLab