未验证 提交 3eb074a9 编写于 作者: N Nemo 提交者: GitHub

Update some comments for data coord with some minor code refactoring. (#15686)

Signed-off-by: Yuchen Gao <yuchen.gao@zilliz.com>
上级 ff875b03
......@@ -35,7 +35,7 @@ const (
maxWatchDuration = 20 * time.Second
)
// ChannelManager manages the allocation and the balance of channels between datanodes
// ChannelManager manages the allocation and the balance between channels and data nodes.
type ChannelManager struct {
mu sync.RWMutex
h Handler
......@@ -54,7 +54,7 @@ type channel struct {
CollectionID UniqueID
}
// ChannelManagerOpt is to set optional parameters in channel manager
// ChannelManagerOpt is to set optional parameters in channel manager.
type ChannelManagerOpt func(c *ChannelManager)
func withFactory(f ChannelPolicyFactory) ChannelManagerOpt {
......@@ -69,7 +69,7 @@ func withMsgstreamFactory(f msgstream.Factory) ChannelManagerOpt {
return func(c *ChannelManager) { c.msgstreamFactory = f }
}
// NewChannelManager returns a new ChannelManager
// NewChannelManager creates and returns a new ChannelManager instance.
func NewChannelManager(
kv kv.TxnKV,
h Handler,
......@@ -97,38 +97,43 @@ func NewChannelManager(
return c, nil
}
// Startup adjusts the channel store according to current cluster states
// Startup adjusts the channel store according to current cluster states.
func (c *ChannelManager) Startup(nodes []int64) error {
channels := c.store.GetNodesChannels()
olds := make([]int64, 0, len(channels))
// Retrieve the current old nodes.
oNodes := make([]int64, 0, len(channels))
for _, c := range channels {
olds = append(olds, c.NodeID)
oNodes = append(oNodes, c.NodeID)
}
newOnLines := c.getNewOnLines(nodes, olds)
// Add new online nodes to the cluster.
newOnLines := c.getNewOnLines(nodes, oNodes)
for _, n := range newOnLines {
if err := c.AddNode(n); err != nil {
return err
}
}
offlines := c.getOffLines(nodes, olds)
for _, n := range offlines {
// Remove new offline nodes from the cluster.
offLines := c.getOffLines(nodes, oNodes)
for _, n := range offLines {
if err := c.DeleteNode(n); err != nil {
return err
}
}
// Unwatch and drop channel with drop flag.
c.unwatchDroppedChannels()
log.Debug("cluster start up",
zap.Any("nodes", nodes),
zap.Any("olds", olds),
zap.Any("oNodes", oNodes),
zap.Int64s("new onlines", newOnLines),
zap.Int64s("offLines", offlines))
zap.Int64s("offLines", offLines))
return nil
}
// unwatchDroppedChannels removes channels that are marked to drop.
func (c *ChannelManager) unwatchDroppedChannels() {
nodeChannels := c.store.GetNodesChannels()
for _, nodeChannel := range nodeChannels {
......@@ -146,6 +151,7 @@ func (c *ChannelManager) unwatchDroppedChannels() {
}
}
// NOT USED.
func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
timer := time.NewTicker(bgCheckInterval)
for {
......@@ -168,7 +174,7 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
log.Debug("channel manager bg check reassign", zap.Array("updates", updates))
for _, update := range updates {
if update.Type == Add {
c.fillChannelPosition(update)
c.fillChannelWatchInfo(update)
}
}
......@@ -181,6 +187,7 @@ func (c *ChannelManager) bgCheckChannelsWork(ctx context.Context) {
}
}
// getNewOnLines returns a list of new online node ids in `curr` but not in `old`.
func (c *ChannelManager) getNewOnLines(curr []int64, old []int64) []int64 {
mold := make(map[int64]struct{})
ret := make([]int64, 0, len(curr))
......@@ -195,6 +202,7 @@ func (c *ChannelManager) getNewOnLines(curr []int64, old []int64) []int64 {
return ret
}
// getOffLines returns a list of new offline node ids in `old` but not in `curr`.
func (c *ChannelManager) getOffLines(curr []int64, old []int64) []int64 {
mcurr := make(map[int64]struct{})
ret := make([]int64, 0, len(old))
......@@ -209,7 +217,7 @@ func (c *ChannelManager) getOffLines(curr []int64, old []int64) []int64 {
return ret
}
// AddNode adds a new node in cluster
// AddNode adds a new node to cluster and reassigns the node - channel mapping.
func (c *ChannelManager) AddNode(nodeID int64) error {
c.mu.Lock()
defer c.mu.Unlock()
......@@ -221,15 +229,15 @@ func (c *ChannelManager) AddNode(nodeID int64) error {
zap.Int64("registered node", nodeID),
zap.Array("updates", updates))
for _, v := range updates {
if v.Type == Add {
c.fillChannelPosition(v)
for _, op := range updates {
if op.Type == Add {
c.fillChannelWatchInfo(op)
}
}
return c.store.Update(updates)
}
// DeleteNode deletes the node from the cluster
// DeleteNode deletes the node from the cluster.
func (c *ChannelManager) DeleteNode(nodeID int64) error {
c.mu.Lock()
defer c.mu.Unlock()
......@@ -239,7 +247,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
return nil
}
c.tryToUnsubscribe(nodeChannelInfo)
c.unsubAttempt(nodeChannelInfo)
updates := c.deregisterPolicy(c.store, nodeID)
log.Debug("deregister node",
......@@ -248,7 +256,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
for _, v := range updates {
if v.Type == Add {
c.fillChannelPosition(v)
c.fillChannelWatchInfo(v)
}
}
if err := c.store.Update(updates); err != nil {
......@@ -258,8 +266,9 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
return err
}
func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) {
if nodeChannelInfo == nil {
// unsubAttempt attempts to unsubscribe the given node's subscription from each of its channels, logging a warning for any channel that fails.
func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
if ncInfo == nil {
return
}
......@@ -268,32 +277,33 @@ func (c *ChannelManager) tryToUnsubscribe(nodeChannelInfo *NodeChannelInfo) {
return
}
nodeID := nodeChannelInfo.NodeID
for _, ch := range nodeChannelInfo.Channels {
subscriptionName := subscriptionGenerator(ch.CollectionID, nodeID)
err := c.unsubscribe(subscriptionName, ch.Name)
nodeID := ncInfo.NodeID
for _, ch := range ncInfo.Channels {
subName := buildSubName(ch.CollectionID, nodeID)
err := c.unsubscribe(subName, ch.Name)
if err != nil {
log.Warn("failed to unsubcribe topic", zap.String("subscription name", subscriptionName), zap.String("channel name", ch.Name))
log.Warn("failed to unsubscribe topic", zap.String("subscription name", subName), zap.String("channel name", ch.Name))
}
}
}
func subscriptionGenerator(collectionID int64, nodeID int64) string {
// buildSubName generates a subscription name by concatenating DataNodeSubName, node ID and collection ID.
func buildSubName(collectionID int64, nodeID int64) string {
return fmt.Sprintf("%s-%d-%d", Params.MsgChannelCfg.DataNodeSubName, nodeID, collectionID)
}
func (c *ChannelManager) unsubscribe(subscriptionName string, channel string) error {
func (c *ChannelManager) unsubscribe(subName string, channel string) error {
msgStream, err := c.msgstreamFactory.NewMsgStream(context.TODO())
if err != nil {
return err
}
msgStream.AsConsumer([]string{channel}, subscriptionName)
msgStream.AsConsumer([]string{channel}, subName)
msgStream.Close()
return nil
}
// Watch try to add the channel to cluster. If the channel already exists, do nothing
// Watch tries to add the channel to the cluster. Watch is a no-op if the channel already exists.
func (c *ChannelManager) Watch(ch *channel) error {
c.mu.Lock()
defer c.mu.Unlock()
......@@ -308,7 +318,7 @@ func (c *ChannelManager) Watch(ch *channel) error {
for _, v := range updates {
if v.Type == Add {
c.fillChannelPosition(v)
c.fillChannelWatchInfo(v)
}
}
err := c.store.Update(updates)
......@@ -322,19 +332,20 @@ func (c *ChannelManager) Watch(ch *channel) error {
return nil
}
func (c *ChannelManager) fillChannelPosition(update *ChannelOp) {
for _, ch := range update.Channels {
vchan := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
// fillChannelWatchInfo updates the channel op by filling in channel watch info.
func (c *ChannelManager) fillChannelWatchInfo(op *ChannelOp) {
for _, ch := range op.Channels {
vcInfo := c.h.GetVChanPositions(ch.Name, ch.CollectionID, allPartitionID)
info := &datapb.ChannelWatchInfo{
Vchan: vchan,
Vchan: vcInfo,
StartTs: time.Now().Unix(),
State: datapb.ChannelWatchState_Uncomplete,
}
update.ChannelWatchInfos = append(update.ChannelWatchInfos, info)
op.ChannelWatchInfos = append(op.ChannelWatchInfos, info)
}
}
// GetChannels gets channels info of registered nodes
// GetChannels gets channels info of registered nodes.
func (c *ChannelManager) GetChannels() []*NodeChannelInfo {
c.mu.RLock()
defer c.mu.RUnlock()
......@@ -342,15 +353,15 @@ func (c *ChannelManager) GetChannels() []*NodeChannelInfo {
return c.store.GetNodesChannels()
}
// GetBuffer gets buffer channels
func (c *ChannelManager) GetBuffer() *NodeChannelInfo {
// GetBufferChannels gets buffer channels.
func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo {
c.mu.RLock()
defer c.mu.RUnlock()
return c.store.GetBufferChannelInfo()
}
// Match checks whether nodeID and channel match
// Match checks and returns whether the node ID and channel match.
func (c *ChannelManager) Match(nodeID int64, channel string) bool {
c.mu.RLock()
defer c.mu.RUnlock()
......@@ -368,7 +379,7 @@ func (c *ChannelManager) Match(nodeID int64, channel string) bool {
return false
}
// FindWatcher finds the datanode watching the provided channel
// FindWatcher finds the datanode watching the provided channel.
func (c *ChannelManager) FindWatcher(channel string) (int64, error) {
c.mu.RLock()
defer c.mu.RUnlock()
......@@ -392,7 +403,7 @@ func (c *ChannelManager) FindWatcher(channel string) (int64, error) {
return 0, errChannelNotWatched
}
// RemoveChannel removes the channel from channel manager
// RemoveChannel removes the channel from channel manager.
func (c *ChannelManager) RemoveChannel(channelName string) error {
c.mu.Lock()
defer c.mu.Unlock()
......@@ -405,6 +416,7 @@ func (c *ChannelManager) RemoveChannel(channelName string) error {
return c.remove(nodeID, ch)
}
// remove deletes the nodeID-channel pair from data store.
func (c *ChannelManager) remove(nodeID int64, ch *channel) error {
var op ChannelOpSet
op.Delete(nodeID, []*channel{ch})
......
......@@ -21,17 +21,17 @@ import (
"stathat.com/c/consistent"
)
// ChannelPolicyFactory is the abstract factory to create policies for channel manager
// ChannelPolicyFactory is the abstract factory that creates policies for channel manager.
type ChannelPolicyFactory interface {
// NewRegisterPolicy create a new register policy
// NewRegisterPolicy creates a new register policy.
NewRegisterPolicy() RegisterPolicy
// NewDeregisterPolicy create a new dereigster policy
// NewDeregisterPolicy creates a new deregister policy.
NewDeregisterPolicy() DeregisterPolicy
// NewAssignPolicy create a new channel assign policy
// NewAssignPolicy creates a new channel assign policy.
NewAssignPolicy() ChannelAssignPolicy
// NewReassignPolicy create a new channel reassign policy
// NewReassignPolicy creates a new channel reassign policy.
NewReassignPolicy() ChannelReassignPolicy
// NewBgChecker create a new bakcground checker
// NewBgChecker creates a new background checker.
NewBgChecker() ChannelBGChecker
}
......@@ -40,27 +40,27 @@ type ChannelPolicyFactoryV1 struct {
kv kv.TxnKV
}
// NewChannelPolicyFactoryV1 helper function creates a Channel policy factory v1 from kv
// NewChannelPolicyFactoryV1 is a helper function that creates a ChannelPolicyFactoryV1 from the given kv.
func NewChannelPolicyFactoryV1(kv kv.TxnKV) *ChannelPolicyFactoryV1 {
return &ChannelPolicyFactoryV1{kv: kv}
}
// NewRegisterPolicy implementing ChannelPolicyFactory returns BufferChannelAssignPolicy
// NewRegisterPolicy implementing ChannelPolicyFactory returns AvgAssignRegisterPolicy.
func (f *ChannelPolicyFactoryV1) NewRegisterPolicy() RegisterPolicy {
return AvgAssignRegisterPolicy
}
// NewDeregisterPolicy implementing ChannelPolicyFactory returns AvgAssignUnregisteredChannels
// NewDeregisterPolicy implementing ChannelPolicyFactory returns AvgAssignUnregisteredChannels.
func (f *ChannelPolicyFactoryV1) NewDeregisterPolicy() DeregisterPolicy {
return AvgAssignUnregisteredChannels
}
// NewAssignPolicy implementing ChannelPolicyFactory returns AverageAssignPolicy
// NewAssignPolicy implementing ChannelPolicyFactory returns AverageAssignPolicy.
func (f *ChannelPolicyFactoryV1) NewAssignPolicy() ChannelAssignPolicy {
return AverageAssignPolicy
}
// NewReassignPolicy implementing ChannelPolicyFactory returns AvarageReassginPolicy
// NewReassignPolicy implementing ChannelPolicyFactory returns AverageReassignPolicy.
func (f *ChannelPolicyFactoryV1) NewReassignPolicy() ChannelReassignPolicy {
return AverageReassignPolicy
}
......
......@@ -31,23 +31,20 @@ import (
const (
bufferID = math.MinInt64
delimeter = "/"
delimiter = "/"
maxOperationsPerTxn = 128
)
var errUnknownOpType = errors.New("unknown operation type")
// ChannelOpType type alias uses int8 stands for Channel operation type
type ChannelOpType int8
const (
// Add const value for Add Channel operation type
Add ChannelOpType = iota
// Delete const value for Delete Channel operation type
Delete
)
//ChannelOp is the operation to update the channel store
// ChannelOp is an individual ADD or DELETE operation to the channel store.
type ChannelOp struct {
Type ChannelOpType
NodeID int64
......@@ -55,10 +52,10 @@ type ChannelOp struct {
ChannelWatchInfos []*datapb.ChannelWatchInfo
}
// ChannelOpSet contains some channel update operations
// ChannelOpSet is a set of channel operations.
type ChannelOpSet []*ChannelOp
// Add adds the operation which maps channels to node
// Add appends a single operation to add the mapping between a node and channels.
func (cos *ChannelOpSet) Add(id int64, channels []*channel) {
*cos = append(*cos, &ChannelOp{
NodeID: id,
......@@ -67,7 +64,7 @@ func (cos *ChannelOpSet) Add(id int64, channels []*channel) {
})
}
// Delete removes the mapping between channels and node
// Delete appends a single operation to remove the mapping between a node and channels.
func (cos *ChannelOpSet) Delete(id int64, channels []*channel) {
*cos = append(*cos, &ChannelOp{
NodeID: id,
......@@ -76,48 +73,49 @@ func (cos *ChannelOpSet) Delete(id int64, channels []*channel) {
})
}
// ROChannelStore is the read only channel store from which user can read the mapping between channels and node
// ROChannelStore is a read only channel store for channels and nodes.
type ROChannelStore interface {
// GetNode gets the channel info of node
// GetNode returns the channel info of a specific node.
GetNode(nodeID int64) *NodeChannelInfo
// GetChannels gets all channel infos
// GetChannels returns info of all channels.
GetChannels() []*NodeChannelInfo
// GetNodesChannels gets the channels assigned to real nodes
// GetNodesChannels returns the channels that are assigned to nodes.
GetNodesChannels() []*NodeChannelInfo
// GetBufferChannelInfo gets the unassigned channels
// GetBufferChannelInfo gets the unassigned channels.
GetBufferChannelInfo() *NodeChannelInfo
// GetNodes gets all nodes id in store
// GetNodes gets all node ids in store.
GetNodes() []int64
}
// RWChannelStore is the read write channel store which maintains the mapping between channels and node
// RWChannelStore is the read write channel store for channels and nodes.
type RWChannelStore interface {
ROChannelStore
// Reload restores the buffer channels and node-channels mapping form kv
// Reload restores the buffer channels and node-channels mapping from kv.
Reload() error
// Add creates a new node-channels mapping, but no channels are assigned to this node
// Add creates a new node-channels mapping, with no channels assigned to the node.
Add(nodeID int64)
// Delete removes nodeID and returns the channels
// Delete removes nodeID and returns its channels.
Delete(nodeID int64) ([]*channel, error)
// Update applies the operations in ChannelOpSet
// Update applies the operations in ChannelOpSet.
Update(op ChannelOpSet) error
}
// ChannelStore must satisfy RWChannelStore.
var _ RWChannelStore = (*ChannelStore)(nil)
// ChannelStore maintains the mapping relationship between channel and datanode
// ChannelStore maintains a mapping between channels and data nodes.
type ChannelStore struct {
store kv.TxnKV
channelsInfo map[int64]*NodeChannelInfo
store kv.TxnKV // A kv store with (NodeChannelKey) -> (ChannelWatchInfos) information.
channelsInfo map[int64]*NodeChannelInfo // A map of (nodeID) -> (NodeChannelInfo).
}
// NodeChannelInfo is the mapping between channels and node
// NodeChannelInfo stores the nodeID and its channels.
type NodeChannelInfo struct {
NodeID int64
Channels []*channel
}
// NewChannelStore creates a new ChannelStore
// NewChannelStore creates and returns a new ChannelStore.
func NewChannelStore(kv kv.TxnKV) *ChannelStore {
c := &ChannelStore{
store: kv,
......@@ -130,7 +128,7 @@ func NewChannelStore(kv kv.TxnKV) *ChannelStore {
return c
}
// Reload restores the buffer channels and node-channels mapping from kv
// Reload restores the buffer channels and node-channels mapping from kv.
func (c *ChannelStore) Reload() error {
keys, values, err := c.store.LoadWithPrefix(Params.DataCoordCfg.ChannelWatchSubPath)
if err != nil {
......@@ -139,27 +137,28 @@ func (c *ChannelStore) Reload() error {
for i := 0; i < len(keys); i++ {
k := keys[i]
v := values[i]
nodeID, err := parseNodeID(k)
nodeID, err := parseNodeKey(k)
if err != nil {
return err
}
temp := &datapb.ChannelWatchInfo{}
if err := proto.Unmarshal([]byte(v), temp); err != nil {
cw := &datapb.ChannelWatchInfo{}
if err := proto.Unmarshal([]byte(v), cw); err != nil {
return err
}
c.Add(nodeID)
channel := &channel{
Name: temp.GetVchan().GetChannelName(),
CollectionID: temp.GetVchan().GetCollectionID(),
Name: cw.GetVchan().GetChannelName(),
CollectionID: cw.GetVchan().GetCollectionID(),
}
c.channelsInfo[nodeID].Channels = append(c.channelsInfo[nodeID].Channels, channel)
}
return nil
}
// Add creates a new node-channels mapping, but no channels are assigned to this node
// Add creates a new node-channels mapping for the given node, and assigns no channels to it.
// Returns immediately if the node is already in the channel store.
func (c *ChannelStore) Add(nodeID int64) {
if _, ok := c.channelsInfo[nodeID]; ok {
return
......@@ -171,7 +170,7 @@ func (c *ChannelStore) Add(nodeID int64) {
}
}
// Update applies the operations in opSet
// Update applies the channel operations in opSet.
func (c *ChannelStore) Update(opSet ChannelOpSet) error {
totalChannelNum := 0
for _, op := range opSet {
......@@ -180,8 +179,8 @@ func (c *ChannelStore) Update(opSet ChannelOpSet) error {
if totalChannelNum <= maxOperationsPerTxn {
return c.update(opSet)
}
// split opset to many txn; same channel's operations should be executed in one txn.
channelsOpSet := make(map[string]ChannelOpSet)
// Split opset into multiple txn. Operations on the same channel must be executed in one txn.
perChOps := make(map[string]ChannelOpSet)
for _, op := range opSet {
for i, ch := range op.Channels {
chOp := &ChannelOp{
......@@ -192,14 +191,14 @@ func (c *ChannelStore) Update(opSet ChannelOpSet) error {
if op.Type == Add {
chOp.ChannelWatchInfos = []*datapb.ChannelWatchInfo{op.ChannelWatchInfos[i]}
}
channelsOpSet[ch.Name] = append(channelsOpSet[ch.Name], chOp)
perChOps[ch.Name] = append(perChOps[ch.Name], chOp)
}
}
// execute a txn per 128 operations
// Execute a txn for every 128 operations.
count := 0
operations := make([]*ChannelOp, 0, maxOperationsPerTxn)
for _, opset := range channelsOpSet {
for _, opset := range perChOps {
if count+len(opset) > maxOperationsPerTxn {
if err := c.update(operations); err != nil {
return err
......@@ -216,28 +215,33 @@ func (c *ChannelStore) Update(opSet ChannelOpSet) error {
return c.update(operations)
}
// update applies the ADD/DELETE operations to the current channel store.
func (c *ChannelStore) update(opSet ChannelOpSet) error {
// Update ChannelStore's kv store.
if err := c.txn(opSet); err != nil {
return err
}
for _, v := range opSet {
switch v.Type {
// Update node id -> channel mapping.
for _, op := range opSet {
switch op.Type {
case Add:
c.channelsInfo[v.NodeID].Channels = append(c.channelsInfo[v.NodeID].Channels, v.Channels...)
// Append target channels to channel store.
c.channelsInfo[op.NodeID].Channels = append(c.channelsInfo[op.NodeID].Channels, op.Channels...)
case Delete:
filter := make(map[string]struct{})
for _, ch := range v.Channels {
filter[ch.Name] = struct{}{}
// Remove target channels from channel store.
del := make(map[string]struct{})
for _, ch := range op.Channels {
del[ch.Name] = struct{}{}
}
origin := c.channelsInfo[v.NodeID].Channels
res := make([]*channel, 0, len(origin))
for _, ch := range origin {
if _, ok := filter[ch.Name]; !ok {
res = append(res, ch)
prev := c.channelsInfo[op.NodeID].Channels
curr := make([]*channel, 0, len(prev))
for _, ch := range prev {
if _, ok := del[ch.Name]; !ok {
curr = append(curr, ch)
}
}
c.channelsInfo[v.NodeID].Channels = res
c.channelsInfo[op.NodeID].Channels = curr
default:
return errUnknownOpType
}
......@@ -245,7 +249,7 @@ func (c *ChannelStore) update(opSet ChannelOpSet) error {
return nil
}
// GetChannels gets all channel infos
// GetChannels returns information of all channels.
func (c *ChannelStore) GetChannels() []*NodeChannelInfo {
ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo))
for _, info := range c.channelsInfo {
......@@ -254,19 +258,18 @@ func (c *ChannelStore) GetChannels() []*NodeChannelInfo {
return ret
}
// GetNodesChannels gets the channels assigned to real nodes
// GetNodesChannels returns the channels assigned to real nodes.
func (c *ChannelStore) GetNodesChannels() []*NodeChannelInfo {
ret := make([]*NodeChannelInfo, 0, len(c.channelsInfo))
for id, info := range c.channelsInfo {
if id == bufferID {
continue
if id != bufferID {
ret = append(ret, info)
}
ret = append(ret, info)
}
return ret
}
// GetBufferChannelInfo gets the unassigned channels
// GetBufferChannelInfo returns all unassigned channels.
func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
for id, info := range c.channelsInfo {
if id == bufferID {
......@@ -276,7 +279,7 @@ func (c *ChannelStore) GetBufferChannelInfo() *NodeChannelInfo {
return nil
}
// GetNode gets the channel info of node
// GetNode returns the channel info of a given node.
func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo {
for id, info := range c.channelsInfo {
if id == nodeID {
......@@ -286,7 +289,7 @@ func (c *ChannelStore) GetNode(nodeID int64) *NodeChannelInfo {
return nil
}
// Delete remove the nodeID and returns its channels
// Delete removes the given node from the channel store and returns its channels.
func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) {
for id, info := range c.channelsInfo {
if id == nodeID {
......@@ -300,36 +303,37 @@ func (c *ChannelStore) Delete(nodeID int64) ([]*channel, error) {
return nil, nil
}
// GetNodes gets all nodes id in store
// GetNodes returns a slice of all node ids in the current channel store.
func (c *ChannelStore) GetNodes() []int64 {
ids := make([]int64, 0, len(c.channelsInfo))
for id := range c.channelsInfo {
if id == bufferID {
continue
if id != bufferID {
ids = append(ids, id)
}
ids = append(ids, id)
}
return ids
}
// remove deletes kv pairs from the kv store where keys have given nodeID as prefix.
func (c *ChannelStore) remove(nodeID int64) error {
k := buildNodeKey(nodeID)
k := buildKeyPrefix(nodeID)
return c.store.RemoveWithPrefix(k)
}
// txn updates the channelStore's kv store with the given channel ops.
func (c *ChannelStore) txn(opSet ChannelOpSet) error {
saves := make(map[string]string)
var removals []string
for _, update := range opSet {
for i, c := range update.Channels {
k := buildChannelKey(update.NodeID, c.Name)
switch update.Type {
for _, op := range opSet {
for i, ch := range op.Channels {
k := buildNodeChannelKey(op.NodeID, ch.Name)
switch op.Type {
case Add:
val, err := proto.Marshal(update.ChannelWatchInfos[i])
info, err := proto.Marshal(op.ChannelWatchInfos[i])
if err != nil {
return err
}
saves[k] = string(val)
saves[k] = string(info)
case Delete:
removals = append(removals, k)
default:
......@@ -340,26 +344,30 @@ func (c *ChannelStore) txn(opSet ChannelOpSet) error {
return c.store.MultiSaveAndRemove(saves, removals)
}
func buildChannelKey(nodeID int64, channel string) string {
return fmt.Sprintf("%s%s%d%s%s", Params.DataCoordCfg.ChannelWatchSubPath, delimeter, nodeID, delimeter, channel)
// buildNodeChannelKey generates a key for kv store, where the key is a concatenation of ChannelWatchSubPath, nodeID and channel name.
func buildNodeChannelKey(nodeID int64, chName string) string {
return fmt.Sprintf("%s%s%d%s%s", Params.DataCoordCfg.ChannelWatchSubPath, delimiter, nodeID, delimiter, chName)
}
func buildNodeKey(nodeID int64) string {
return fmt.Sprintf("%s%s%d", Params.DataCoordCfg.ChannelWatchSubPath, delimeter, nodeID)
// buildKeyPrefix generates a key *prefix* for kv store, where the key prefix is a concatenation of ChannelWatchSubPath and nodeID.
func buildKeyPrefix(nodeID int64) string {
return fmt.Sprintf("%s%s%d", Params.DataCoordCfg.ChannelWatchSubPath, delimiter, nodeID)
}
func parseNodeID(key string) (int64, error) {
s := strings.Split(key, delimeter)
// parseNodeKey validates a given node key, then extracts and returns the corresponding node id on success.
func parseNodeKey(key string) (int64, error) {
s := strings.Split(key, delimiter)
if len(s) < 2 {
return -1, fmt.Errorf("wrong channel key in etcd %s", key)
return -1, fmt.Errorf("wrong node key in etcd %s", key)
}
return strconv.ParseInt(s[len(s)-2], 10, 64)
}
// ChannelOpTypeNames implements zap log marshaler for ChannelOpSet
// ChannelOpTypeNames lists the display names of ChannelOpType values, indexed by the type's integer value; it is used when logging a ChannelOp.
var ChannelOpTypeNames = []string{"Add", "Delete"}
// MarshalLogObject implements the interface ObjectMarshaler
// TODO: NIT: ObjectMarshaler -> ObjectMarshaller
// MarshalLogObject implements the interface ObjectMarshaler.
func (cu *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("type", ChannelOpTypeNames[cu.Type])
enc.AddInt64("nodeID", cu.NodeID)
......@@ -376,7 +384,8 @@ func (cu *ChannelOp) MarshalLogObject(enc zapcore.ObjectEncoder) error {
return nil
}
// MarshalLogArray implements the interface of ArrayMarshaler of zap
// TODO: NIT: ArrayMarshaler -> ArrayMarshaller
// MarshalLogArray implements the interface of ArrayMarshaler of zap.
func (cos ChannelOpSet) MarshalLogArray(enc zapcore.ArrayEncoder) error {
for _, o := range cos {
enc.AppendObject(o)
......
......@@ -41,7 +41,7 @@ func NewCluster(sessionManager *SessionManager, channelManager *ChannelManager)
return c
}
// Startup inits the cluster
// Startup inits the cluster with the given data nodes.
func (c *Cluster) Startup(nodes []*NodeInfo) error {
for _, node := range nodes {
c.sessionManager.AddSession(node)
......
......@@ -186,7 +186,7 @@ func TestRegister(t *testing.T) {
}
err = cluster.Register(info)
assert.Nil(t, err)
bufferChannels := channelManager.GetBuffer()
bufferChannels := channelManager.GetBufferChannels()
assert.Empty(t, bufferChannels.Channels)
nodeChannels := channelManager.GetChannels()
assert.EqualValues(t, 1, len(nodeChannels))
......@@ -297,7 +297,7 @@ func TestUnregister(t *testing.T) {
assert.Nil(t, err)
channels := channelManager.GetChannels()
assert.Empty(t, channels)
channel := channelManager.GetBuffer()
channel := channelManager.GetBufferChannels()
assert.NotNil(t, channel)
assert.EqualValues(t, 1, len(channel.Channels))
assert.EqualValues(t, "ch_1", channel.Channels[0].Name)
......@@ -344,7 +344,7 @@ func TestWatchIfNeeded(t *testing.T) {
channels := channelManager.GetChannels()
assert.Empty(t, channels)
channel := channelManager.GetBuffer()
channel := channelManager.GetBufferChannels()
assert.NotNil(t, channel)
assert.EqualValues(t, "ch1", channel.Channels[0].Name)
})
......@@ -423,7 +423,7 @@ func TestConsistentHashPolicy(t *testing.T) {
hash.Remove("3")
err = cluster.UnRegister(nodeInfo3)
assert.Nil(t, err)
bufferChannels := channelManager.GetBuffer()
bufferChannels := channelManager.GetBufferChannels()
assert.EqualValues(t, 3, len(bufferChannels.Channels))
}
......
......@@ -374,7 +374,7 @@ func EmptyReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) Cha
return nil
}
// AverageReassignPolicy is a reassign policy that evenly assign channels
// AverageReassignPolicy is a reassigning policy that evenly assigns channels.
func AverageReassignPolicy(store ROChannelStore, reassigns []*NodeChannelInfo) ChannelOpSet {
channels := store.GetNodesChannels()
filterMap := make(map[int64]struct{})
......@@ -444,7 +444,7 @@ func BgCheckWithMaxWatchDuration(kv kv.TxnKV) ChannelBGChecker {
Channels: make([]*channel, 0),
}
for _, c := range ch.Channels {
k := buildChannelKey(ch.NodeID, c.Name)
k := buildNodeChannelKey(ch.NodeID, c.Name)
v, err := kv.Load(k)
if err != nil {
return nil, err
......
......@@ -377,7 +377,7 @@ func TestBgCheckWithMaxWatchDuration(t *testing.T) {
getKv := func(watchInfos []*watch) kv.TxnKV {
kv := memkv.NewMemoryKV()
for _, info := range watchInfos {
k := buildChannelKey(info.nodeID, info.name)
k := buildNodeChannelKey(info.nodeID, info.name)
v, _ := proto.Marshal(info.info)
kv.Save(k, string(v))
}
......
......@@ -185,7 +185,7 @@ func defaultFlushPolicy() flushPolicy {
return flushPolicyV1
}
// newSegmentManager should be the only way to retrieve SegmentManager
// newSegmentManager should be the only way to retrieve SegmentManager.
func newSegmentManager(meta *meta, allocator allocator, opts ...allocOption) *SegmentManager {
manager := &SegmentManager{
meta: meta,
......@@ -237,7 +237,7 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
segments = append(segments, segment)
}
// apply allocate policy
// Apply allocation policy.
maxCountPerSegment, err := s.estimateMaxNumOfRows(collectionID)
if err != nil {
return nil, err
......
......@@ -597,7 +597,7 @@ func (s *Server) startWatchService(ctx context.Context) {
go s.watchService(ctx)
}
// watchService watchs services
// watchService watches services.
func (s *Server) watchService(ctx context.Context) {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()
......@@ -793,6 +793,8 @@ func (s *Server) stopServerLoop() {
// return fmt.Errorf("can not find channel %s", channelName)
//}
// loadCollectionFromRootCoord communicates with RootCoord and asks for collection information.
// The collection information will be added to server meta info.
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
......
......@@ -97,7 +97,7 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F
return resp, nil
}
// AssignSegmentID applies for segment ids and make allocation for records
// AssignSegmentID applies for segment ids and makes allocations for records.
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) {
if s.isClosed() {
return &datapb.AssignSegmentIDResponse{
......@@ -117,6 +117,7 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
zap.String("channelName", r.GetChannelName()),
zap.Uint32("count", r.GetCount()))
// Load the collection info from Root Coordinator, if it is not found in server meta.
if s.meta.GetCollection(r.GetCollectionID()) == nil {
err := s.loadCollectionFromRootCoord(ctx, r.GetCollectionID())
if err != nil {
......@@ -125,18 +126,19 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
}
}
// Add the channel to cluster for watching.
s.cluster.Watch(r.ChannelName, r.CollectionID)
allocations, err := s.segmentManager.AllocSegment(ctx,
// Have segment manager allocate and return the segment allocation info.
segAlloc, err := s.segmentManager.AllocSegment(ctx,
r.CollectionID, r.PartitionID, r.ChannelName, int64(r.Count))
if err != nil {
log.Warn("failed to alloc segment", zap.Any("request", r), zap.Error(err))
continue
}
log.Debug("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", segAlloc))
log.Debug("success to assign segments", zap.Int64("collectionID", r.GetCollectionID()), zap.Any("assignments", allocations))
for _, allocation := range allocations {
for _, allocation := range segAlloc {
result := &datapb.SegmentIDAssignment{
SegID: allocation.SegmentID,
ChannelName: r.ChannelName,
......@@ -892,7 +894,7 @@ func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState
return
}
// WatchChannels notifies DataCoord to watch vchannels of a collection
// WatchChannels notifies DataCoord to watch vchannels of a collection.
func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsRequest) (*datapb.WatchChannelsResponse, error) {
log.Debug("receive watch channels request", zap.Any("channels", req.GetChannelNames()))
resp := &datapb.WatchChannelsResponse{
......
......@@ -59,7 +59,7 @@ type ValueKV interface {
Load(key string) (Value, error)
}
// BaseKV contains base operations of kv. Include save, load and remove.
// BaseKV contains basic kv operations, including save, load and remove.
type BaseKV interface {
Load(key string) (string, error)
MultiLoad(keys []string) ([]string, error)
......@@ -88,7 +88,7 @@ type TxnKV interface {
MultiSaveAndRemoveWithPrefix(saves map[string]string, removals []string) error
}
// MetaKv is TxnKV for meta data. It should save data with lease.
// MetaKv is TxnKV for metadata. It should save data with lease.
type MetaKv interface {
TxnKV
GetPath(key string) string
......
......@@ -329,22 +329,22 @@ func (sa *segIDAssigner) syncSegments() (bool, error) {
var errMsg string
now := time.Now()
success := true
for _, info := range resp.SegIDAssignments {
if info.Status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Debug("proxy", zap.String("SyncSegment Error", info.Status.Reason))
errMsg += info.Status.Reason
for _, segAssign := range resp.SegIDAssignments {
if segAssign.Status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Debug("proxy", zap.String("SyncSegment Error", segAssign.Status.Reason))
errMsg += segAssign.Status.Reason
errMsg += "\n"
success = false
continue
}
assign, err := sa.getAssign(info.CollectionID, info.PartitionID, info.ChannelName)
assign, err := sa.getAssign(segAssign.CollectionID, segAssign.PartitionID, segAssign.ChannelName)
segInfo2 := &segInfo{
segID: info.SegID,
count: info.Count,
expireTime: info.ExpireTime,
segID: segAssign.SegID,
count: segAssign.Count,
expireTime: segAssign.ExpireTime,
}
if err != nil {
colInfos, ok := sa.assignInfos[info.CollectionID]
colInfos, ok := sa.assignInfos[segAssign.CollectionID]
if !ok {
colInfos = list.New()
}
......@@ -352,13 +352,13 @@ func (sa *segIDAssigner) syncSegments() (bool, error) {
segInfos.PushBack(segInfo2)
assign = &assignInfo{
collID: info.CollectionID,
partitionID: info.PartitionID,
channelName: info.ChannelName,
collID: segAssign.CollectionID,
partitionID: segAssign.PartitionID,
channelName: segAssign.ChannelName,
segInfos: segInfos,
}
colInfos.PushBack(assign)
sa.assignInfos[info.CollectionID] = colInfos
sa.assignInfos[segAssign.CollectionID] = colInfos
} else {
assign.segInfos.PushBack(segInfo2)
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册