未验证 提交 e2c15d5b 编写于 作者: C congqixia 提交者: GitHub

[skip ci]Add comment for ClusterStore (#7785)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 ceb5fab6
...@@ -20,6 +20,7 @@ import ( ...@@ -20,6 +20,7 @@ import (
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
) )
// ClusterStore is the interface stores DataNodes information
type ClusterStore interface { type ClusterStore interface {
GetNodes() []*NodeInfo GetNodes() []*NodeInfo
SetNode(nodeID UniqueID, node *NodeInfo) SetNode(nodeID UniqueID, node *NodeInfo)
...@@ -29,6 +30,8 @@ type ClusterStore interface { ...@@ -29,6 +30,8 @@ type ClusterStore interface {
SetWatched(nodeID UniqueID, channelsName []string) SetWatched(nodeID UniqueID, channelsName []string)
} }
// NodeInfo wrapper struct for storing DataNode information
// and related controlling struct
type NodeInfo struct { type NodeInfo struct {
Info *datapb.DataNodeInfo Info *datapb.DataNodeInfo
eventCh chan *NodeEvent eventCh chan *NodeEvent
...@@ -37,13 +40,16 @@ type NodeInfo struct { ...@@ -37,13 +40,16 @@ type NodeInfo struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
// eventChBuffer magic number for channel buffer size in NodeInfo
const eventChBuffer = 1024 const eventChBuffer = 1024
// NodeEvent data node event struct
type NodeEvent struct { type NodeEvent struct {
Type NodeEventType Type NodeEventType
Req interface{} Req interface{}
} }
// NewNodeInfo helper function to create a NodeInfo from provided datapb.DataNodeInfo
func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo { func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
return &NodeInfo{ return &NodeInfo{
...@@ -54,6 +60,8 @@ func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo { ...@@ -54,6 +60,8 @@ func NewNodeInfo(ctx context.Context, info *datapb.DataNodeInfo) *NodeInfo {
} }
} }
// ShadowClone shadow clones a NodeInfo
// note that info, eventCh, etc is not created again
func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo { func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo {
cloned := &NodeInfo{ cloned := &NodeInfo{
Info: n.Info, Info: n.Info,
...@@ -68,6 +76,9 @@ func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo { ...@@ -68,6 +76,9 @@ func (n *NodeInfo) ShadowClone(opts ...NodeOpt) *NodeInfo {
return cloned return cloned
} }
// Clone "deep" clone a NodeInfo
// note that ONLY `info` field is deep copied
// parameter opts is applied in sequence to clone NodeInfo
func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo { func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo {
info := proto.Clone(n.Info).(*datapb.DataNodeInfo) info := proto.Clone(n.Info).(*datapb.DataNodeInfo)
cloned := &NodeInfo{ cloned := &NodeInfo{
...@@ -83,14 +94,17 @@ func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo { ...@@ -83,14 +94,17 @@ func (n *NodeInfo) Clone(opts ...NodeOpt) *NodeInfo {
return cloned return cloned
} }
// GetEventChannel returns event channel
func (n *NodeInfo) GetEventChannel() chan *NodeEvent { func (n *NodeInfo) GetEventChannel() chan *NodeEvent {
return n.eventCh return n.eventCh
} }
// GetClient returns client
func (n *NodeInfo) GetClient() types.DataNode { func (n *NodeInfo) GetClient() types.DataNode {
return n.client return n.client
} }
// Dispose stops the data node client and calls cancel
func (n *NodeInfo) Dispose() { func (n *NodeInfo) Dispose() {
defer n.cancel() defer n.cancel()
if n.client != nil { if n.client != nil {
...@@ -98,10 +112,14 @@ func (n *NodeInfo) Dispose() { ...@@ -98,10 +112,14 @@ func (n *NodeInfo) Dispose() {
} }
} }
// NodesInfo wraps a map UniqueID -> NodeInfo
// implements ClusterStore interface
// not lock related field is required so all operations shall be protected outside
type NodesInfo struct { type NodesInfo struct {
nodes map[UniqueID]*NodeInfo nodes map[UniqueID]*NodeInfo
} }
// NewNodesInfo helper function creates a NodesInfo
func NewNodesInfo() *NodesInfo { func NewNodesInfo() *NodesInfo {
c := &NodesInfo{ c := &NodesInfo{
nodes: make(map[UniqueID]*NodeInfo), nodes: make(map[UniqueID]*NodeInfo),
...@@ -109,6 +127,7 @@ func NewNodesInfo() *NodesInfo { ...@@ -109,6 +127,7 @@ func NewNodesInfo() *NodesInfo {
return c return c
} }
// GetNodes returns nodes list in NodesInfo
func (c *NodesInfo) GetNodes() []*NodeInfo { func (c *NodesInfo) GetNodes() []*NodeInfo {
nodes := make([]*NodeInfo, 0, len(c.nodes)) nodes := make([]*NodeInfo, 0, len(c.nodes))
for _, node := range c.nodes { for _, node := range c.nodes {
...@@ -117,18 +136,22 @@ func (c *NodesInfo) GetNodes() []*NodeInfo { ...@@ -117,18 +136,22 @@ func (c *NodesInfo) GetNodes() []*NodeInfo {
return nodes return nodes
} }
// SetNode sets a NodeInfo with provided UniqueID
func (c *NodesInfo) SetNode(nodeID UniqueID, node *NodeInfo) { func (c *NodesInfo) SetNode(nodeID UniqueID, node *NodeInfo) {
c.nodes[nodeID] = node c.nodes[nodeID] = node
metrics.DataCoordDataNodeList.WithLabelValues("online").Inc() metrics.DataCoordDataNodeList.WithLabelValues("online").Inc()
metrics.DataCoordDataNodeList.WithLabelValues("offline").Dec() metrics.DataCoordDataNodeList.WithLabelValues("offline").Dec()
} }
// DeleteNode deletes a NodeInfo with provided UniqueID
func (c *NodesInfo) DeleteNode(nodeID UniqueID) { func (c *NodesInfo) DeleteNode(nodeID UniqueID) {
delete(c.nodes, nodeID) delete(c.nodes, nodeID)
metrics.DataCoordDataNodeList.WithLabelValues("online").Dec() metrics.DataCoordDataNodeList.WithLabelValues("online").Dec()
metrics.DataCoordDataNodeList.WithLabelValues("offline").Inc() metrics.DataCoordDataNodeList.WithLabelValues("offline").Inc()
} }
// GetNode get NodeInfo binding to the specified UniqueID
// returns nil if no Info is found
func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo { func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo {
node, ok := c.nodes[nodeID] node, ok := c.nodes[nodeID]
if !ok { if !ok {
...@@ -137,20 +160,26 @@ func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo { ...@@ -137,20 +160,26 @@ func (c *NodesInfo) GetNode(nodeID UniqueID) *NodeInfo {
return node return node
} }
// SetClient set DataNode client to specified UniqueID
// do nothing if no Info is found
func (c *NodesInfo) SetClient(nodeID UniqueID, client types.DataNode) { func (c *NodesInfo) SetClient(nodeID UniqueID, client types.DataNode) {
if node, ok := c.nodes[nodeID]; ok { if node, ok := c.nodes[nodeID]; ok {
c.nodes[nodeID] = node.ShadowClone(SetClient(client)) c.nodes[nodeID] = node.ShadowClone(SetClient(client))
} }
} }
// SetWatched set specified channels watch state from Uncomplete to Complete
// do nothing if no Info is found
func (c *NodesInfo) SetWatched(nodeID UniqueID, channelsName []string) { func (c *NodesInfo) SetWatched(nodeID UniqueID, channelsName []string) {
if node, ok := c.nodes[nodeID]; ok { if node, ok := c.nodes[nodeID]; ok {
c.nodes[nodeID] = node.Clone(SetWatched(channelsName)) c.nodes[nodeID] = node.Clone(SetWatched(channelsName))
} }
} }
// NodeOpt helper functions updating NodeInfo properties
type NodeOpt func(n *NodeInfo) type NodeOpt func(n *NodeInfo)
// SetWatched returns a NodeOpt updating specified channels watch states from Uncomplete to Complete
func SetWatched(channelsName []string) NodeOpt { func SetWatched(channelsName []string) NodeOpt {
return func(n *NodeInfo) { return func(n *NodeInfo) {
channelsMap := make(map[string]struct{}) channelsMap := make(map[string]struct{})
...@@ -169,18 +198,21 @@ func SetWatched(channelsName []string) NodeOpt { ...@@ -169,18 +198,21 @@ func SetWatched(channelsName []string) NodeOpt {
} }
} }
// SetClient returns NodeOpt update DataNode client
func SetClient(client types.DataNode) NodeOpt { func SetClient(client types.DataNode) NodeOpt {
return func(n *NodeInfo) { return func(n *NodeInfo) {
n.client = client n.client = client
} }
} }
// AddChannels returns NodeOpt adding specified channels to assigned list
func AddChannels(channels []*datapb.ChannelStatus) NodeOpt { func AddChannels(channels []*datapb.ChannelStatus) NodeOpt {
return func(n *NodeInfo) { return func(n *NodeInfo) {
n.Info.Channels = append(n.Info.Channels, channels...) n.Info.Channels = append(n.Info.Channels, channels...)
} }
} }
// SetChannels returns NodeOpt updating assigned channels
func SetChannels(channels []*datapb.ChannelStatus) NodeOpt { func SetChannels(channels []*datapb.ChannelStatus) NodeOpt {
return func(n *NodeInfo) { return func(n *NodeInfo) {
n.Info.Channels = channels n.Info.Channels = channels
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册