Unverified commit cc69c5cd, authored by yah01, committed by GitHub

Make the Cluster interface's methods that are called from outside public (#17315)

Signed-off-by: yah01 <yang.cen@zilliz.com>
Parent c76b4ade
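Every rename in this diff follows the same pattern: a method that is called from outside the package changes its first letter from lowercase to uppercase. This relies on Go's export rule: an identifier is visible to other packages only if its name begins with an uppercase letter. A minimal, self-contained illustration of the rule (the package and type below are hypothetical, not part of this repository):

```go
// Package registry is a hypothetical stand-in for the querycoord package.
package registry

type nodeRegistry struct {
	ids []int64
}

// OnlineNodeIDs begins with an uppercase letter, so it is exported:
// any package that imports registry may call it.
func (r *nodeRegistry) OnlineNodeIDs() []int64 {
	return r.ids
}

// onlineNodeIDs begins with a lowercase letter, so it is unexported:
// only code inside package registry may call it.
func (r *nodeRegistry) onlineNodeIDs() []int64 {
	return r.ids
}
```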
@@ -42,7 +42,7 @@ func shuffleChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChan
var onlineNodeIDs []int64
for {
if replicaID == -1 {
onlineNodeIDs = cluster.onlineNodeIDs()
onlineNodeIDs = cluster.OnlineNodeIDs()
} else {
replica, err := metaCache.getReplicaByID(replicaID)
if err != nil {
@@ -50,7 +50,7 @@ func shuffleChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChan
}
replicaNodes := replica.GetNodeIds()
for _, nodeID := range replicaNodes {
if ok, err := cluster.isOnline(nodeID); err == nil && ok {
if ok, err := cluster.IsOnline(nodeID); err == nil && ok {
onlineNodeIDs = append(onlineNodeIDs, nodeID)
}
}
@@ -50,7 +50,7 @@ func TestShuffleChannelsToQueryNode(t *testing.T) {
}
meta, err := newMeta(baseCtx, kv, nil, idAllocator)
assert.Nil(t, err)
cluster := &queryNodeCluster{
var cluster Cluster = &queryNodeCluster{
ctx: baseCtx,
cancel: cancel,
client: kv,
@@ -87,7 +87,7 @@ func TestShuffleChannelsToQueryNode(t *testing.T) {
assert.Nil(t, err)
nodeSession := node.session
nodeID := node.queryNodeID
cluster.registerNode(baseCtx, nodeSession, nodeID, disConnect)
cluster.RegisterNode(baseCtx, nodeSession, nodeID, disConnect)
waitQueryNodeOnline(cluster, nodeID)
err = shuffleChannelsToQueryNode(baseCtx, reqs, cluster, meta, false, nil, nil, -1)
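Note that the test now declares the variable with the interface type (`var cluster Cluster = &queryNodeCluster{...}`), so the compiler checks that `*queryNodeCluster` still satisfies the renamed `Cluster` interface, and the calls below go through the exported methods. The same guarantee is often written as a compile-time assertion; a sketch of that idiom (this line is illustrative, not part of the commit):

```go
// fails to compile if *queryNodeCluster ever stops implementing Cluster
var _ Cluster = (*queryNodeCluster)(nil)
```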
@@ -46,40 +46,43 @@ const (
// Cluster manages all query node connections and grpc requests
type Cluster interface {
+ // Collection/Partition
+ ReleaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
+ ReleasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
+ // Segment
+ LoadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error
+ ReleaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error
+ GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
+ GetSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
+ GetSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error)
+ SyncReplicaSegments(ctx context.Context, leaderID UniqueID, in *querypb.SyncReplicaSegmentsRequest) error
+ // Channel
+ WatchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error
+ WatchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error
+ HasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
+ // Node
+ RegisterNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error
+ GetNodeInfoByID(nodeID int64) (Node, error)
+ RemoveNodeInfo(nodeID int64) error
+ StopNode(nodeID int64)
+ OnlineNodeIDs() []int64
+ IsOnline(nodeID int64) (bool, error)
+ OfflineNodeIDs() []int64
+ HasNode(nodeID int64) bool
+ GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse
+ AllocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error
+ AllocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error
+ AssignNodesToReplicas(ctx context.Context, replicas []*milvuspb.ReplicaInfo, collectionSize uint64) error
+ GetSessionVersion() int64
+ // Inner
reloadFromKV() error
getComponentInfos(ctx context.Context) []*internalpb.ComponentInfo
- loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error
- releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error
- watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error
- watchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error
- hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
- releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
- releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
- getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
- getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)
- getSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error)
- syncReplicaSegments(ctx context.Context, leaderID UniqueID, in *querypb.SyncReplicaSegmentsRequest) error
- registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error
- getNodeInfoByID(nodeID int64) (Node, error)
- removeNodeInfo(nodeID int64) error
- stopNode(nodeID int64)
- onlineNodeIDs() []int64
- isOnline(nodeID int64) (bool, error)
- offlineNodeIDs() []int64
- hasNode(nodeID int64) bool
- allocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error
- allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error
- assignNodesToReplicas(ctx context.Context, replicas []*milvuspb.ReplicaInfo, collectionSize uint64) error
- getSessionVersion() int64
- getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse
}
type newQueryNodeFn func(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error)
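The reworked interface separates the exported surface from the two `// Inner` methods (`reloadFromKV` and `getComponentInfos`), which stay unexported and callable only within the package. A short sketch of a caller that relies only on the exported surface (the helper is illustrative and assumes it is handed a `Cluster` value):

```go
// countOnlineNodes uses only exported Cluster methods, so after this
// commit it could live in another package that imports this one.
func countOnlineNodes(c Cluster, nodeIDs []int64) int {
	online := 0
	for _, nodeID := range nodeIDs {
		if ok, err := c.IsOnline(nodeID); err == nil && ok {
			online++
		}
	}
	return online
}
```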
@@ -135,7 +138,6 @@ func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdK
// reloadFromKV reloads the online and offline QueryNode sessions from etcd
// and registers them back to the cluster
func (c *queryNodeCluster) reloadFromKV() error {
- toLoadMetaNodeIDs := make([]int64, 0)
// get the current online sessions
onlineNodeSessions, version, _ := c.session.GetSessions(typeutil.QueryNodeRole)
onlineSessionMap := make(map[int64]*sessionutil.Session)
@@ -145,12 +147,11 @@ func (c *queryNodeCluster) reloadFromKV() error {
}
for nodeID, session := range onlineSessionMap {
log.Info("reloadFromKV: register a queryNode to cluster", zap.Any("nodeID", nodeID))
err := c.registerNode(c.ctx, session, nodeID, disConnect)
err := c.RegisterNode(c.ctx, session, nodeID, disConnect)
if err != nil {
log.Warn("QueryNode failed to register", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
- toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
}
c.sessionVersion = version
@@ -173,19 +174,18 @@ func (c *queryNodeCluster) reloadFromKV() error {
log.Warn("watchNodeLoop: unmarshal session error", zap.Error(err))
return err
}
err = c.registerNode(context.Background(), session, nodeID, offline)
err = c.RegisterNode(context.Background(), session, nodeID, offline)
if err != nil {
log.Warn("reloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
- toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
}
}
return nil
}
func (c *queryNodeCluster) getSessionVersion() int64 {
func (c *queryNodeCluster) GetSessionVersion() int64 {
return c.sessionVersion
}
@@ -201,7 +201,7 @@ func (c *queryNodeCluster) getComponentInfos(ctx context.Context) []*internalpb.
return subComponentInfos
}
func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error {
func (c *queryNodeCluster) LoadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error {
c.RLock()
var targetNode Node
if node, ok := c.nodes[nodeID]; ok {
@@ -221,7 +221,7 @@ func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *q
return fmt.Errorf("loadSegments: can't find QueryNode by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) releaseSegments(ctx context.Context, leaderID int64, in *querypb.ReleaseSegmentsRequest) error {
func (c *queryNodeCluster) ReleaseSegments(ctx context.Context, leaderID int64, in *querypb.ReleaseSegmentsRequest) error {
c.RLock()
var targetNode Node
if node, ok := c.nodes[leaderID]; ok {
@@ -246,7 +246,7 @@ func (c *queryNodeCluster) releaseSegments(ctx context.Context, leaderID int64,
return fmt.Errorf("releaseSegments: can't find QueryNode by nodeID, nodeID = %d", leaderID)
}
func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
c.RLock()
var targetNode Node
if node, ok := c.nodes[nodeID]; ok {
@@ -281,7 +281,7 @@ func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in
return fmt.Errorf("watchDmChannels: can't find QueryNode by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error {
func (c *queryNodeCluster) WatchDeltaChannels(ctx context.Context, nodeID int64, in *querypb.WatchDeltaChannelsRequest) error {
c.RLock()
var targetNode Node
if node, ok := c.nodes[nodeID]; ok {
@@ -301,14 +301,14 @@ func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64,
return fmt.Errorf("watchDeltaChannels: can't find QueryNode by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) hasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
func (c *queryNodeCluster) HasWatchedDeltaChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
c.RLock()
defer c.RUnlock()
return c.nodes[nodeID].hasWatchedDeltaChannel(collectionID)
}
func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
func (c *queryNodeCluster) ReleaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
c.RLock()
var targetNode Node
if node, ok := c.nodes[nodeID]; ok {
@@ -328,7 +328,7 @@ func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64,
return fmt.Errorf("releaseCollection: can't find QueryNode by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
func (c *queryNodeCluster) ReleasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
c.RLock()
var targetNode Node
if node, ok := c.nodes[nodeID]; ok {
@@ -348,7 +348,7 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64,
return fmt.Errorf("releasePartitions: can't find QueryNode by nodeID, nodeID = %d", nodeID)
}
func (c *queryNodeCluster) getSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error) {
func (c *queryNodeCluster) GetSegmentInfoByID(ctx context.Context, segmentID UniqueID) (*querypb.SegmentInfo, error) {
segmentInfo, err := c.clusterMeta.getSegmentInfoByID(segmentID)
if err != nil {
return nil, err
@@ -381,7 +381,7 @@ func (c *queryNodeCluster) getSegmentInfoByID(ctx context.Context, segmentID Uni
return nil, fmt.Errorf("updateSegmentInfo: can't find segment %d on QueryNode %d", segmentID, segmentInfo.NodeID)
}
func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
func (c *queryNodeCluster) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
type respTuple struct {
res *querypb.GetSegmentInfoResponse
err error
@@ -447,7 +447,7 @@ func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSe
return segmentInfos, nil
}
func (c *queryNodeCluster) getSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
func (c *queryNodeCluster) GetSegmentInfoByNode(ctx context.Context, nodeID int64, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
c.RLock()
node, ok := c.nodes[nodeID]
c.RUnlock()
@@ -462,7 +462,7 @@ func (c *queryNodeCluster) getSegmentInfoByNode(ctx context.Context, nodeID int6
return res.GetInfos(), nil
}
func (c *queryNodeCluster) syncReplicaSegments(ctx context.Context, leaderID UniqueID, in *querypb.SyncReplicaSegmentsRequest) error {
func (c *queryNodeCluster) SyncReplicaSegments(ctx context.Context, leaderID UniqueID, in *querypb.SyncReplicaSegmentsRequest) error {
c.RLock()
leader, ok := c.nodes[leaderID]
c.RUnlock()
@@ -478,7 +478,7 @@ type queryNodeGetMetricsResponse struct {
err error
}
func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
func (c *queryNodeCluster) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
c.RLock()
var wg sync.WaitGroup
cnt := len(c.nodes)
@@ -527,7 +527,7 @@ func (c *queryNodeCluster) setNodeState(nodeID int64, node Node, state nodeState
node.setState(state)
}
func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
c.Lock()
defer c.Unlock()
@@ -559,7 +559,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
return fmt.Errorf("registerNode: QueryNode %d alredy exists in cluster", id)
}
func (c *queryNodeCluster) getNodeInfoByID(nodeID int64) (Node, error) {
func (c *queryNodeCluster) GetNodeInfoByID(nodeID int64) (Node, error) {
c.RLock()
node, ok := c.nodes[nodeID]
c.RUnlock()
@@ -574,7 +574,7 @@ func (c *queryNodeCluster) getNodeInfoByID(nodeID int64) (Node, error) {
return nodeInfo, nil
}
func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
func (c *queryNodeCluster) RemoveNodeInfo(nodeID int64) error {
c.Lock()
defer c.Unlock()
@@ -591,7 +591,7 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
return nil
}
func (c *queryNodeCluster) stopNode(nodeID int64) {
func (c *queryNodeCluster) StopNode(nodeID int64) {
c.RLock()
defer c.RUnlock()
@@ -602,7 +602,7 @@ func (c *queryNodeCluster) stopNode(nodeID int64) {
}
}
func (c *queryNodeCluster) onlineNodeIDs() []int64 {
func (c *queryNodeCluster) OnlineNodeIDs() []int64 {
c.RLock()
defer c.RUnlock()
@@ -616,7 +616,7 @@ func (c *queryNodeCluster) onlineNodeIDs() []int64 {
return onlineNodeIDs
}
func (c *queryNodeCluster) offlineNodeIDs() []int64 {
func (c *queryNodeCluster) OfflineNodeIDs() []int64 {
c.RLock()
defer c.RUnlock()
@@ -630,7 +630,7 @@ func (c *queryNodeCluster) offlineNodeIDs() []int64 {
return offlineNodeIDs
}
func (c *queryNodeCluster) hasNode(nodeID int64) bool {
func (c *queryNodeCluster) HasNode(nodeID int64) bool {
c.RLock()
defer c.RUnlock()
@@ -641,7 +641,7 @@ func (c *queryNodeCluster) hasNode(nodeID int64) bool {
return false
}
func (c *queryNodeCluster) isOnline(nodeID int64) (bool, error) {
func (c *queryNodeCluster) IsOnline(nodeID int64) (bool, error) {
c.RLock()
defer c.RUnlock()
@@ -667,17 +667,17 @@ func (c *queryNodeCluster) isOnline(nodeID int64) (bool, error) {
// }
//}
func (c *queryNodeCluster) allocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error {
func (c *queryNodeCluster) AllocateSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegmentsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error {
return c.segmentAllocator(ctx, reqs, c, c.clusterMeta, wait, excludeNodeIDs, includeNodeIDs, replicaID)
}
func (c *queryNodeCluster) allocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error {
func (c *queryNodeCluster) AllocateChannelsToQueryNode(ctx context.Context, reqs []*querypb.WatchDmChannelsRequest, wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) error {
return c.channelAllocator(ctx, reqs, c, c.clusterMeta, wait, excludeNodeIDs, includeNodeIDs, replicaID)
}
// Returns an error if there are not enough nodes/resources to create replicas
- func (c *queryNodeCluster) assignNodesToReplicas(ctx context.Context, replicas []*milvuspb.ReplicaInfo, collectionSize uint64) error {
- nodeIds := c.onlineNodeIDs()
+ func (c *queryNodeCluster) AssignNodesToReplicas(ctx context.Context, replicas []*milvuspb.ReplicaInfo, collectionSize uint64) error {
+ nodeIds := c.OnlineNodeIDs()
if len(nodeIds) < len(replicas) {
return fmt.Errorf("no enough nodes to create replicas, node_num=%d replica_num=%d", len(nodeIds), len(replicas))
}
@@ -725,7 +725,7 @@ func getNodeInfos(cluster Cluster, nodeIds []UniqueID) []*queryNode {
wg.Add(1)
go func(id UniqueID) {
defer wg.Done()
info, err := cluster.getNodeInfoByID(id)
info, err := cluster.GetNodeInfoByID(id)
if err != nil {
return
}
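`getNodeInfos` gathers node info concurrently: one goroutine per node, joined with a `sync.WaitGroup`, with failed lookups silently skipped. A self-contained sketch of that fan-out pattern (the names and the mutex-guarded append are my own, not necessarily the repo's):

```go
package main

import (
	"fmt"
	"sync"
)

// fetchAll queries every id concurrently and keeps the successful results.
func fetchAll(ids []int64, fetch func(int64) (string, error)) []string {
	var (
		wg  sync.WaitGroup
		mu  sync.Mutex
		out []string
	)
	for _, id := range ids {
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			info, err := fetch(id)
			if err != nil {
				return // a failed node is skipped, as in getNodeInfos
			}
			mu.Lock() // guard the shared slice against concurrent appends
			out = append(out, info)
			mu.Unlock()
		}(id)
	}
	wg.Wait() // block until every goroutine is done
	return out
}

func main() {
	infos := fetchAll([]int64{1, 2, 3}, func(id int64) (string, error) {
		return fmt.Sprintf("node-%d", id), nil
	})
	fmt.Println(len(infos)) // 3
}
```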
@@ -430,7 +430,7 @@ func TestGrpcRequest(t *testing.T) {
handler, err := newChannelUnsubscribeHandler(baseCtx, kv, factory)
assert.Nil(t, err)
cluster := &queryNodeCluster{
var cluster Cluster = &queryNodeCluster{
ctx: baseCtx,
cancel: cancel,
client: kv,
@@ -442,7 +442,7 @@ func TestGrpcRequest(t *testing.T) {
}
t.Run("Test GetNodeInfoByIDWithNodeNotExist", func(t *testing.T) {
_, err := cluster.getNodeInfoByID(defaultQueryNodeID)
_, err := cluster.GetNodeInfoByID(defaultQueryNodeID)
assert.NotNil(t, err)
})
@@ -453,7 +453,7 @@ func TestGrpcRequest(t *testing.T) {
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfoByNode(baseCtx, defaultQueryNodeID, getSegmentInfoReq)
_, err = cluster.GetSegmentInfoByNode(baseCtx, defaultQueryNodeID, getSegmentInfoReq)
assert.NotNil(t, err)
})
@@ -461,7 +461,7 @@ func TestGrpcRequest(t *testing.T) {
assert.Nil(t, err)
nodeSession := node.session
nodeID := node.queryNodeID
cluster.registerNode(baseCtx, nodeSession, nodeID, disConnect)
cluster.RegisterNode(baseCtx, nodeSession, nodeID, disConnect)
waitQueryNodeOnline(cluster, nodeID)
t.Run("Test GetComponentInfos", func(t *testing.T) {
@@ -481,7 +481,7 @@ func TestGrpcRequest(t *testing.T) {
Schema: genDefaultCollectionSchema(false),
CollectionID: defaultCollectionID,
}
err := cluster.loadSegments(baseCtx, nodeID, loadSegmentReq)
err := cluster.LoadSegments(baseCtx, nodeID, loadSegmentReq)
assert.Nil(t, err)
})
@@ -492,7 +492,7 @@ func TestGrpcRequest(t *testing.T) {
PartitionIDs: []UniqueID{defaultPartitionID},
SegmentIDs: []UniqueID{defaultSegmentID},
}
err := cluster.releaseSegments(baseCtx, nodeID, releaseSegmentReq)
err := cluster.ReleaseSegments(baseCtx, nodeID, releaseSegmentReq)
assert.Nil(t, err)
})
@@ -503,7 +503,7 @@ func TestGrpcRequest(t *testing.T) {
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfo(baseCtx, getSegmentInfoReq)
_, err = cluster.GetSegmentInfo(baseCtx, getSegmentInfoReq)
assert.Nil(t, err)
})
@@ -514,7 +514,7 @@ func TestGrpcRequest(t *testing.T) {
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
_, err = cluster.GetSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
assert.Nil(t, err)
})
@@ -527,7 +527,7 @@ func TestGrpcRequest(t *testing.T) {
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfo(baseCtx, getSegmentInfoReq)
_, err = cluster.GetSegmentInfo(baseCtx, getSegmentInfoReq)
assert.NotNil(t, err)
})
@@ -538,14 +538,14 @@ func TestGrpcRequest(t *testing.T) {
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
_, err = cluster.GetSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
assert.NotNil(t, err)
})
node.getSegmentInfos = returnSuccessGetSegmentInfoResult
t.Run("Test GetNodeInfoByID", func(t *testing.T) {
res, err := cluster.getNodeInfoByID(nodeID)
res, err := cluster.GetNodeInfoByID(nodeID)
assert.Nil(t, err)
assert.NotNil(t, res)
})
@@ -553,13 +553,13 @@ func TestGrpcRequest(t *testing.T) {
node.getMetrics = returnFailedGetMetricsResult
t.Run("Test GetNodeInfoByIDFailed", func(t *testing.T) {
_, err := cluster.getNodeInfoByID(nodeID)
_, err := cluster.GetNodeInfoByID(nodeID)
assert.NotNil(t, err)
})
node.getMetrics = returnSuccessGetMetricsResult
cluster.stopNode(nodeID)
cluster.StopNode(nodeID)
t.Run("Test GetSegmentInfoByNodeAfterNodeStop", func(t *testing.T) {
getSegmentInfoReq := &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
@@ -567,7 +567,7 @@ func TestGrpcRequest(t *testing.T) {
},
CollectionID: defaultCollectionID,
}
_, err = cluster.getSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
_, err = cluster.GetSegmentInfoByNode(baseCtx, nodeID, getSegmentInfoReq)
assert.NotNil(t, err)
})
@@ -608,7 +608,7 @@ func TestSetNodeState(t *testing.T) {
node, err := startQueryNodeServer(baseCtx)
assert.Nil(t, err)
err = cluster.registerNode(baseCtx, node.session, node.queryNodeID, disConnect)
err = cluster.RegisterNode(baseCtx, node.session, node.queryNodeID, disConnect)
assert.Nil(t, err)
waitQueryNodeOnline(cluster, node.queryNodeID)
@@ -627,7 +627,7 @@ func TestSetNodeState(t *testing.T) {
err = meta.setDeltaChannel(defaultCollectionID, []*datapb.VchannelInfo{deltaChannelInfo})
assert.Nil(t, err)
nodeInfo, err := cluster.getNodeInfoByID(node.queryNodeID)
nodeInfo, err := cluster.GetNodeInfoByID(node.queryNodeID)
assert.Nil(t, err)
cluster.setNodeState(node.queryNodeID, nodeInfo, offline)
assert.Equal(t, 1, len(handler.downNodeChan))
@@ -850,7 +850,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
//TODO: get segment infos from MetaReplica
//segmentIDs := req.SegmentIDs
//segmentInfos, err := qs.MetaReplica.getSegmentInfos(segmentIDs)
segmentInfos, err := qc.cluster.getSegmentInfo(ctx, req)
segmentInfos, err := qc.cluster.GetSegmentInfo(ctx, req)
if err != nil {
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
@@ -1134,7 +1134,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
}
}
isShardAvailable, err := qc.cluster.isOnline(shard.LeaderID)
isShardAvailable, err := qc.cluster.IsOnline(shard.LeaderID)
if err != nil || !isShardAvailable {
log.Warn("shard leader is unavailable",
zap.Int64("collectionID", replica.CollectionID),
@@ -1148,7 +1148,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
nodes := shardNodes[shard.DmChannelName]
for _, nodeID := range replica.NodeIds {
if _, ok := nodes[nodeID]; ok {
if ok, err := qc.cluster.isOnline(nodeID); err != nil || !ok {
if ok, err := qc.cluster.IsOnline(nodeID); err != nil || !ok {
isShardAvailable = false
break
}
@@ -441,7 +441,7 @@ func TestGrpcTaskEnqueueFail(t *testing.T) {
queryCoord.scheduler.taskIDAllocator = failedAllocator
waitQueryNodeOnline(queryCoord.cluster, queryNode.queryNodeID)
assert.NotEmpty(t, queryCoord.cluster.onlineNodeIDs())
assert.NotEmpty(t, queryCoord.cluster.OnlineNodeIDs())
t.Run("Test LoadPartition", func(t *testing.T) {
status, err := queryCoord.LoadPartitions(ctx, &querypb.LoadPartitionsRequest{
@@ -579,7 +579,7 @@ func TestLoadBalanceTask(t *testing.T) {
}
}
nodeID := queryNode1.queryNodeID
queryCoord.cluster.stopNode(nodeID)
queryCoord.cluster.StopNode(nodeID)
loadBalanceSegment := &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
@@ -914,7 +914,7 @@ func TestLoadCollectionWithReplicas(t *testing.T) {
}
// load collection with 3 replicas, but not enough query nodes
assert.Equal(t, 2, len(queryCoord.cluster.onlineNodeIDs()))
assert.Equal(t, 2, len(queryCoord.cluster.OnlineNodeIDs()))
status, err := queryCoord.LoadCollection(ctx, loadCollectionReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
@@ -988,7 +988,7 @@ func TestLoadPartitionsWithReplicas(t *testing.T) {
}
// load partitions with 3 replicas, but not enough query nodes
assert.Equal(t, 2, len(queryCoord.cluster.onlineNodeIDs()))
assert.Equal(t, 2, len(queryCoord.cluster.OnlineNodeIDs()))
status, err := queryCoord.LoadPartitions(ctx, loadPartitionsReq)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
@@ -306,7 +306,7 @@ func reloadShardLeaderAddress(meta Meta, cluster Cluster) error {
isModified := false
for _, shard := range replica.ShardReplicas {
if len(shard.LeaderAddr) == 0 {
nodeInfo, err := cluster.getNodeInfoByID(shard.LeaderID)
nodeInfo, err := cluster.GetNodeInfoByID(shard.LeaderID)
if err != nil {
log.Warn("failed to retrieve the node's address",
zap.Int64("nodeID", shard.LeaderID),
@@ -66,7 +66,7 @@ func getSystemInfoMetrics(
}
metricsinfo.FillDeployMetricsWithEnv(&clusterTopology.Self.SystemInfo)
nodesMetrics := qc.cluster.getMetrics(ctx, req)
nodesMetrics := qc.cluster.GetMetrics(ctx, req)
for _, nodeMetrics := range nodesMetrics {
if nodeMetrics.err != nil {
log.Warn("invalid metrics of query node was found",
@@ -28,10 +28,10 @@ func NewMockCluster(cluster Cluster) *MockCluster {
}
}
func (mock *MockCluster) isOnline(nodeID int64) (bool, error) {
func (mock *MockCluster) IsOnline(nodeID int64) (bool, error) {
if mock.isOnlineHandler != nil {
return mock.isOnlineHandler(nodeID)
}
return mock.Cluster.isOnline(nodeID)
return mock.Cluster.IsOnline(nodeID)
}
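`MockCluster` overrides `IsOnline` only when a handler is installed and otherwise falls through to the embedded `Cluster`. This is plain Go interface embedding; a self-contained sketch of the same pattern (all names here are illustrative):

```go
package main

import "fmt"

type Pinger interface {
	Ping(id int64) (bool, error)
}

type realPinger struct{}

func (realPinger) Ping(id int64) (bool, error) { return true, nil }

// mockPinger embeds a Pinger; calls without an installed handler fall
// through to the embedded value, just as MockCluster.IsOnline falls back
// to mock.Cluster.IsOnline.
type mockPinger struct {
	Pinger
	pingHandler func(id int64) (bool, error)
}

func (m *mockPinger) Ping(id int64) (bool, error) {
	if m.pingHandler != nil {
		return m.pingHandler(id)
	}
	return m.Pinger.Ping(id)
}

func main() {
	m := &mockPinger{Pinger: realPinger{}}
	fmt.Println(m.Ping(1)) // true <nil>: delegates to realPinger
	m.pingHandler = func(id int64) (bool, error) { return false, nil }
	fmt.Println(m.Ping(1)) // false <nil>: the handler takes over
}
```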
@@ -346,7 +346,7 @@ func (qc *QueryCoord) watchNodeLoop() {
}
}
offlineNodeIDs := qc.cluster.offlineNodeIDs()
offlineNodeIDs := qc.cluster.OfflineNodeIDs()
if len(offlineNodeIDs) != 0 {
loadBalanceSegment := &querypb.LoadBalanceRequest{
Base: &commonpb.MsgBase{
@@ -371,7 +371,7 @@ func (qc *QueryCoord) watchNodeLoop() {
}
// TODO silverxia add Rewatch logic
qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1, nil)
qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.GetSessionVersion()+1, nil)
qc.handleNodeEvent(ctx)
}
@@ -388,7 +388,7 @@ func (qc *QueryCoord) allocateNode(nodeID int64) error {
return nil
}
func (qc *QueryCoord) getUnallocatedNodes() []int64 {
onlines := qc.cluster.onlineNodeIDs()
onlines := qc.cluster.OnlineNodeIDs()
var ret []int64
for _, n := range onlines {
replica, err := qc.meta.getReplicasByNodeID(n)
@@ -429,7 +429,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
case sessionutil.SessionAddEvent:
serverID := event.Session.ServerID
log.Info("start add a QueryNode to cluster", zap.Any("nodeID", serverID))
err := qc.cluster.registerNode(ctx, event.Session, serverID, disConnect)
err := qc.cluster.RegisterNode(ctx, event.Session, serverID, disConnect)
if err != nil {
log.Error("QueryCoord failed to register a QueryNode", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
continue
@@ -444,13 +444,13 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
log.Info("get a del event after QueryNode down", zap.Int64("nodeID", serverID))
nodeExist := qc.cluster.hasNode(serverID)
nodeExist := qc.cluster.HasNode(serverID)
if !nodeExist {
log.Error("QueryNode not exist", zap.Int64("nodeID", serverID))
continue
}
qc.cluster.stopNode(serverID)
qc.cluster.StopNode(serverID)
offlineNodeCh <- serverID
}
}
@@ -599,7 +599,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
nodeID2SegmentInfos := make(map[int64]map[UniqueID]*querypb.SegmentInfo)
for _, nodeID := range onlineNodeIDs {
if _, ok := nodeID2MemUsage[nodeID]; !ok {
nodeInfo, err := qc.cluster.getNodeInfoByID(nodeID)
nodeInfo, err := qc.cluster.GetNodeInfoByID(nodeID)
if err != nil {
log.Warn("loadBalanceSegmentLoop: get node info from QueryNode failed",
zap.Int64("nodeID", nodeID), zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
@@ -615,7 +615,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
leastSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
segmentInfos := qc.meta.getSegmentInfosByNodeAndCollection(nodeID, replica.GetCollectionID())
for _, segmentInfo := range segmentInfos {
leastInfo, err := qc.cluster.getSegmentInfoByID(ctx, segmentInfo.SegmentID)
leastInfo, err := qc.cluster.GetSegmentInfoByID(ctx, segmentInfo.SegmentID)
if err != nil {
log.Warn("loadBalanceSegmentLoop: get segment info from QueryNode failed", zap.Int64("nodeID", nodeID),
zap.Int64("collection", replica.CollectionID), zap.Int64("replica", replica.ReplicaID),
@@ -187,7 +187,7 @@ func TestWatchNodeLoop(t *testing.T) {
assert.Nil(t, err)
for {
offlineNodeIDs := queryCoord.cluster.offlineNodeIDs()
offlineNodeIDs := queryCoord.cluster.OfflineNodeIDs()
if len(offlineNodeIDs) != 0 {
log.Warn("find offline Nodes", zap.Int64s("offlineNodeIDs", offlineNodeIDs))
break
@@ -233,7 +233,7 @@ func TestWatchNodeLoop(t *testing.T) {
nodeID := queryNode1.queryNodeID
waitQueryNodeOnline(queryCoord.cluster, nodeID)
onlineNodeIDs := queryCoord.cluster.onlineNodeIDs()
onlineNodeIDs := queryCoord.cluster.OnlineNodeIDs()
assert.Equal(t, 1, len(onlineNodeIDs))
queryNode1.stop()
@@ -598,7 +598,7 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {
err = queryCoord.scheduler.Enqueue(loadPartitionTask)
assert.Nil(t, err)
waitTaskFinalState(loadPartitionTask, taskExpired)
nodeInfo, err := queryCoord.cluster.getNodeInfoByID(queryNode1.queryNodeID)
nodeInfo, err := queryCoord.cluster.GetNodeInfoByID(queryNode1.queryNodeID)
assert.Nil(t, err)
if nodeInfo.(*queryNode).memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage {
break
@@ -612,7 +612,7 @@ func TestLoadBalanceSegmentLoop(t *testing.T) {
// if the sealed segment has been balanced to query node2, then the balance works
for {
segmentInfos, err := queryCoord.cluster.getSegmentInfoByNode(baseCtx, queryNode2.queryNodeID, &querypb.GetSegmentInfoRequest{
segmentInfos, err := queryCoord.cluster.GetSegmentInfoByNode(baseCtx, queryNode2.queryNodeID, &querypb.GetSegmentInfoRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_LoadBalanceSegments,
},
@@ -69,7 +69,7 @@ func waitAllQueryNodeOffline(cluster Cluster, nodeIDs []int64) bool {
for {
allOffline := true
for _, nodeID := range nodeIDs {
isOnline, err := cluster.isOnline(nodeID)
isOnline, err := cluster.IsOnline(nodeID)
if err == nil && isOnline {
allOffline = false
break
@@ -85,7 +85,7 @@ func waitAllQueryNodeOffline(cluster Cluster, nodeIDs []int64) bool {
func waitQueryNodeOnline(cluster Cluster, nodeID int64) {
for {
online, err := cluster.isOnline(nodeID)
online, err := cluster.IsOnline(nodeID)
if err != nil {
continue
}
@@ -130,7 +130,7 @@ func TestQueryNode_MultiNode_stop(t *testing.T) {
})
assert.Nil(t, err)
time.Sleep(100 * time.Millisecond)
onlineNodeIDs := queryCoord.cluster.onlineNodeIDs()
onlineNodeIDs := queryCoord.cluster.OnlineNodeIDs()
assert.NotEqual(t, 0, len(onlineNodeIDs))
queryNode2.stop()
err = removeNodeSession(queryNode2.queryNodeID)
@@ -176,7 +176,7 @@ func TestQueryNode_MultiNode_reStart(t *testing.T) {
CollectionID: defaultCollectionID,
})
assert.Nil(t, err)
onlineNodeIDs := queryCoord.cluster.onlineNodeIDs()
onlineNodeIDs := queryCoord.cluster.OnlineNodeIDs()
assert.NotEqual(t, 0, len(onlineNodeIDs))
queryNode3.stop()
err = removeNodeSession(queryNode3.queryNodeID)
@@ -45,7 +45,7 @@ func shuffleSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegment
}
for {
onlineNodeIDs := cluster.onlineNodeIDs()
onlineNodeIDs := cluster.OnlineNodeIDs()
if len(onlineNodeIDs) == 0 {
err := errors.New("no online QueryNode to allocate")
log.Error("shuffleSegmentsToQueryNode failed", zap.Error(err))
@@ -117,7 +117,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
memUsageRate := make(map[int64]float64)
var onlineNodeIDs []int64
if replicaID == -1 {
onlineNodeIDs = cluster.onlineNodeIDs()
onlineNodeIDs = cluster.OnlineNodeIDs()
} else {
replica, err := metaCache.getReplicaByID(replicaID)
if err != nil {
@@ -125,7 +125,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
}
replicaNodes := replica.GetNodeIds()
for _, nodeID := range replicaNodes {
if ok, err := cluster.isOnline(nodeID); err == nil && ok {
if ok, err := cluster.IsOnline(nodeID); err == nil && ok {
onlineNodeIDs = append(onlineNodeIDs, nodeID)
}
}
@@ -148,7 +148,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
continue
}
// collect nodeInfo, the used memory, and the memory usage rate of every query node
nodeInfo, err := cluster.getNodeInfoByID(nodeID)
nodeInfo, err := cluster.GetNodeInfoByID(nodeID)
if err != nil {
log.Warn("shuffleSegmentsToQueryNodeV2: getNodeInfoByID failed", zap.Error(err))
continue
@@ -99,7 +99,7 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
assert.Nil(t, err)
node1Session := node1.session
node1ID := node1.queryNodeID
cluster.registerNode(baseCtx, node1Session, node1ID, disConnect)
cluster.RegisterNode(baseCtx, node1Session, node1ID, disConnect)
waitQueryNodeOnline(cluster, node1ID)
t.Run("Test shuffleSegmentsToQueryNode", func(t *testing.T) {
@@ -114,9 +114,9 @@ func TestShuffleSegmentsToQueryNode(t *testing.T) {
assert.Nil(t, err)
node2Session := node2.session
node2ID := node2.queryNodeID
cluster.registerNode(baseCtx, node2Session, node2ID, disConnect)
cluster.RegisterNode(baseCtx, node2Session, node2ID, disConnect)
waitQueryNodeOnline(cluster, node2ID)
cluster.stopNode(node1ID)
cluster.StopNode(node1ID)
t.Run("Test shuffleSegmentsToQueryNodeV2", func(t *testing.T) {
err = shuffleSegmentsToQueryNodeV2(baseCtx, reqs, cluster, meta, false, nil, nil, -1)
@@ -336,7 +336,7 @@ func (lct *loadCollectionTask) updateTaskProcess() {
// wait for the watchDeltaChannel task to be done after loading segments
nodeID := getDstNodeIDByTask(t)
if t.msgType() == commonpb.MsgType_LoadSegments {
if !lct.cluster.hasWatchedDeltaChannel(lct.ctx, nodeID, collectionID) {
if !lct.cluster.HasWatchedDeltaChannel(lct.ctx, nodeID, collectionID) {
allDone = false
break
}
@@ -456,7 +456,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
replicaIds[i] = replica.ReplicaID
}
err = lct.cluster.assignNodesToReplicas(ctx, replicas, collectionSize)
err = lct.cluster.AssignNodesToReplicas(ctx, replicas, collectionSize)
if err != nil {
log.Error("failed to assign nodes to replicas",
zap.Int64("collectionID", collectionID),
@@ -524,7 +524,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
for _, internalTask := range internalTasks {
lct.addChildTask(internalTask)
if task, ok := internalTask.(*watchDmChannelTask); ok {
nodeInfo, err := lct.cluster.getNodeInfoByID(task.NodeID)
nodeInfo, err := lct.cluster.GetNodeInfoByID(task.NodeID)
if err != nil {
log.Error("loadCollectionTask: get shard leader node info failed",
zap.Int64("collectionID", collectionID),
@@ -593,7 +593,7 @@ func (lct *loadCollectionTask) postExecute(ctx context.Context) error {
}
func (lct *loadCollectionTask) rollBack(ctx context.Context) []task {
onlineNodeIDs := lct.cluster.onlineNodeIDs()
onlineNodeIDs := lct.cluster.OnlineNodeIDs()
resultTasks := make([]task, 0)
for _, nodeID := range onlineNodeIDs {
//brute force rollBack, should optimize
@@ -686,7 +686,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
}
// TODO(yah01): broadcast to all nodes? Or only the nodes serving the collection?
onlineNodeIDs := rct.cluster.onlineNodeIDs()
onlineNodeIDs := rct.cluster.OnlineNodeIDs()
for _, nodeID := range onlineNodeIDs {
req := proto.Clone(rct.ReleaseCollectionRequest).(*querypb.ReleaseCollectionRequest)
req.NodeID = nodeID
@@ -704,7 +704,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
} else {
// If the node crashed or went offline, the loaded segments are lost
defer rct.reduceRetryCount()
err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest)
err := rct.cluster.ReleaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest)
if err != nil {
log.Warn("releaseCollectionTask: release collection end, node occur error", zap.Int64("collectionID", collectionID), zap.Int64("nodeID", rct.NodeID))
// after release failed, the task will always redo
@@ -780,7 +780,7 @@ func (lpt *loadPartitionTask) updateTaskProcess() {
// wait for the watchDeltaChannel task to be done after loading segments
nodeID := getDstNodeIDByTask(t)
if t.msgType() == commonpb.MsgType_LoadSegments {
if !lpt.cluster.hasWatchedDeltaChannel(lpt.ctx, nodeID, collectionID) {
if !lpt.cluster.HasWatchedDeltaChannel(lpt.ctx, nodeID, collectionID) {
allDone = false
break
}
@@ -889,7 +889,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
replicaIds[i] = replica.ReplicaID
}
err = lpt.cluster.assignNodesToReplicas(ctx, replicas, collectionSize)
err = lpt.cluster.AssignNodesToReplicas(ctx, replicas, collectionSize)
if err != nil {
log.Error("failed to assign nodes to replicas",
zap.Int64("collectionID", collectionID),
@@ -954,7 +954,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
for _, internalTask := range internalTasks {
lpt.addChildTask(internalTask)
if task, ok := internalTask.(*watchDmChannelTask); ok {
nodeInfo, err := lpt.cluster.getNodeInfoByID(task.NodeID)
nodeInfo, err := lpt.cluster.GetNodeInfoByID(task.NodeID)
if err != nil {
log.Error("loadCollectionTask: get shard leader node info failed",
zap.Int64("collectionID", collectionID),
@@ -1031,7 +1031,7 @@ func (lpt *loadPartitionTask) rollBack(ctx context.Context) []task {
collectionID := lpt.CollectionID
resultTasks := make([]task, 0)
//brute force rollBack, should optimize
onlineNodeIDs := lpt.cluster.onlineNodeIDs()
onlineNodeIDs := lpt.cluster.OnlineNodeIDs()
for _, nodeID := range onlineNodeIDs {
req := &querypb.ReleaseCollectionRequest{
Base: &commonpb.MsgBase{
@@ -1119,7 +1119,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
// if nodeID == 0, it means that the release request has not yet been assigned to a specific query node
if rpt.NodeID <= 0 {
onlineNodeIDs := rpt.cluster.onlineNodeIDs()
onlineNodeIDs := rpt.cluster.OnlineNodeIDs()
for _, nodeID := range onlineNodeIDs {
req := proto.Clone(rpt.ReleasePartitionsRequest).(*querypb.ReleasePartitionsRequest)
req.NodeID = nodeID
@@ -1137,7 +1137,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
} else {
// If the node crashed or went offline, the loaded segments are lost
defer rpt.reduceRetryCount()
err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest)
err := rpt.cluster.ReleasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest)
if err != nil {
log.Warn("ReleasePartitionsTask: release partition end, node occur error", zap.Int64("collectionID", collectionID), zap.String("nodeID", fmt.Sprintln(rpt.NodeID)))
// after release failed, the task will always redo
@@ -1195,7 +1195,7 @@ func (lst *loadSegmentTask) marshal() ([]byte, error) {
}
func (lst *loadSegmentTask) isValid() bool {
online, err := lst.cluster.isOnline(lst.DstNodeID)
online, err := lst.cluster.IsOnline(lst.DstNodeID)
if err != nil {
return false
}
@@ -1242,7 +1242,7 @@ func (lst *loadSegmentTask) preExecute(ctx context.Context) error {
func (lst *loadSegmentTask) execute(ctx context.Context) error {
defer lst.reduceRetryCount()
err := lst.cluster.loadSegments(ctx, lst.DstNodeID, lst.LoadSegmentsRequest)
err := lst.cluster.LoadSegments(ctx, lst.DstNodeID, lst.LoadSegmentsRequest)
if err != nil {
log.Warn("loadSegmentTask: loadSegment occur error", zap.Int64("taskID", lst.getTaskID()))
lst.setResultInfo(err)
@@ -1322,7 +1322,7 @@ func (rst *releaseSegmentTask) marshal() ([]byte, error) {
}
func (rst *releaseSegmentTask) isValid() bool {
online, err := rst.cluster.isOnline(rst.NodeID)
online, err := rst.cluster.IsOnline(rst.NodeID)
if err != nil {
return false
}
@@ -1350,7 +1350,7 @@ func (rst *releaseSegmentTask) preExecute(context.Context) error {
func (rst *releaseSegmentTask) execute(ctx context.Context) error {
defer rst.reduceRetryCount()
err := rst.cluster.releaseSegments(rst.ctx, rst.leaderID, rst.ReleaseSegmentsRequest)
err := rst.cluster.ReleaseSegments(rst.ctx, rst.leaderID, rst.ReleaseSegmentsRequest)
if err != nil {
log.Warn("releaseSegmentTask: releaseSegment occur error", zap.Int64("taskID", rst.getTaskID()))
rst.setResultInfo(err)
@@ -1388,7 +1388,7 @@ func (wdt *watchDmChannelTask) marshal() ([]byte, error) {
}
func (wdt *watchDmChannelTask) isValid() bool {
online, err := wdt.cluster.isOnline(wdt.NodeID)
online, err := wdt.cluster.IsOnline(wdt.NodeID)
if err != nil {
return false
}
@@ -1429,7 +1429,7 @@ func (wdt *watchDmChannelTask) preExecute(context.Context) error {
func (wdt *watchDmChannelTask) execute(ctx context.Context) error {
defer wdt.reduceRetryCount()
err := wdt.cluster.watchDmChannels(wdt.ctx, wdt.NodeID, wdt.WatchDmChannelsRequest)
err := wdt.cluster.WatchDmChannels(wdt.ctx, wdt.NodeID, wdt.WatchDmChannelsRequest)
if err != nil {
log.Warn("watchDmChannelTask: watchDmChannel occur error", zap.Int64("taskID", wdt.getTaskID()))
wdt.setResultInfo(err)
@@ -1502,7 +1502,7 @@ func (wdt *watchDeltaChannelTask) marshal() ([]byte, error) {
}
func (wdt *watchDeltaChannelTask) isValid() bool {
online, err := wdt.cluster.isOnline(wdt.NodeID)
online, err := wdt.cluster.IsOnline(wdt.NodeID)
if err != nil {
return false
}
@@ -1544,7 +1544,7 @@ func (wdt *watchDeltaChannelTask) preExecute(context.Context) error {
func (wdt *watchDeltaChannelTask) execute(ctx context.Context) error {
defer wdt.reduceRetryCount()
err := wdt.cluster.watchDeltaChannels(wdt.ctx, wdt.NodeID, wdt.WatchDeltaChannelsRequest)
err := wdt.cluster.WatchDeltaChannels(wdt.ctx, wdt.NodeID, wdt.WatchDeltaChannelsRequest)
if err != nil {
log.Warn("watchDeltaChannelTask: watchDeltaChannel occur error", zap.Int64("taskID", wdt.getTaskID()), zap.Error(err))
wdt.setResultInfo(err)
@@ -2042,7 +2042,7 @@ func (lbt *loadBalanceTask) processManualLoadBalance(ctx context.Context) error
balancedSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
balancedSegmentIDs := make([]UniqueID, 0)
for _, nodeID := range lbt.SourceNodeIDs {
nodeExist := lbt.cluster.hasNode(nodeID)
nodeExist := lbt.cluster.HasNode(nodeID)
if !nodeExist {
err := fmt.Errorf("loadBalanceTask: query node %d is not exist to balance", nodeID)
log.Error(err.Error())
@@ -2302,7 +2302,7 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
// then the queryCoord will panic, and the nodeInfo should not be removed immediately
// after queryCoord recovery, the balanceTask will redo
for _, offlineNodeID := range lbt.SourceNodeIDs {
err := lbt.cluster.removeNodeInfo(offlineNodeID)
err := lbt.cluster.RemoveNodeInfo(offlineNodeID)
if err != nil {
log.Error("loadBalanceTask: occur error when removing node info from cluster",
zap.Int64("nodeID", offlineNodeID),
@@ -2345,7 +2345,7 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error {
leaderID := task.NodeID
dmChannel := task.Infos[0].ChannelName
nodeInfo, err := lbt.cluster.getNodeInfoByID(leaderID)
nodeInfo, err := lbt.cluster.GetNodeInfoByID(leaderID)
if err != nil {
log.Error("failed to get node info to update shard leader info",
zap.Int64("triggerTaskID", lbt.getTaskID()),
@@ -2399,14 +2399,14 @@ func assignInternalTask(ctx context.Context,
broker *globalMetaBroker) ([]task, error) {
internalTasks := make([]task, 0)
err := cluster.allocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
err := cluster.AllocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
if err != nil {
log.Error("assignInternalTask: assign segment to node failed", zap.Error(err))
return nil, err
}
log.Info("assignInternalTask: assign segment to node success", zap.Int("load segments", len(loadSegmentRequests)))
err = cluster.allocateChannelsToQueryNode(ctx, watchDmChannelRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
err = cluster.AllocateChannelsToQueryNode(ctx, watchDmChannelRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
if err != nil {
log.Error("assignInternalTask: assign dmChannel to node failed", zap.Error(err))
return nil, err
@@ -1019,7 +1019,7 @@ func generateDerivedInternalTasks(triggerTask task, meta Meta, cluster Cluster)
collectionID := loadSegmentTask.CollectionID
replicaID := loadSegmentTask.GetReplicaID()
nodeID := loadSegmentTask.DstNodeID
if !cluster.hasWatchedDeltaChannel(triggerTask.traceCtx(), nodeID, collectionID) {
if !cluster.HasWatchedDeltaChannel(triggerTask.traceCtx(), nodeID, collectionID) {
addChannelWatchInfoFn(nodeID, collectionID, replicaID, watchDeltaChannelInfo)
}
}
@@ -188,7 +188,7 @@ func syncReplicaSegments(ctx context.Context, cluster Cluster, childTasks []task
}
}
err := cluster.syncReplicaSegments(ctx, leader.LeaderID, &req)
err := cluster.SyncReplicaSegments(ctx, leader.LeaderID, &req)
if err != nil {
return err
}