diff --git a/internal/datacoord/cluster_store.go b/internal/datacoord/cluster_store.go index 76c9defe7b3cdbaf0119221c9915451c7ae20cbb..6519f203c9bdac6a24f7830db66efbdb755d43fc 100644 --- a/internal/datacoord/cluster_store.go +++ b/internal/datacoord/cluster_store.go @@ -20,6 +20,7 @@ import ( "github.com/milvus-io/milvus/internal/types" ) +// ClusterStore is the interface stores DataNodes information type ClusterStore interface { GetNodes() []*NodeInfo SetNode(nodeID UniqueID, node *NodeInfo) @@ -29,6 +30,8 @@ type ClusterStore interface { SetWatched(nodeID UniqueID, channelsName []string) } +// NodeInfo wrapper struct for storing DataNode information +// and related controlling struct type NodeInfo struct { Info *datapb.DataNodeInfo eventCh chan *NodeEvent @@ -37,13 +40,16 @@ type NodeInfo struct { cancel context.CancelFunc } +// eventChBuffer magic number for channel buffer size in NodeInfo const eventChBuffer = 1024 +// NodeEvent data node event struct type NodeEvent struct { Type NodeEventType Req interface{} } +// NewNodeInfo helper function to create a NodeInfo from provided datapb.DataNodeInfo func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo { ctx, cancel := context.WithCancel(ctx) return &NodeInfo{ @@ -54,6 +60,8 @@ func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo { } } +// ShadowClone shadow clones a NodeInfo +// note that info, eventCh, etc is not created again func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo { cloned := &NodeInfo{ Info: n.Info, @@ -68,6 +76,9 @@ func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo { return cloned } +// Clone "deep" clone a NodeInfo +// note that ONLY `info` field is deep copied +// parameter opts is applied in sequence to clone NodeInfo func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo { info := proto.Clone(n.Info).(*datapb.DataNodeInfo) cloned := &NodeInfo{ @@ -83,14 +94,17 @@ func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo { return cloned } +// GetEventChannel returns event channel func (n *NodeInfo) GetEventChannel() chan *NodeEvent { return n.eventCh } +// GetClient returns client func (n *NodeInfo) GetClient() types.DataNode { return n.client } +// Dispose stops the data node client and calls cancel func (n *NodeInfo) Dispose() { defer n.cancel() if n.client != nil { @@ -98,10 +112,14 @@ func (n *NodeInfo) Dispose() { } } +// NodesInfo wraps a map UniqueID -> NodeInfo +// implements ClusterStore interface +// not lock related field is required so all operations shall be protected outside type NodesInfo struct { nodes map[UniqueID]*NodeInfo } +// NewNodesInfo helper function creates a NodesInfo func NewNodesInfo() *NodesInfo { c := &NodesInfo{ nodes: make(map[UniqueID]*NodeInfo), @@ -109,6 +127,7 @@ func NewNodesInfo() *NodesInfo { return c } +// GetNodes returns nodes list in NodesInfo func (c *NodesInfo) GetNodes() []*NodeInfo { nodes := make([]*NodeInfo, 0, len(c.nodes)) for _, node := range c.nodes { @@ -117,18 +136,22 @@ func (c *NodesInfo) GetNodes() []*NodeInfo { return nodes } +// SetNode sets a NodeInfo with provided UniqueID func (c *NodesInfo) SetNode(nodeID UniqueID, node *NodeInfo) { c.nodes[nodeID] = node metrics.DataCoordDataNodeList.WithLabelValues("online").Inc() metrics.DataCoordDataNodeList.WithLabelValues("offline").Dec() } +// DeleteNode deletes a NodeInfo with provided UniqueID func (c *NodesInfo) DeleteNode(nodeID UniqueID) { delete(c.nodes, nodeID) metrics.DataCoordDataNodeList.WithLabelValues("online").Dec() metrics.DataCoordDataNodeList.WithLabelValues("offline").Inc() } +// GetNode get NodeInfo binding to the specified UniqueID +// returns nil if no Info is found func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo { node, ok := c.nodes[nodeID] if !ok { @@ -137,20 +160,26 @@ func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo { return node } +// SetClient set DataNode client to specified UniqueID +// do nothing if no Info is found func (c *NodesInfo) SetClient(nodeID UniqueID, client types.DataNode) { if node, ok := c.nodes[nodeID]; ok { c.nodes[nodeID] = node.ShadowClone(SetClient(client)) } } +// SetWatched set specified channels watch state from Uncomplete to Complete +// do nothing if no Info is found func (c *NodesInfo) SetWatched(nodeID UniqueID, channelsName []string) { if node, ok := c.nodes[nodeID]; ok { c.nodes[nodeID] = node.Clone(SetWatched(channelsName)) } } +// NodeOpt helper functions updating NodeInfo properties type NodeOpt func(n *NodeInfo) +// SetWatched returns a NodeOpt updating specified channels watch states from Uncomplete to Complete func SetWatched(channelsName []string) NodeOpt { return func(n *NodeInfo) { channelsMap := make(map[string]struct{}) @@ -169,18 +198,21 @@ func SetWatched(channelsName []string) NodeOpt { } } +// SetClient returns NodeOpt update DataNode client func SetClient(client types.DataNode) NodeOpt { return func(n *NodeInfo) { n.client = client } } +// AddChannels returns NodeOpt adding specified channels to assigned list func AddChannels(channels []*datapb.ChannelStatus) NodeOpt { return func(n *NodeInfo) { n.Info.Channels = append(n.Info.Channels, channels...) } } +// SetChannels returns NodeOpt updating assigned channels func SetChannels(channels []*datapb.ChannelStatus) NodeOpt { return func(n *NodeInfo) { n.Info.Channels = channels