From b27e6b52bfec5f0f48cd917fcb1aa453c8229d83 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Mon, 28 Jun 2021 13:28:14 +0800
Subject: [PATCH] Force cluster refresh for each dn change event (#6161)

Signed-off-by: Congqi Xia
---
 internal/datacoord/cluster.go | 182 ++++++++++++++++++++++++----------
 internal/datacoord/server.go  |  27 ++++-
 2 files changed, 155 insertions(+), 54 deletions(-)

diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go
index cdc20d2ba..dc21b6e61 100644
--- a/internal/datacoord/cluster.go
+++ b/internal/datacoord/cluster.go
@@ -17,6 +17,7 @@ import (
 	"github.com/milvus-io/milvus/internal/log"
 	"github.com/milvus-io/milvus/internal/proto/commonpb"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/util/retry"
 	"go.uber.org/zap"
 	"golang.org/x/net/context"
 )
@@ -102,68 +103,130 @@ func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
 	deltaChange := c.dataManager.updateCluster(dataNodes)
 	nodes, chanBuffer := c.dataManager.getDataNodes(false)
 	var rets []*datapb.DataNodeInfo
+	var err error
 	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
 	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets = c.watch(rets)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status changes", zap.Error(err))
+		// does not trigger another refresh; pending events will handle it
+	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
 	return nil
 }

-func (c *cluster) watch(nodes []*datapb.DataNodeInfo) []*datapb.DataNodeInfo {
+// refresh roughly refreshes the datanode status after an event is received
+func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
+	deltaChange := c.dataManager.updateCluster(dataNodes)
+	nodes, chanBuffer := c.dataManager.getDataNodes(false)
+	var rets []*datapb.DataNodeInfo
+	var err error
+	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
+	c.dataManager.updateDataNodes(rets, chanBuffer)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status changes", zap.Error(err))
+		// does not trigger another refresh; pending events will handle it
+	}
+	c.dataManager.updateDataNodes(rets, chanBuffer) // even if some watches failed, sync the status into etcd
+	return err
+}
+
+// paraRun runs the given works in parallel, with at most maxRunner concurrent runners
+func paraRun(works []func(), maxRunner int) {
+	wg := sync.WaitGroup{}
+	ch := make(chan func())
+	wg.Add(len(works))
+
+	for i := 0; i < maxRunner; i++ {
+		go func() {
+			for {
+				work, ok := <-ch
+				if !ok {
+					return
+				}
+				work()
+				wg.Done()
+			}
+		}()
+	}
+	for _, work := range works {
+		ch <- work
+	}
+	wg.Wait()
+	close(ch)
+}
+
+func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, error) {
+	works := make([]func(), 0, len(nodes))
+	mut := sync.Mutex{}
+	errs := make([]error, 0, len(nodes))
 	for _, n := range nodes {
-		logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
-		uncompletes := make([]vchannel, 0, len(n.Channels))
-		for _, ch := range n.Channels {
-			if ch.State == datapb.ChannelWatchState_Uncomplete {
-				if len(uncompletes) == 0 {
-					logMsg += ch.Name
-				} else {
-					logMsg += "," + ch.Name
-				}
-				uncompletes = append(uncompletes, vchannel{
-					CollectionID: ch.CollectionID,
-					DmlChannel:   ch.Name,
-				})
-			}
-		}
-		if len(uncompletes) == 0 {
-			continue
-		}
-		log.Debug(logMsg)
-
-		vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
-		if err != nil {
-			log.Warn("get vchannel position failed", zap.Error(err))
-			continue
-		}
-		cli, err := c.sessionManager.getOrCreateSession(n.Address)
-		if err != nil {
-			log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
-			continue
-		}
-		req := &datapb.WatchDmChannelsRequest{
-			Base: &commonpb.MsgBase{
-				SourceID: Params.NodeID,
-			},
-			Vchannels: vchanInfos,
-		}
-		resp, err := cli.WatchDmChannels(c.ctx, req)
-		if err != nil {
-			log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
-			continue
-		}
-		if resp.ErrorCode != commonpb.ErrorCode_Success {
-			log.Warn("watch channels failed", zap.String("address", n.Address), zap.Error(err))
-			continue
-		}
-		for _, ch := range n.Channels {
-			if ch.State == datapb.ChannelWatchState_Uncomplete {
-				ch.State = datapb.ChannelWatchState_Complete
-			}
-		}
+		n := n // capture the range variable for the closure below
+		works = append(works, func() {
+			logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
+			uncompletes := make([]vchannel, 0, len(n.Channels))
+			for _, ch := range n.Channels {
+				if ch.State == datapb.ChannelWatchState_Uncomplete {
+					if len(uncompletes) == 0 {
+						logMsg += ch.Name
+					} else {
+						logMsg += "," + ch.Name
+					}
+					uncompletes = append(uncompletes, vchannel{
+						CollectionID: ch.CollectionID,
+						DmlChannel:   ch.Name,
+					})
+				}
+			}
+			if len(uncompletes) == 0 {
+				return // all set, just return
+			}
+			log.Debug(logMsg)
+
+			vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
+			if err != nil {
+				log.Warn("get vchannel position failed", zap.Error(err))
+				mut.Lock()
+				errs = append(errs, err)
+				mut.Unlock()
+				return
+			}
+			cli, err := c.sessionManager.getOrCreateSession(n.Address) // this might take a while if the address has gone offline
+			if err != nil {
+				log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
+				mut.Lock()
+				errs = append(errs, err)
+				mut.Unlock()
+				return
+			}
+			req := &datapb.WatchDmChannelsRequest{
+				Base: &commonpb.MsgBase{
+					SourceID: Params.NodeID,
+				},
+				Vchannels: vchanInfos,
+			}
+			resp, err := cli.WatchDmChannels(c.ctx, req)
+			if err != nil {
+				log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
+				mut.Lock()
+				errs = append(errs, err)
+				mut.Unlock()
+				return
+			}
+			if resp.ErrorCode != commonpb.ErrorCode_Success {
+				log.Warn("watch channels failed", zap.String("address", n.Address), zap.String("reason", resp.Reason))
+				mut.Lock()
+				errs = append(errs, fmt.Errorf("watch failed with state %v, msg: %s", resp.ErrorCode, resp.Reason))
+				mut.Unlock()
+				return
+			}
+			for _, ch := range n.Channels {
+				if ch.State == datapb.ChannelWatchState_Uncomplete {
+					ch.State = datapb.ChannelWatchState_Complete
+				}
+			}
+		})
 	}
-	return nodes
+	paraRun(works, 3)
+	if len(errs) > 0 {
+		return nodes, retry.ErrorList(errs)
+	}
+	return nodes, nil
 }
@@ -172,11 +235,16 @@ func (c *cluster) register(n *datapb.DataNodeInfo) {
 	c.dataManager.register(n)
 	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
 	var rets []*datapb.DataNodeInfo
+	var err error
 	log.Debug("before register policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
 	rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer)
 	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
 	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets = c.watch(rets)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status changes", zap.Error(err))
+		// does not trigger another refresh; pending events will handle it
+	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
 }
@@ -192,6 +260,7 @@ func (c *cluster) unregister(n *datapb.DataNodeInfo) {
 	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
 	log.Debug("before unregister policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
 	var rets []*datapb.DataNodeInfo
+	var err error
 	if len(cNodes) == 0 {
 		for _, chStat := range n.Channels {
 			chStat.State = datapb.ChannelWatchState_Uncomplete
@@ -202,7 +271,11 @@
 	}
 	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
 	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets = c.watch(rets)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status changes", zap.Error(err))
+		// does not trigger another refresh; pending events will handle it
+	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
 }
@@ -211,6 +284,7 @@ func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) {
 	defer c.mu.Unlock()
 	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
 	var rets []*datapb.DataNodeInfo
+	var err error
 	if len(cNodes) == 0 { // no nodes to assign, put into buffer
 		chanBuffer = append(chanBuffer, &datapb.ChannelStatus{
 			Name: channel,
@@ -221,7 +295,11 @@
 		rets = c.assignPolicy.apply(cNodes, channel, collectionID)
 	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
-	rets = c.watch(rets)
+	rets, err = c.watch(rets)
+	if err != nil {
+		log.Warn("Failed to watch all the status changes", zap.Error(err))
+		// does not trigger another refresh; pending events will handle it
+	}
 	c.dataManager.updateDataNodes(rets, chanBuffer)
 }
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index d4bac4e5d..61c3f344d 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -192,6 +192,27 @@ func (s *Server) initServiceDiscovery() error {
 	return nil
 }

+func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
+	if s.session == nil {
+		log.Warn("load data nodes but session is nil")
+		return []*datapb.DataNodeInfo{}
+	}
+	sessions, _, err := s.session.GetSessions(typeutil.DataNodeRole)
+	if err != nil {
+		log.Warn("load data nodes failed", zap.Error(err))
+		return []*datapb.DataNodeInfo{}
+	}
+	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
+	for _, session := range sessions {
+		datanodes = append(datanodes, &datapb.DataNodeInfo{
+			Address:  session.Address,
+			Version:  session.ServerID,
+			Channels: []*datapb.ChannelStatus{},
+		})
+	}
+	return datanodes
+}
+
 func (s *Server) startSegmentManager() {
 	helper := createNewSegmentHelper(s.segmentInfoStream)
 	s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper))
@@ -368,12 +389,14 @@ func (s *Server) startWatchService(ctx context.Context) {
 			log.Info("Received datanode register",
 				zap.String("address", datanode.Address),
 				zap.Int64("serverID", datanode.Version))
-			s.cluster.register(datanode)
+			// force a full cluster refresh instead of an incremental register
+			s.cluster.refresh(s.loadDataNodes())
 		case sessionutil.SessionDelEvent:
 			log.Info("Received datanode unregister",
 				zap.String("address", datanode.Address),
 				zap.Int64("serverID", datanode.Version))
-			s.cluster.unregister(datanode)
+			// force a full cluster refresh instead of an incremental unregister
+			s.cluster.refresh(s.loadDataNodes())
 		default:
 			log.Warn("receive unknown service event type",
 				zap.Any("type", event.EventType))
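
Note: the sketch below is a minimal, self-contained illustration of the bounded-parallelism pattern that paraRun and the mutex-guarded error collection in cluster.watch rely on. It is not part of the patch; the package layout, the runParallel name, and the fake failing tasks are assumptions made only for this example.

package main

import (
	"fmt"
	"sync"
)

// runParallel executes every function in works using at most maxRunner goroutines.
func runParallel(works []func(), maxRunner int) {
	wg := sync.WaitGroup{}
	ch := make(chan func())
	wg.Add(len(works))

	for i := 0; i < maxRunner; i++ {
		go func() {
			// each runner keeps draining the channel until it is closed
			for work := range ch {
				work()
				wg.Done()
			}
		}()
	}
	for _, work := range works {
		ch <- work
	}
	close(ch) // no more work; idle runners exit once the channel is drained
	wg.Wait()
}

func main() {
	var (
		mu   sync.Mutex
		errs []error // collected the same way as the errs slice in cluster.watch
	)
	works := make([]func(), 0, 8)
	for i := 0; i < 8; i++ {
		i := i // capture the loop variable, as watch() must do with its node variable
		works = append(works, func() {
			if i%3 == 0 { // pretend every third task fails, e.g. an unreachable datanode
				mu.Lock()
				errs = append(errs, fmt.Errorf("task %d failed", i))
				mu.Unlock()
				return
			}
			fmt.Println("task", i, "done")
		})
	}
	runParallel(works, 3)
	fmt.Println("collected errors:", errs)
}

Draining the channel in a loop inside each runner and closing it only after all sends are done is what keeps a work list larger than maxRunner from deadlocking the sending side.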