cluster.go 18.4 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package querycoord
13 14 15

import (
	"context"
16
	"encoding/json"
17 18
	"errors"
	"fmt"
19 20
	"path/filepath"
	"strconv"
21 22
	"sync"

23
	"github.com/golang/protobuf/proto"
24 25
	"go.uber.org/zap"

26
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
27 28
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
xige-16 已提交
29
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
30
	"github.com/milvus-io/milvus/internal/proto/querypb"
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
33 34 35
)

const (
36 37
	queryNodeMetaPrefix = "queryCoord-queryNodeMeta"
	queryNodeInfoPrefix = "queryCoord-queryNodeInfo"
38 39
)

40
// Cluster manages all query node connections and grpc requests
41 42 43 44 45 46 47 48 49
type Cluster interface {
	reloadFromKV() error
	getComponentInfos(ctx context.Context) ([]*internalpb.ComponentInfo, error)

	loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error
	releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error
	getNumSegments(nodeID int64) (int, error)

	watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error
50
	//TODO:: removeDmChannel
51 52 53 54 55 56 57 58 59 60
	getNumDmChannels(nodeID int64) (int, error)

	hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
	getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo
	addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error
	removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error
	releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
	releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
	getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)

61
	registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error
62 63 64
	getNodeByID(nodeID int64) (Node, error)
	removeNodeInfo(nodeID int64) error
	stopNode(nodeID int64)
65 66 67
	onlineNodes() (map[int64]Node, error)
	isOnline(nodeID int64) (bool, error)
	offlineNodes() (map[int64]Node, error)
68

69 70 71
	getSessionVersion() int64

	getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse
72 73
}

X
xige-16 已提交
74 75
type newQueryNodeFn func(ctx context.Context, address string, id UniqueID, kv *etcdkv.EtcdKV) (Node, error)

76 77 78 79 80 81 82 83
type nodeState int

const (
	disConnect nodeState = 0
	online     nodeState = 1
	offline    nodeState = 2
)

84
type queryNodeCluster struct {
85 86
	ctx    context.Context
	cancel context.CancelFunc
87 88
	client *etcdkv.EtcdKV

89 90 91
	session        *sessionutil.Session
	sessionVersion int64

92
	sync.RWMutex
93 94
	clusterMeta Meta
	nodes       map[int64]Node
X
xige-16 已提交
95
	newNodeFn   newQueryNodeFn
96 97
}

98 99
func newQueryNodeCluster(ctx context.Context, clusterMeta Meta, kv *etcdkv.EtcdKV, newNodeFn newQueryNodeFn, session *sessionutil.Session) (*queryNodeCluster, error) {
	childCtx, cancel := context.WithCancel(ctx)
100
	nodes := make(map[int64]Node)
101
	c := &queryNodeCluster{
102 103
		ctx:         childCtx,
		cancel:      cancel,
104
		client:      kv,
105
		session:     session,
106 107
		clusterMeta: clusterMeta,
		nodes:       nodes,
X
xige-16 已提交
108
		newNodeFn:   newNodeFn,
109
	}
110 111 112 113 114 115 116 117
	err := c.reloadFromKV()
	if err != nil {
		return nil, err
	}

	return c, nil
}

118 119
// Reload trigger task, trigger task states, internal task, internal task state from etcd
// Assign the internal task to the corresponding trigger task as a child task
120
func (c *queryNodeCluster) reloadFromKV() error {
121 122 123 124 125 126 127
	toLoadMetaNodeIDs := make([]int64, 0)
	// get current online session
	onlineNodeSessions, version, _ := c.session.GetSessions(typeutil.QueryNodeRole)
	onlineSessionMap := make(map[int64]*sessionutil.Session)
	for _, session := range onlineNodeSessions {
		nodeID := session.ServerID
		onlineSessionMap[nodeID] = session
128
	}
129 130 131
	for nodeID, session := range onlineSessionMap {
		log.Debug("ReloadFromKV: register a queryNode to cluster", zap.Any("nodeID", nodeID))
		err := c.registerNode(c.ctx, session, nodeID, disConnect)
132
		if err != nil {
133
			log.Error("query node failed to register", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
134 135
			return err
		}
136 137 138
		toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
	}
	c.sessionVersion = version
139

140 141 142 143 144 145 146 147
	// load node information before power off from etcd
	oldStringNodeIDs, oldNodeSessions, err := c.client.LoadWithPrefix(queryNodeInfoPrefix)
	if err != nil {
		log.Error("reloadFromKV: get previous node info from etcd error", zap.Error(err))
		return err
	}
	for index := range oldStringNodeIDs {
		nodeID, err := strconv.ParseInt(filepath.Base(oldStringNodeIDs[index]), 10, 64)
148
		if err != nil {
149
			log.Error("WatchNodeLoop: parse nodeID error", zap.Error(err))
150 151
			return err
		}
152 153 154 155 156 157 158 159 160 161 162 163 164
		if _, ok := onlineSessionMap[nodeID]; !ok {
			session := &sessionutil.Session{}
			err = json.Unmarshal([]byte(oldNodeSessions[index]), session)
			if err != nil {
				log.Error("WatchNodeLoop: unmarshal session error", zap.Error(err))
				return err
			}
			err = c.registerNode(context.Background(), session, nodeID, offline)
			if err != nil {
				log.Debug("ReloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
				return err
			}
			toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
165 166
		}
	}
167 168 169

	// load collection meta of queryNode from etcd
	for _, nodeID := range toLoadMetaNodeIDs {
170
		infoPrefix := fmt.Sprintf("%s/%d", queryNodeMetaPrefix, nodeID)
171
		_, collectionValues, err := c.client.LoadWithPrefix(infoPrefix)
172 173 174
		if err != nil {
			return err
		}
175 176
		for _, value := range collectionValues {
			collectionInfo := &querypb.CollectionInfo{}
177
			err = proto.Unmarshal([]byte(value), collectionInfo)
178 179 180
			if err != nil {
				return err
			}
181
			err = c.nodes[nodeID].setCollectionInfo(collectionInfo)
182
			if err != nil {
183
				log.Debug("ReloadFromKV: failed to add queryNode meta to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
184 185 186 187 188
				return err
			}
		}
	}
	return nil
189 190
}

191 192 193 194
func (c *queryNodeCluster) getSessionVersion() int64 {
	return c.sessionVersion
}

195
func (c *queryNodeCluster) getComponentInfos(ctx context.Context) ([]*internalpb.ComponentInfo, error) {
196 197 198
	c.RLock()
	defer c.RUnlock()
	subComponentInfos := make([]*internalpb.ComponentInfo, 0)
199
	nodes, err := c.getOnlineNodes()
200
	if err != nil {
201
		log.Debug("GetComponentInfos: failed get on service nodes", zap.String("error info", err.Error()))
202 203
		return nil, err
	}
204 205 206
	for _, node := range nodes {
		componentState := node.getComponentInfo(ctx)
		subComponentInfos = append(subComponentInfos, componentState)
207 208
	}

209
	return subComponentInfos, nil
210 211
}

212
func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error {
213 214
	c.Lock()
	defer c.Unlock()
215

216
	if node, ok := c.nodes[nodeID]; ok {
217
		segmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
218 219
		for _, info := range in.Infos {
			segmentID := info.SegmentID
220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
			segmentInfo, err := c.clusterMeta.getSegmentInfoByID(segmentID)
			if err == nil {
				segmentInfos[segmentID] = proto.Clone(segmentInfo).(*querypb.SegmentInfo)
				if in.LoadCondition != querypb.TriggerCondition_loadBalance {
					segmentInfo.SegmentState = querypb.SegmentState_sealing
					segmentInfo.NodeID = nodeID
				}
			} else {
				segmentInfo = &querypb.SegmentInfo{
					SegmentID:    segmentID,
					CollectionID: info.CollectionID,
					PartitionID:  info.PartitionID,
					NodeID:       nodeID,
					SegmentState: querypb.SegmentState_sealing,
				}
235
			}
236
			c.clusterMeta.setSegmentInfo(segmentID, segmentInfo)
237
		}
238 239
		err := node.loadSegments(ctx, in)
		if err != nil {
240 241 242 243 244 245 246 247
			for _, info := range in.Infos {
				segmentID := info.SegmentID
				if _, ok = segmentInfos[segmentID]; ok {
					c.clusterMeta.setSegmentInfo(segmentID, segmentInfos[segmentID])
					continue
				}
				c.clusterMeta.deleteSegmentInfoByID(segmentID)
			}
248 249
			log.Debug("LoadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
			return err
250
		}
251
		return nil
252
	}
253
	return errors.New("LoadSegments: Can't find query node by nodeID ")
254 255
}

256
func (c *queryNodeCluster) releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error {
257 258 259 260
	c.Lock()
	defer c.Unlock()

	if node, ok := c.nodes[nodeID]; ok {
261
		if !node.isOnline() {
262
			return errors.New("node offline")
263
		}
264 265 266 267 268

		err := node.releaseSegments(ctx, in)
		if err != nil {
			log.Debug("ReleaseSegments: queryNode release segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
			return err
269
		}
270 271 272

		for _, segmentID := range in.SegmentIDs {
			c.clusterMeta.deleteSegmentInfoByID(segmentID)
273
		}
274
		return nil
275 276
	}

277
	return errors.New("ReleaseSegments: Can't find query node by nodeID ")
278 279
}

280
func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
281 282
	c.Lock()
	defer c.Unlock()
283

284
	if node, ok := c.nodes[nodeID]; ok {
285 286 287 288
		err := node.watchDmChannels(ctx, in)
		if err != nil {
			log.Debug("WatchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
			return err
289
		}
290 291 292 293
		channels := make([]string, 0)
		for _, info := range in.Infos {
			channels = append(channels, info.ChannelName)
		}
294

295 296 297 298 299 300
		collectionID := in.CollectionID
		//c.clusterMeta.addCollection(collectionID, in.Schema)
		err = c.clusterMeta.addDmChannel(collectionID, nodeID, channels)
		if err != nil {
			log.Debug("WatchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
			return err
301
		}
302 303

		return nil
304
	}
305
	return errors.New("WatchDmChannels: Can't find query node by nodeID ")
306 307 308 309 310 311 312 313 314
}

func (c *queryNodeCluster) hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
	c.Lock()
	defer c.Unlock()

	return c.nodes[nodeID].hasWatchedQueryChannel(collectionID)
}

315
func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error {
316 317 318
	c.Lock()
	defer c.Unlock()
	if node, ok := c.nodes[nodeID]; ok {
319 320 321 322
		err := node.addQueryChannel(ctx, in)
		if err != nil {
			log.Debug("AddQueryChannel: queryNode add query channel error", zap.String("error", err.Error()))
			return err
323
		}
324
		return nil
325 326
	}

327
	return errors.New("AddQueryChannel: can't find query node by nodeID")
328
}
329
func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error {
330 331 332 333
	c.Lock()
	defer c.Unlock()

	if node, ok := c.nodes[nodeID]; ok {
334 335 336 337
		err := node.removeQueryChannel(ctx, in)
		if err != nil {
			log.Debug("RemoveQueryChannel: queryNode remove query channel error", zap.String("error", err.Error()))
			return err
338
		}
339 340

		return nil
341 342
	}

343
	return errors.New("RemoveQueryChannel: can't find query node by nodeID")
344 345
}

346
func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
347 348 349 350
	c.Lock()
	defer c.Unlock()

	if node, ok := c.nodes[nodeID]; ok {
351 352 353 354
		err := node.releaseCollection(ctx, in)
		if err != nil {
			log.Debug("ReleaseCollection: queryNode release collection error", zap.String("error", err.Error()))
			return err
355
		}
356 357 358 359
		err = c.clusterMeta.releaseCollection(in.CollectionID)
		if err != nil {
			log.Debug("ReleaseCollection: meta release collection error", zap.String("error", err.Error()))
			return err
360
		}
361
		return nil
362 363
	}

364
	return errors.New("ReleaseCollection: can't find query node by nodeID")
365 366
}

367
func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
368 369 370 371
	c.Lock()
	defer c.Unlock()

	if node, ok := c.nodes[nodeID]; ok {
372 373 374 375
		err := node.releasePartitions(ctx, in)
		if err != nil {
			log.Debug("ReleasePartitions: queryNode release partitions error", zap.String("error", err.Error()))
			return err
376
		}
377

378 379 380 381 382
		for _, partitionID := range in.PartitionIDs {
			err = c.clusterMeta.releasePartition(in.CollectionID, partitionID)
			if err != nil {
				log.Debug("ReleasePartitions: meta release partitions error", zap.String("error", err.Error()))
				return err
383 384
			}
		}
385
		return nil
386 387
	}

388
	return errors.New("ReleasePartitions: can't find query node by nodeID")
389 390 391
}

func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
392 393
	c.RLock()
	defer c.RUnlock()
394 395

	segmentInfos := make([]*querypb.SegmentInfo, 0)
396 397
	for _, node := range c.nodes {
		res, err := node.getSegmentInfo(ctx, in)
398 399 400
		if err != nil {
			return nil, err
		}
401 402 403
		if res != nil {
			segmentInfos = append(segmentInfos, res.Infos...)
		}
404 405
	}

406
	//TODO::update meta
407 408 409
	return segmentInfos, nil
}

410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430
type queryNodeGetMetricsResponse struct {
	resp *milvuspb.GetMetricsResponse
	err  error
}

func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
	c.RLock()
	defer c.RUnlock()

	ret := make([]queryNodeGetMetricsResponse, 0, len(c.nodes))
	for _, node := range c.nodes {
		resp, err := node.getMetrics(ctx, in)
		ret = append(ret, queryNodeGetMetricsResponse{
			resp: resp,
			err:  err,
		})
	}

	return ret
}

431
func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
432 433
	c.RLock()
	defer c.RUnlock()
434 435

	if _, ok := c.nodes[nodeID]; !ok {
436
		return 0, errors.New("GetNumDmChannels: Can't find query node by nodeID ")
437 438 439
	}

	numChannel := 0
440 441
	collectionInfos := c.clusterMeta.showCollections()
	for _, info := range collectionInfos {
442 443 444 445 446 447 448 449 450 451
		for _, channelInfo := range info.ChannelInfos {
			if channelInfo.NodeIDLoaded == nodeID {
				numChannel++
			}
		}
	}
	return numChannel, nil
}

func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) {
452 453
	c.RLock()
	defer c.RUnlock()
454 455

	if _, ok := c.nodes[nodeID]; !ok {
456
		return 0, errors.New("getNumSegments: Can't find query node by nodeID ")
457 458 459
	}

	numSegment := 0
460 461 462 463 464 465 466
	segmentInfos := make([]*querypb.SegmentInfo, 0)
	collectionInfos := c.clusterMeta.showCollections()
	for _, info := range collectionInfos {
		res := c.clusterMeta.showSegmentInfos(info.CollectionID, nil)
		segmentInfos = append(segmentInfos, res...)
	}
	for _, info := range segmentInfos {
467 468 469 470 471 472 473
		if info.NodeID == nodeID {
			numSegment++
		}
	}
	return numSegment, nil
}

474
func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID, state nodeState) error {
475 476 477
	c.Lock()
	defer c.Unlock()

478
	if _, ok := c.nodes[id]; !ok {
479 480
		sessionJSON, err := json.Marshal(session)
		if err != nil {
481
			log.Debug("RegisterNode: marshal session error", zap.Int64("nodeID", id), zap.Any("address", session))
482 483 484 485 486 487 488
			return err
		}
		key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
		err = c.client.Save(key, string(sessionJSON))
		if err != nil {
			return err
		}
489
		node, err := c.newNodeFn(ctx, session.Address, id, c.client)
X
xige-16 已提交
490 491 492 493
		if err != nil {
			log.Debug("RegisterNode: create a new query node failed", zap.Int64("nodeID", id), zap.Error(err))
			return err
		}
494 495 496 497 498
		node.setState(state)
		if state < online {
			go node.start()
		}
		c.nodes[id] = node
499
		log.Debug("RegisterNode: create a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address))
500 501
		return nil
	}
502
	return fmt.Errorf("RegisterNode: node %d alredy exists in cluster", id)
503
}
504

505
func (c *queryNodeCluster) getNodeByID(nodeID int64) (Node, error) {
506 507 508 509 510 511 512
	c.RLock()
	defer c.RUnlock()

	if node, ok := c.nodes[nodeID]; ok {
		return node, nil
	}

513
	return nil, fmt.Errorf("GetNodeByID: query node %d not exist", nodeID)
514 515
}

516
func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
517 518 519
	c.Lock()
	defer c.Unlock()

520
	key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, nodeID)
521 522 523 524 525
	err := c.client.Remove(key)
	if err != nil {
		return err
	}

526 527 528 529 530 531
	if _, ok := c.nodes[nodeID]; ok {
		err = c.nodes[nodeID].clearNodeInfo()
		if err != nil {
			return err
		}
		delete(c.nodes, nodeID)
532
		log.Debug("RemoveNodeInfo: delete nodeInfo in cluster MetaReplica and etcd", zap.Int64("nodeID", nodeID))
533 534 535
	}

	return nil
536 537
}

538
func (c *queryNodeCluster) stopNode(nodeID int64) {
X
xige-16 已提交
539 540 541
	c.Lock()
	defer c.Unlock()

542 543
	if node, ok := c.nodes[nodeID]; ok {
		node.stop()
544
		log.Debug("StopNode: queryNode offline", zap.Int64("nodeID", nodeID))
545 546 547
	}
}

548
func (c *queryNodeCluster) onlineNodes() (map[int64]Node, error) {
549 550
	c.RLock()
	defer c.RUnlock()
551

552
	return c.getOnlineNodes()
553 554
}

555
func (c *queryNodeCluster) getOnlineNodes() (map[int64]Node, error) {
556
	nodes := make(map[int64]Node)
557
	for nodeID, node := range c.nodes {
558
		if node.isOnline() {
559
			nodes[nodeID] = node
560 561
		}
	}
562
	if len(nodes) == 0 {
563
		return nil, errors.New("GetOnlineNodes: no queryNode is alive")
564 565
	}

566 567 568
	return nodes, nil
}

569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590
func (c *queryNodeCluster) offlineNodes() (map[int64]Node, error) {
	c.RLock()
	defer c.RUnlock()

	return c.getOfflineNodes()
}

func (c *queryNodeCluster) getOfflineNodes() (map[int64]Node, error) {
	nodes := make(map[int64]Node)
	for nodeID, node := range c.nodes {
		if node.isOffline() {
			nodes[nodeID] = node
		}
	}
	if len(nodes) == 0 {
		return nil, errors.New("GetOfflineNodes: no queryNode is offline")
	}

	return nodes, nil
}

func (c *queryNodeCluster) isOnline(nodeID int64) (bool, error) {
591 592 593 594
	c.Lock()
	defer c.Unlock()

	if node, ok := c.nodes[nodeID]; ok {
595
		return node.isOnline(), nil
596 597
	}

598
	return false, fmt.Errorf("IsOnService: query node %d not exist", nodeID)
599 600
}

601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618
//func (c *queryNodeCluster) printMeta() {
//	c.RLock()
//	defer c.RUnlock()
//
//	for id, node := range c.nodes {
//		if node.isOnline() {
//			collectionInfos := node.showCollections()
//			for _, info := range collectionInfos {
//				log.Debug("PrintMeta: query coordinator cluster info: collectionInfo", zap.Int64("nodeID", id), zap.Int64("collectionID", info.CollectionID), zap.Any("info", info))
//			}
//
//			queryChannelInfos := node.showWatchedQueryChannels()
//			for _, info := range queryChannelInfos {
//				log.Debug("PrintMeta: query coordinator cluster info: watchedQueryChannelInfo", zap.Int64("nodeID", id), zap.Int64("collectionID", info.CollectionID), zap.Any("info", info))
//			}
//		}
//	}
//}
619 620 621 622 623 624 625 626 627 628

func (c *queryNodeCluster) getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo {
	c.RLock()
	defer c.RUnlock()
	if node, ok := c.nodes[nodeID]; ok {
		return node.showCollections()
	}

	return nil
}