未验证 提交 7d5c8c5f 编写于 作者: Y yah01 提交者: GitHub

Fix bug not remove offline node (#17560)

The LoadBalance task won't remove the offline node if the node never load/watch any segment/dmchannel
Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 a6f0a231
......@@ -303,18 +303,25 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in
if err != nil {
return err
}
dmChannelWatchInfo := make([]*querypb.DmChannelWatchInfo, len(in.Infos))
for index, info := range in.Infos {
nodes := []UniqueID{nodeID}
old, ok := c.clusterMeta.getDmChannel(info.ChannelName)
if ok {
nodes = append(nodes, old.NodeIds...)
}
dmChannelWatchInfo[index] = &querypb.DmChannelWatchInfo{
CollectionID: info.CollectionID,
DmChannel: info.ChannelName,
NodeIDLoaded: nodeID,
ReplicaID: in.ReplicaID,
NodeIds: []int64{nodeID},
NodeIds: nodes,
}
}
err = c.clusterMeta.setDmChannelInfos(dmChannelWatchInfo)
err = c.clusterMeta.setDmChannelInfos(dmChannelWatchInfo...)
if err != nil {
// TODO DML channel maybe leaked, need to release dml if no related segment
return err
......@@ -640,8 +647,8 @@ func (c *queryNodeCluster) StopNode(nodeID int64) {
defer c.RUnlock()
if node, ok := c.nodes[nodeID]; ok {
node.stop()
c.setNodeState(nodeID, node, offline)
node.stop()
log.Info("stopNode: queryNode offline", zap.Int64("nodeID", nodeID))
}
}
......
......@@ -660,7 +660,7 @@ func TestSetNodeState(t *testing.T) {
NodeIDLoaded: node.queryNodeID,
NodeIds: []int64{node.queryNodeID},
}
err = meta.setDmChannelInfos([]*querypb.DmChannelWatchInfo{dmChannelWatchInfo})
err = meta.setDmChannelInfos(dmChannelWatchInfo)
assert.Nil(t, err)
deltaChannelInfo := &datapb.VchannelInfo{
CollectionID: defaultCollectionID,
......
......@@ -80,8 +80,9 @@ type Meta interface {
getPartitionStatesByID(collectionID UniqueID, partitionID UniqueID) (*querypb.PartitionStates, error)
getDmChannel(dmChannelName string) (*querypb.DmChannelWatchInfo, bool)
getDmChannelInfosByNodeID(nodeID int64) []*querypb.DmChannelWatchInfo
setDmChannelInfos(channelInfos []*querypb.DmChannelWatchInfo) error
setDmChannelInfos(channelInfos ...*querypb.DmChannelWatchInfo) error
getDmChannelNamesByCollectionID(CollectionID UniqueID) []string
getDeltaChannelsByCollectionID(collectionID UniqueID) ([]*datapb.VchannelInfo, error)
......@@ -855,6 +856,14 @@ func (m *MetaReplica) getPartitionStatesByID(collectionID UniqueID, partitionID
return nil, errors.New("getPartitionStateByID: can't find collectionID in collectionInfo")
}
func (m *MetaReplica) getDmChannel(dmChannelName string) (*querypb.DmChannelWatchInfo, bool) {
m.dmChannelMu.RLock()
defer m.dmChannelMu.RUnlock()
dmc, ok := m.dmChannelInfos[dmChannelName]
return dmc, ok
}
func (m *MetaReplica) getDmChannelInfosByNodeID(nodeID int64) []*querypb.DmChannelWatchInfo {
m.dmChannelMu.RLock()
defer m.dmChannelMu.RUnlock()
......@@ -882,16 +891,10 @@ func (m *MetaReplica) getDmChannelNamesByCollectionID(CollectionID UniqueID) []s
return dmChannelNames
}
func (m *MetaReplica) setDmChannelInfos(dmChannelWatchInfos []*querypb.DmChannelWatchInfo) error {
func (m *MetaReplica) setDmChannelInfos(dmChannelWatchInfos ...*querypb.DmChannelWatchInfo) error {
m.dmChannelMu.Lock()
defer m.dmChannelMu.Unlock()
for _, channelInfo := range dmChannelWatchInfos {
if old, ok := m.dmChannelInfos[channelInfo.DmChannel]; ok {
channelInfo.NodeIds = append(channelInfo.NodeIds, old.NodeIds...)
}
}
err := saveDmChannelWatchInfos(dmChannelWatchInfos, m.getKvClient())
if err != nil {
return err
......
......@@ -235,7 +235,7 @@ func TestMetaFunc(t *testing.T) {
NodeIds: []int64{nodeID},
})
}
err = meta.setDmChannelInfos(dmChannelWatchInfos)
err = meta.setDmChannelInfos(dmChannelWatchInfos...)
assert.Nil(t, err)
})
......
......@@ -387,6 +387,7 @@ func (qc *QueryCoord) allocateNode(nodeID int64) error {
}
return nil
}
func (qc *QueryCoord) getUnallocatedNodes() []int64 {
onlines := qc.cluster.OnlineNodeIDs()
var ret []int64
......
......@@ -2047,11 +2047,10 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro
}
}
tasks, err := assignInternalTask(ctx, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, true, lbt.SourceNodeIDs, lbt.DstNodeIDs, replica.GetReplicaID(), lbt.broker)
tasks, err := assignInternalTask(ctx, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, false, lbt.SourceNodeIDs, lbt.DstNodeIDs, replica.GetReplicaID(), lbt.broker)
if err != nil {
log.Error("loadBalanceTask: assign child task failed", zap.Int64("sourceNodeID", nodeID))
lbt.setResultInfo(err)
panic(err)
}
internalTasks = append(internalTasks, tasks...)
}
......@@ -2269,11 +2268,15 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
if len(lbt.getChildTask()) > 0 {
replicas := make(map[UniqueID]*milvuspb.ReplicaInfo)
segments := make(map[UniqueID]*querypb.SegmentInfo)
dmChannels := make(map[string]*querypb.DmChannelWatchInfo)
for _, id := range lbt.SourceNodeIDs {
for _, segment := range lbt.meta.getSegmentInfosByNode(id) {
segments[segment.SegmentID] = segment
}
for _, dmChannel := range lbt.meta.getDmChannelInfosByNodeID(id) {
dmChannels[dmChannel.DmChannel] = dmChannel
}
nodeReplicas, err := lbt.meta.getReplicasByNodeID(id)
if err != nil {
......@@ -2324,30 +2327,14 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
return nil
})
}
// if loadBalanceTask execute failed after query node down, the lbt.getResultInfo().ErrorCode will be set to commonpb.ErrorCode_UnexpectedError
// then the queryCoord will panic, and the nodeInfo should not be removed immediately
// after queryCoord recovery, the balanceTask will redo
for _, offlineNodeID := range lbt.SourceNodeIDs {
err := lbt.cluster.RemoveNodeInfo(offlineNodeID)
if err != nil {
log.Error("loadBalanceTask: occur error when removing node info from cluster",
zap.Int64("nodeID", offlineNodeID),
zap.Error(err))
lbt.setResultInfo(err)
return err
}
}
}
// Remove offline nodes from segment
for _, segment := range segments {
segment := segment
wg.Go(func() error {
segment.NodeID = -1
segment.NodeIds = removeFromSlice(segment.NodeIds, lbt.SourceNodeIDs...)
if len(segment.NodeIds) > 0 {
segment.NodeID = segment.NodeIds[0]
}
err := lbt.meta.saveSegmentInfo(segment)
if err != nil {
......@@ -2366,6 +2353,30 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
return nil
})
}
// Remove offline nodes from dmChannels
for _, dmChannel := range dmChannels {
dmChannel := dmChannel
wg.Go(func() error {
dmChannel.NodeIds = removeFromSlice(dmChannel.NodeIds, lbt.SourceNodeIDs...)
err := lbt.meta.setDmChannelInfos(dmChannel)
if err != nil {
log.Error("failed to remove offline nodes from dmChannel info",
zap.String("dmChannel", dmChannel.DmChannel),
zap.Error(err))
return err
}
log.Info("remove offline nodes from dmChannel",
zap.Int64("taskID", lbt.getTaskID()),
zap.String("dmChannel", dmChannel.DmChannel),
zap.Int64s("nodeIds", dmChannel.NodeIds))
return nil
})
}
for _, childTask := range lbt.getChildTask() {
if task, ok := childTask.(*watchDmChannelTask); ok {
wg.Go(func() error {
......@@ -2415,6 +2426,22 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
}
}
// if loadBalanceTask execute failed after query node down, the lbt.getResultInfo().ErrorCode will be set to commonpb.ErrorCode_UnexpectedError
// then the queryCoord will panic, and the nodeInfo should not be removed immediately
// after queryCoord recovery, the balanceTask will redo
if lbt.BalanceReason == querypb.TriggerCondition_NodeDown {
for _, offlineNodeID := range lbt.SourceNodeIDs {
err := lbt.cluster.RemoveNodeInfo(offlineNodeID)
if err != nil {
log.Error("loadBalanceTask: occur error when removing node info from cluster",
zap.Int64("nodeID", offlineNodeID),
zap.Error(err))
lbt.setResultInfo(err)
return err
}
}
}
return nil
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册