diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index ed0227061020d2c1c2cd4ffa10980823c6b3814c..63fefc31e4ca8bf1836de0978432b4b61c7dcb95 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -765,10 +765,6 @@ func (c *ChannelManager) Reassign(nodeID UniqueID, channelName string) error { zap.Int64("nodeID", nodeID), zap.String("channel name", channelName)) updates.Add(nodeID, []*channel{ch}) - } else { - if err := c.remove(nodeID, ch); err != nil { - return fmt.Errorf("failed to remove watch info: %v,%s", ch, err.Error()) - } } log.Info("channel manager reassigning channels", @@ -817,10 +813,6 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string) zap.Int64("node ID", nodeID), zap.String("channel name", channelName)) updates.Add(nodeID, []*channel{chToCleanUp}) - } else { - if err := c.remove(nodeID, chToCleanUp); err != nil { - return fmt.Errorf("failed to remove watch info: %v,%s", chToCleanUp, err.Error()) - } } log.Info("channel manager reassigning channels", diff --git a/internal/datacoord/policy.go b/internal/datacoord/policy.go index ae44c5215963ad77b4f73f06aa6ea04883e50dbd..e9e02ebd9371826e0e217ae7757ba00b0fbf05a6 100644 --- a/internal/datacoord/policy.go +++ b/internal/datacoord/policy.go @@ -59,10 +59,10 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { } // Get a list of available node-channel info. - avaNodeChannel := filterNode(store.GetNodesChannels(), nodeID) + avaNodes := filterNode(store.GetNodesChannels(), nodeID) channelNum := 0 - for _, info := range avaNodeChannel { + for _, info := range avaNodes { channelNum += len(info.Channels) } chPerNode := channelNum / (len(store.GetNodes()) + 1) @@ -71,16 +71,16 @@ func AvgAssignRegisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { } // sort in descending order and reallocate - sort.Slice(avaNodeChannel, func(i, j int) bool { - return len(avaNodeChannel[i].Channels) > len(avaNodeChannel[j].Channels) + sort.Slice(avaNodes, func(i, j int) bool { + return len(avaNodes[i].Channels) > len(avaNodes[j].Channels) }) releases := make(map[int64][]*channel) for i := 0; i < chPerNode; i++ { // Pick a node with its channel to release. - toRelease := avaNodeChannel[i%len(avaNodeChannel)] + toRelease := avaNodes[i%len(avaNodes)] // Pick a channel that will be reassigned to the new node later. - chIdx := i / len(avaNodeChannel) + chIdx := i / len(avaNodes) if chIdx >= len(toRelease.Channels) { // Node has too few channels, simply skip. No re-picking. // TODO: Consider re-picking in case assignment is extremely uneven? @@ -167,29 +167,29 @@ type ChannelAssignPolicy func(store ROChannelStore, channels []*channel) Channel // AverageAssignPolicy ensure that the number of channels per nodes is approximately the same func AverageAssignPolicy(store ROChannelStore, channels []*channel) ChannelOpSet { - filteredChannels := filterChannels(store, channels) - if len(filteredChannels) == 0 { + newChannels := filterChannels(store, channels) + if len(newChannels) == 0 { return nil } opSet := ChannelOpSet{} - dataNodesChannels := store.GetNodesChannels() + allDataNodes := store.GetNodesChannels() // If no datanode alive, save channels in buffer - if len(dataNodesChannels) == 0 { + if len(allDataNodes) == 0 { opSet.Add(bufferID, channels) return opSet } // sort and assign - sort.Slice(dataNodesChannels, func(i, j int) bool { - return len(dataNodesChannels[i].Channels) <= len(dataNodesChannels[j].Channels) + sort.Slice(allDataNodes, func(i, j int) bool { + return len(allDataNodes[i].Channels) <= len(allDataNodes[j].Channels) }) updates := make(map[int64][]*channel) - for i, channel := range filteredChannels { - n := dataNodesChannels[i%len(dataNodesChannels)].NodeID - updates[n] = append(updates[n], channel) + for i, newChannel := range newChannels { + n := allDataNodes[i%len(allDataNodes)].NodeID + updates[n] = append(updates[n], newChannel) } for id, chs := range updates { @@ -276,34 +276,34 @@ func EmptyDeregisterPolicy(store ROChannelStore, nodeID int64) ChannelOpSet { // AvgAssignUnregisteredChannels evenly assign the unregistered channels func AvgAssignUnregisteredChannels(store ROChannelStore, nodeID int64) ChannelOpSet { - channels := store.GetNodesChannels() - filteredChannels := make([]*NodeChannelInfo, 0, len(channels)) + allNodes := store.GetNodesChannels() + avaNodes := make([]*NodeChannelInfo, 0, len(allNodes)) unregisteredChannels := make([]*channel, 0) opSet := ChannelOpSet{} - for _, c := range channels { + for _, c := range allNodes { if c.NodeID == nodeID { opSet.Delete(nodeID, c.Channels) unregisteredChannels = append(unregisteredChannels, c.Channels...) continue } - filteredChannels = append(filteredChannels, c) + avaNodes = append(avaNodes, c) } - if len(filteredChannels) == 0 { + if len(avaNodes) == 0 { opSet.Add(bufferID, unregisteredChannels) return opSet } // sort and assign - sort.Slice(filteredChannels, func(i, j int) bool { - return len(filteredChannels[i].Channels) <= len(filteredChannels[j].Channels) + sort.Slice(avaNodes, func(i, j int) bool { + return len(avaNodes[i].Channels) <= len(avaNodes[j].Channels) }) updates := make(map[int64][]*channel) - for i, channel := range unregisteredChannels { - n := filteredChannels[i%len(filteredChannels)].NodeID - updates[n] = append(updates[n], channel) + for i, unregisteredChannel := range unregisteredChannels { + n := avaNodes[i%len(avaNodes)].NodeID + updates[n] = append(updates[n], unregisteredChannel) } for id, chs := range updates { @@ -373,23 +373,26 @@ func EmptyReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) Cha // AverageReassignPolicy is a reassigning policy that evenly assign channels func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet { - channels := store.GetNodesChannels() + allNodes := store.GetNodesChannels() filterMap := make(map[int64]struct{}) for _, reassign := range reassigns { filterMap[reassign.NodeID] = struct{}{} } - filterChannels := make([]*NodeChannelInfo, 0, len(channels)) - for _, c := range channels { + avaNodes := make([]*NodeChannelInfo, 0, len(allNodes)) + for _, c := range allNodes { if _, ok := filterMap[c.NodeID]; ok { continue } - filterChannels = append(filterChannels, c) + avaNodes = append(avaNodes, c) } - if len(filterChannels) == 0 { + if len(avaNodes) == 0 { // if no node is left, do not reassign return nil } + sort.Slice(avaNodes, func(i, j int) bool { + return len(avaNodes[i].Channels) <= len(avaNodes[j].Channels) + }) // reassign channels to remaining nodes i := 0 @@ -403,7 +406,7 @@ func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) C } ret = append(ret, deleteUpdate) for _, ch := range reassign.Channels { - targetID := filterChannels[i%len(filterChannels)].NodeID + targetID := avaNodes[i%len(avaNodes)].NodeID i++ if _, ok := addUpdates[targetID]; !ok { addUpdates[targetID] = &ChannelOp{