// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package datacoord

import (
	"fmt"
	"sync"

	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/util/retry"
	"go.uber.org/zap"
	"golang.org/x/net/context"
)

// cluster tracks the DataNode topology and channel assignments, and applies the
// pluggable startup/register/unregister/assign policies when membership changes.
type cluster struct {
	mu             sync.RWMutex
	ctx            context.Context
	dataManager    *clusterNodeManager
	sessionManager sessionManager
	posProvider    positionProvider

	startupPolicy    clusterStartupPolicy
	registerPolicy   dataNodeRegisterPolicy
	unregisterPolicy dataNodeUnregisterPolicy
	assignPolicy     channelAssignPolicy
}

// clusterOption overrides one of the default policies when constructing a cluster.
type clusterOption struct {
	apply func(c *cluster)
}

func withStartupPolicy(p clusterStartupPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.startupPolicy = p },
	}
}

func withRegisterPolicy(p dataNodeRegisterPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.registerPolicy = p },
	}
}

func withUnregistorPolicy(p dataNodeUnregisterPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.unregisterPolicy = p },
	}
}

func withAssignPolicy(p channelAssignPolicy) clusterOption {
	return clusterOption{
		apply: func(c *cluster) { c.assignPolicy = p },
	}
}

func defaultStartupPolicy() clusterStartupPolicy {
	return newWatchRestartsStartupPolicy()
}

func defaultRegisterPolicy() dataNodeRegisterPolicy {
	return newAssiggBufferRegisterPolicy()
}

func defaultUnregisterPolicy() dataNodeUnregisterPolicy {
	return randomAssignRegisterFunc
}

func defaultAssignPolicy() channelAssignPolicy {
	return newBalancedAssignPolicy()
}

func newCluster(ctx context.Context, dataManager *clusterNodeManager, sessionManager sessionManager, posProvider positionProvider, opts ...clusterOption) *cluster {
	c := &cluster{
		ctx:              ctx,
		sessionManager:   sessionManager,
		dataManager:      dataManager,
		posProvider:      posProvider,
		startupPolicy:    defaultStartupPolicy(),
		registerPolicy:   defaultRegisterPolicy(),
		unregisterPolicy: defaultUnregisterPolicy(),
		assignPolicy:     defaultAssignPolicy(),
	}
	for _, opt := range opts {
		opt.apply(c)
	}
	return c
}

// startup applies the startup policy against the DataNodes recovered from meta
// and issues watch requests for channels that are still uncomplete.
func (c *cluster) startup(dataNodes []*datapb.DataNodeInfo) error {
	deltaChange := c.dataManager.updateCluster(dataNodes)
	nodes, chanBuffer := c.dataManager.getDataNodes(false)
	var rets []*datapb.DataNodeInfo
	var err error
	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
	c.dataManager.updateDataNodes(rets, chanBuffer)
	rets, err = c.watch(rets)
	if err != nil {
		log.Warn("Failed to watch all the status changes", zap.Error(err))
		// does not trigger another refresh, pending events will do
	}
	c.dataManager.updateDataNodes(rets, chanBuffer)
	return nil
}

// refresh roughly refreshes DataNode status after an event is received.
func (c *cluster) refresh(dataNodes []*datapb.DataNodeInfo) error {
	deltaChange := c.dataManager.updateCluster(dataNodes)
	nodes, chanBuffer := c.dataManager.getDataNodes(false)
	var rets []*datapb.DataNodeInfo
	var err error
	rets, chanBuffer = c.startupPolicy.apply(nodes, deltaChange, chanBuffer)
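	// persist the policy result first so the new channel assignment is recorded even if
	// the watch calls below fail; the post-watch states are persisted again afterwards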
	c.dataManager.updateDataNodes(rets, chanBuffer)
	rets, err = c.watch(rets)
	if err != nil {
		log.Warn("Failed to watch all the status changes", zap.Error(err))
		// does not trigger another refresh, pending events will do
	}
	c.dataManager.updateDataNodes(rets, chanBuffer) // even if some watches failed, status should sync into etcd
	return err
}

// parraRun runs the given works in parallel with at most maxRunner concurrent workers.
func parraRun(works []func(), maxRunner int) {
	wg := sync.WaitGroup{}
	ch := make(chan func())
	wg.Add(len(works))
	for i := 0; i < maxRunner; i++ {
		go func() {
			// each worker keeps draining works until the channel is closed
			for work := range ch {
				work()
				wg.Done()
			}
		}()
	}
	for _, work := range works {
		ch <- work
	}
	close(ch)
	wg.Wait()
}

// watch sends WatchDmChannelsRequests to DataNodes for all uncomplete channels and
// marks successfully watched channels as complete.
func (c *cluster) watch(nodes []*datapb.DataNodeInfo) ([]*datapb.DataNodeInfo, error) {
	works := make([]func(), 0, len(nodes))
	mut := sync.Mutex{}
	errs := make([]error, 0, len(nodes))
	for _, n := range nodes {
		n := n // capture the loop variable for the closure below
		works = append(works, func() {
			logMsg := fmt.Sprintf("Begin to watch channels for node %s:", n.Address)
			uncompletes := make([]vchannel, 0, len(n.Channels))
			for _, ch := range n.Channels {
				if ch.State == datapb.ChannelWatchState_Uncomplete {
					if len(uncompletes) == 0 {
						logMsg += ch.Name
					} else {
						logMsg += "," + ch.Name
					}
					uncompletes = append(uncompletes, vchannel{
						CollectionID: ch.CollectionID,
						DmlChannel:   ch.Name,
					})
				}
			}
			if len(uncompletes) == 0 {
				return // all channels already watched, nothing to do
			}
			log.Debug(logMsg)
			vchanInfos, err := c.posProvider.GetVChanPositions(uncompletes, true)
			if err != nil {
				log.Warn("get vchannel position failed", zap.Error(err))
				mut.Lock()
				errs = append(errs, err)
				mut.Unlock()
				return
			}
			cli, err := c.sessionManager.getOrCreateSession(n.Address) // this might take time if the address went offline
			if err != nil {
				log.Warn("get session failed", zap.String("addr", n.Address), zap.Error(err))
				mut.Lock()
				errs = append(errs, err)
				mut.Unlock()
				return
			}
			req := &datapb.WatchDmChannelsRequest{
				Base: &commonpb.MsgBase{
					SourceID: Params.NodeID,
				},
				Vchannels: vchanInfos,
			}
			resp, err := cli.WatchDmChannels(c.ctx, req)
			if err != nil {
				log.Warn("watch dm channel failed", zap.String("addr", n.Address), zap.Error(err))
				mut.Lock()
				errs = append(errs, err)
				mut.Unlock()
				return
			}
			if resp.ErrorCode != commonpb.ErrorCode_Success {
				log.Warn("watch channels failed", zap.String("address", n.Address), zap.String("reason", resp.Reason))
				mut.Lock()
				errs = append(errs, fmt.Errorf("watch fail with stat %v, msg:%s", resp.ErrorCode, resp.Reason))
				mut.Unlock()
				return
			}
			for _, ch := range n.Channels {
				if ch.State == datapb.ChannelWatchState_Uncomplete {
					ch.State = datapb.ChannelWatchState_Complete
				}
			}
		})
	}
	parraRun(works, 3)
	return nodes, retry.ErrorList(errs)
}

// register adds a new DataNode to the cluster, applies the register policy and
// triggers watch for any channels assigned to it.
func (c *cluster) register(n *datapb.DataNodeInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.dataManager.register(n)
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
	var rets []*datapb.DataNodeInfo
	var err error
	log.Debug("before register policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
	rets, chanBuffer = c.registerPolicy.apply(cNodes, n, chanBuffer)
	log.Debug("after register policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
	c.dataManager.updateDataNodes(rets, chanBuffer)
	rets, err = c.watch(rets)
	if err != nil {
		log.Warn("Failed to watch all the status changes", zap.Error(err))
		// does not trigger another refresh, pending events will do
	}
	c.dataManager.updateDataNodes(rets, chanBuffer)
}

// unregister removes a DataNode from the cluster and redistributes (or buffers) the
// channels it was watching.
func (c *cluster) unregister(n *datapb.DataNodeInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sessionManager.releaseSession(n.Address)
	oldNode := c.dataManager.unregister(n)
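	// the node manager returns the previously cached info for this node; prefer it because
	// it carries the channel statuses that need to be buffered or reassigned below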
	if oldNode != nil {
		n = oldNode
	}
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
	log.Debug("before unregister policy applied", zap.Any("n.Channels", n.Channels), zap.Any("buffer", chanBuffer))
	var rets []*datapb.DataNodeInfo
	var err error
	if len(cNodes) == 0 {
		// no DataNode left, put the channels back into the buffer as uncomplete
		for _, chStat := range n.Channels {
			chStat.State = datapb.ChannelWatchState_Uncomplete
			chanBuffer = append(chanBuffer, chStat)
		}
	} else {
		rets = c.unregisterPolicy.apply(cNodes, n)
	}
	log.Debug("after unregister policy applied", zap.Any("ret", rets), zap.Any("buffer", chanBuffer))
	c.dataManager.updateDataNodes(rets, chanBuffer)
	rets, err = c.watch(rets)
	if err != nil {
		log.Warn("Failed to watch all the status changes", zap.Error(err))
		// does not trigger another refresh, pending events will do
	}
	c.dataManager.updateDataNodes(rets, chanBuffer)
}

// watchIfNeeded assigns the channel to a DataNode via the assign policy, or buffers it
// when the cluster currently has no DataNode.
func (c *cluster) watchIfNeeded(channel string, collectionID UniqueID) {
	c.mu.Lock()
	defer c.mu.Unlock()
	cNodes, chanBuffer := c.dataManager.getDataNodes(true)
	var rets []*datapb.DataNodeInfo
	var err error
	if len(cNodes) == 0 { // no nodes to assign, put into buffer
		chanBuffer = append(chanBuffer, &datapb.ChannelStatus{
			Name:         channel,
			CollectionID: collectionID,
			State:        datapb.ChannelWatchState_Uncomplete,
		})
	} else {
		rets = c.assignPolicy.apply(cNodes, channel, collectionID)
	}
	c.dataManager.updateDataNodes(rets, chanBuffer)
	rets, err = c.watch(rets)
	if err != nil {
		log.Warn("Failed to watch all the status changes", zap.Error(err))
		// does not trigger another refresh, pending events will do
	}
	c.dataManager.updateDataNodes(rets, chanBuffer)
}

// flush groups the segments by insert channel and sends FlushSegmentsRequests to the
// DataNodes currently watching those channels.
func (c *cluster) flush(segments []*datapb.SegmentInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	m := make(map[string]map[UniqueID][]UniqueID) // channel -> map[collectionID]segmentIDs
	for _, seg := range segments {
		if _, ok := m[seg.InsertChannel]; !ok {
			m[seg.InsertChannel] = make(map[UniqueID][]UniqueID)
		}
		m[seg.InsertChannel][seg.CollectionID] = append(m[seg.InsertChannel][seg.CollectionID], seg.ID)
	}

	dataNodes, _ := c.dataManager.getDataNodes(true)

	channel2Node := make(map[string]string)
	for _, node := range dataNodes {
		for _, chstatus := range node.Channels {
			channel2Node[chstatus.Name] = node.Address
		}
	}

	for ch, coll2seg := range m {
		node, ok := channel2Node[ch]
		if !ok {
			continue
		}
		cli, err := c.sessionManager.getOrCreateSession(node)
		if err != nil {
			log.Warn("get session failed", zap.String("addr", node), zap.Error(err))
			continue
		}
		for coll, segs := range coll2seg {
			req := &datapb.FlushSegmentsRequest{
				Base: &commonpb.MsgBase{
					MsgType:  commonpb.MsgType_Flush,
					SourceID: Params.NodeID,
				},
				CollectionID: coll,
				SegmentIDs:   segs,
			}
			resp, err := cli.FlushSegments(c.ctx, req)
			if err != nil {
				log.Warn("flush segment failed", zap.String("addr", node), zap.Error(err))
				continue
			}
			if resp.ErrorCode != commonpb.ErrorCode_Success {
				log.Warn("flush segment failed", zap.String("dataNode", node), zap.String("reason", resp.Reason))
				continue
			}
			log.Debug("flush segments succeeded", zap.Any("segmentIDs", segs))
		}
	}
}

// releaseSessions closes all cached DataNode sessions held by the session manager.
func (c *cluster) releaseSessions() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.sessionManager.release()
}