cluster.go 15.8 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

12
package querycoord
13 14 15

import (
	"context"
16
	"encoding/json"
17 18
	"errors"
	"fmt"
19 20
	"path/filepath"
	"strconv"
21 22
	"sync"

23 24
	"github.com/milvus-io/milvus/internal/proto/milvuspb"

25
	"github.com/golang/protobuf/proto"
26 27
	"go.uber.org/zap"

28
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
29 30 31
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
32 33 34 35
	"github.com/milvus-io/milvus/internal/util/sessionutil"
)

// Key prefixes under which the query coordinator persists cluster state in etcd.
const (
	// queryNodeInfoPrefix prefixes a query node's JSON-encoded session; keys are "<prefix>/<nodeID>".
	queryNodeInfoPrefix = "queryCoord-queryNodeInfo"

	// queryNodeMetaPrefix prefixes per-node collection infos; keys begin with "<prefix>/<nodeID>".
	queryNodeMetaPrefix = "queryCoord-queryNodeMeta"
)

40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
// Cluster is the query coordinator's view of its query nodes: node membership,
// plus forwarding of segment, channel, collection and partition requests to
// individual nodes.
type Cluster interface {
	// Persistence and diagnostics.
	reloadFromKV() error
	printMeta()

	// Node membership and health.
	registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID) error
	getNodeByID(nodeID int64) (Node, error)
	removeNodeInfo(nodeID int64) error
	stopNode(nodeID int64)
	onServiceNodes() (map[int64]Node, error)
	isOnService(nodeID int64) (bool, error)
	getComponentInfos(ctx context.Context) ([]*internalpb.ComponentInfo, error)

	// Segments.
	loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error
	releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error
	getNumSegments(nodeID int64) (int, error)
	getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error)

	// DM channels.
	watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error
	getNumDmChannels(nodeID int64) (int, error)

	// Query channels.
	hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool
	addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error
	removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error

	// Collections and partitions.
	getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo
	releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error
	releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error
}

69
type queryNodeCluster struct {
70 71
	client *etcdkv.EtcdKV

72
	sync.RWMutex
73 74
	clusterMeta Meta
	nodes       map[int64]Node
75 76
}

77 78
func newQueryNodeCluster(clusterMeta Meta, kv *etcdkv.EtcdKV) (*queryNodeCluster, error) {
	nodes := make(map[int64]Node)
79 80
	c := &queryNodeCluster{
		client:      kv,
81 82 83
		clusterMeta: clusterMeta,
		nodes:       nodes,
	}
84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
	err := c.reloadFromKV()
	if err != nil {
		return nil, err
	}

	return c, nil
}

// reloadFromKV restores cluster state persisted in etcd.
//
// Phase 1: every session stored under queryNodeInfoPrefix is decoded from JSON
// and re-registered; the node ID is parsed from the last path element of the
// key. Nodes that fail to register are logged and skipped.
//
// Phase 2: for each node registered in phase 1, the collection infos stored
// under "queryNodeMetaPrefix/<nodeID>" are decoded (protobuf text format) and
// attached to the node.
//
// Any etcd, key-parse, or decode error aborts the reload and is returned.
func (c *queryNodeCluster) reloadFromKV() error {
	keys, values, err := c.client.LoadWithPrefix(queryNodeInfoPrefix)
	if err != nil {
		return err
	}

	nodeIDs := make([]UniqueID, 0, len(keys))
	for index := range keys {
		nodeID, err := strconv.ParseInt(filepath.Base(keys[index]), 10, 64)
		if err != nil {
			return err
		}

		session := &sessionutil.Session{}
		if err := json.Unmarshal([]byte(values[index]), session); err != nil {
			return err
		}

		if err := c.registerNode(context.Background(), session, nodeID); err != nil {
			log.Debug("ReloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
			continue
		}
		nodeIDs = append(nodeIDs, nodeID)
	}

	for _, nodeID := range nodeIDs {
		// Fetch the node through getNodeByID so the map read happens under the
		// cluster read lock. A direct c.nodes[nodeID] races with other writers,
		// and yields a nil Node (and a panic below) if the node was removed
		// after it was registered.
		node, err := c.getNodeByID(nodeID)
		if err != nil {
			log.Debug("ReloadFromKV: queryNode removed before its meta was restored", zap.Int64("nodeID", nodeID))
			continue
		}

		infoPrefix := fmt.Sprintf("%s/%d", queryNodeMetaPrefix, nodeID)
		_, collectionValues, err := c.client.LoadWithPrefix(infoPrefix)
		if err != nil {
			return err
		}
		for _, value := range collectionValues {
			collectionInfo := &querypb.CollectionInfo{}
			if err := proto.UnmarshalText(value, collectionInfo); err != nil {
				return err
			}
			if err := node.setCollectionInfo(collectionInfo); err != nil {
				log.Debug("ReloadFromKV: failed to add queryNode meta to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
				return err
			}
		}
	}
	return nil
}

138
// getComponentInfos returns the component info reported by each on-service
// query node. It fails, returning the underlying error, when no node is on
// service.
func (c *queryNodeCluster) getComponentInfos(ctx context.Context) ([]*internalpb.ComponentInfo, error) {
	c.RLock()
	defer c.RUnlock()

	onService, err := c.getOnServiceNodes()
	if err != nil {
		log.Debug("GetComponentInfos: failed get on service nodes", zap.String("error info", err.Error()))
		return nil, err
	}

	infos := make([]*internalpb.ComponentInfo, 0, len(onService))
	for _, n := range onService {
		infos = append(infos, n.getComponentInfo(ctx))
	}
	return infos, nil
}

155
// loadSegments asks query node nodeID to load the segments described by in.
//
// Before the request is sent, each segment's meta is written:
//   - segments already in the meta are marked sealing and assigned to nodeID,
//     except for load-balance requests, where the existing meta is re-saved as is;
//   - segments unknown to the meta get a new sealing entry on nodeID.
//
// If the node fails to load, known segments are restored to their previous
// meta and newly created entries are deleted; the node's error is returned.
func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *querypb.LoadSegmentsRequest) error {
	c.Lock()
	defer c.Unlock()

	node, ok := c.nodes[nodeID]
	if !ok {
		return errors.New("LoadSegments: Can't find query node by nodeID ")
	}

	// Pre-load snapshot of segments the meta already knew about, used for rollback.
	previous := make(map[UniqueID]*querypb.SegmentInfo)
	for _, info := range in.Infos {
		segmentID := info.SegmentID
		segmentInfo, err := c.clusterMeta.getSegmentInfoByID(segmentID)
		if err == nil {
			previous[segmentID] = proto.Clone(segmentInfo).(*querypb.SegmentInfo)
			// Edit a private copy: the object handed out by the meta must only
			// change through setSegmentInfo, never by in-place mutation here.
			segmentInfo = proto.Clone(segmentInfo).(*querypb.SegmentInfo)
			if in.LoadCondition != querypb.TriggerCondition_loadBalance {
				segmentInfo.SegmentState = querypb.SegmentState_sealing
				segmentInfo.NodeID = nodeID
			}
		} else {
			segmentInfo = &querypb.SegmentInfo{
				SegmentID:    segmentID,
				CollectionID: info.CollectionID,
				PartitionID:  info.PartitionID,
				NodeID:       nodeID,
				SegmentState: querypb.SegmentState_sealing,
			}
		}
		c.clusterMeta.setSegmentInfo(segmentID, segmentInfo)
	}

	if err := node.loadSegments(ctx, in); err != nil {
		// Undo the meta changes made above.
		for _, info := range in.Infos {
			segmentID := info.SegmentID
			if old, known := previous[segmentID]; known {
				c.clusterMeta.setSegmentInfo(segmentID, old)
				continue
			}
			c.clusterMeta.deleteSegmentInfoByID(segmentID)
		}
		log.Debug("LoadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
		return err
	}
	return nil
}

199
// releaseSegments asks query node nodeID to release in.SegmentIDs. On success
// the released segments are deleted from the meta. It fails if the node is
// unknown, not on service, or rejects the request.
func (c *queryNodeCluster) releaseSegments(ctx context.Context, nodeID int64, in *querypb.ReleaseSegmentsRequest) error {
	c.Lock()
	defer c.Unlock()

	node, found := c.nodes[nodeID]
	switch {
	case !found:
		return errors.New("ReleaseSegments: Can't find query node by nodeID ")
	case !node.isOnService():
		return errors.New("node offline")
	}

	if err := node.releaseSegments(ctx, in); err != nil {
		log.Debug("ReleaseSegments: queryNode release segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
		return err
	}

	for _, segmentID := range in.SegmentIDs {
		c.clusterMeta.deleteSegmentInfoByID(segmentID)
	}
	return nil
}

223
// watchDmChannels asks query node nodeID to watch the DM channels listed in
// in.Infos. Once the node accepts, the channel names are recorded in the meta
// for in.CollectionID on nodeID. It fails if the node is unknown, rejects the
// request, or the meta update fails.
func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in *querypb.WatchDmChannelsRequest) error {
	c.Lock()
	defer c.Unlock()

	node, found := c.nodes[nodeID]
	if !found {
		return errors.New("WatchDmChannels: Can't find query node by nodeID ")
	}

	if err := node.watchDmChannels(ctx, in); err != nil {
		log.Debug("WatchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
		return err
	}

	channelNames := make([]string, 0, len(in.Infos))
	for _, channelInfo := range in.Infos {
		channelNames = append(channelNames, channelInfo.ChannelName)
	}

	if err := c.clusterMeta.addDmChannel(in.CollectionID, nodeID, channelNames); err != nil {
		log.Debug("WatchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
		return err
	}
	return nil
}

// hasWatchedQueryChannel reports whether query node nodeID has watched the
// query channel of collectionID. It returns false when nodeID is not part of
// the cluster.
func (c *queryNodeCluster) hasWatchedQueryChannel(ctx context.Context, nodeID int64, collectionID UniqueID) bool {
	// Read-only access to the node map: a shared lock is sufficient.
	c.RLock()
	defer c.RUnlock()

	node, ok := c.nodes[nodeID]
	if !ok {
		// An unknown node has watched nothing; indexing the map directly would
		// yield a nil Node and panic on the method call.
		return false
	}
	return node.hasWatchedQueryChannel(collectionID)
}

258
// addQueryChannel forwards in to query node nodeID. It fails if the node is not
// part of the cluster or rejects the request.
func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in *querypb.AddQueryChannelRequest) error {
	c.Lock()
	defer c.Unlock()

	node, found := c.nodes[nodeID]
	if !found {
		return errors.New("AddQueryChannel: can't find query node by nodeID")
	}

	err := node.addQueryChannel(ctx, in)
	if err != nil {
		log.Debug("AddQueryChannel: queryNode add query channel error", zap.String("error", err.Error()))
	}
	return err
}
272
// removeQueryChannel forwards in to query node nodeID. It fails if the node is
// not part of the cluster or rejects the request.
func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64, in *querypb.RemoveQueryChannelRequest) error {
	c.Lock()
	defer c.Unlock()

	node, found := c.nodes[nodeID]
	if !found {
		return errors.New("RemoveQueryChannel: can't find query node by nodeID")
	}

	err := node.removeQueryChannel(ctx, in)
	if err != nil {
		log.Debug("RemoveQueryChannel: queryNode remove query channel error", zap.String("error", err.Error()))
	}
	return err
}

289
// releaseCollection asks query node nodeID to release in.CollectionID. On
// success the collection is also released from the meta. It fails if the node
// is unknown, rejects the request, or the meta update fails.
func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64, in *querypb.ReleaseCollectionRequest) error {
	c.Lock()
	defer c.Unlock()

	node, found := c.nodes[nodeID]
	if !found {
		return errors.New("ReleaseCollection: can't find query node by nodeID")
	}

	if err := node.releaseCollection(ctx, in); err != nil {
		log.Debug("ReleaseCollection: queryNode release collection error", zap.String("error", err.Error()))
		return err
	}

	if err := c.clusterMeta.releaseCollection(in.CollectionID); err != nil {
		log.Debug("ReleaseCollection: meta release collection error", zap.String("error", err.Error()))
		return err
	}
	return nil
}

310
// releasePartitions asks query node nodeID to release in.PartitionIDs of
// in.CollectionID. On success each partition is released from the meta in
// order; the first meta failure is returned.
func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64, in *querypb.ReleasePartitionsRequest) error {
	c.Lock()
	defer c.Unlock()

	node, found := c.nodes[nodeID]
	if !found {
		return errors.New("ReleasePartitions: can't find query node by nodeID")
	}

	if err := node.releasePartitions(ctx, in); err != nil {
		log.Debug("ReleasePartitions: queryNode release partitions error", zap.String("error", err.Error()))
		return err
	}

	for _, partitionID := range in.PartitionIDs {
		if err := c.clusterMeta.releasePartition(in.CollectionID, partitionID); err != nil {
			log.Debug("ReleasePartitions: meta release partitions error", zap.String("error", err.Error()))
			return err
		}
	}
	return nil
}

// getSegmentInfo queries every node in the cluster (on service or not) with in
// and concatenates the returned segment infos. The first node error aborts the
// call. The result is an empty, non-nil slice when nothing is found.
func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
	c.RLock()
	defer c.RUnlock()

	collected := make([]*querypb.SegmentInfo, 0)
	for _, n := range c.nodes {
		res, err := n.getSegmentInfo(ctx, in)
		switch {
		case err != nil:
			return nil, err
		case res != nil:
			collected = append(collected, res.Infos...)
		}
	}

	//TODO::update meta
	return collected, nil
}

352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372
// queryNodeGetMetricsResponse pairs the GetMetrics response from one query node
// with the error, if any, returned while requesting it.
type queryNodeGetMetricsResponse struct {
	resp *milvuspb.GetMetricsResponse
	err  error
}

func (c *queryNodeCluster) getMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) []queryNodeGetMetricsResponse {
	c.RLock()
	defer c.RUnlock()

	ret := make([]queryNodeGetMetricsResponse, 0, len(c.nodes))
	for _, node := range c.nodes {
		resp, err := node.getMetrics(ctx, in)
		ret = append(ret, queryNodeGetMetricsResponse{
			resp: resp,
			err:  err,
		})
	}

	return ret
}

373
// getNumDmChannels counts the DM channels, across all collections in the meta,
// whose NodeIDLoaded equals nodeID. It fails if nodeID is not part of the
// cluster.
func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
	c.RLock()
	defer c.RUnlock()

	if _, known := c.nodes[nodeID]; !known {
		return 0, errors.New("GetNumDmChannels: Can't find query node by nodeID ")
	}

	count := 0
	for _, collection := range c.clusterMeta.showCollections() {
		for _, channel := range collection.ChannelInfos {
			if channel.NodeIDLoaded != nodeID {
				continue
			}
			count++
		}
	}
	return count, nil
}

// getNumSegments counts the segments, across all collections in the meta,
// whose NodeID equals nodeID. It fails if nodeID is not part of the cluster.
func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) {
	c.RLock()
	defer c.RUnlock()

	if _, known := c.nodes[nodeID]; !known {
		return 0, errors.New("getNumSegments: Can't find query node by nodeID ")
	}

	count := 0
	for _, collection := range c.clusterMeta.showCollections() {
		// Count while iterating; no need to materialize every segment first.
		for _, segment := range c.clusterMeta.showSegmentInfos(collection.CollectionID, nil) {
			if segment.NodeID == nodeID {
				count++
			}
		}
	}
	return count, nil
}

416
// registerNode adds the query node id to the cluster.
//
// The session is persisted as JSON under "queryNodeInfoPrefix/<id>", a node
// handle is created for session.Address, and the node is started in a
// background goroutine. A start failure there is only logged. Registering an
// id that is already present returns an error and changes nothing.
func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionutil.Session, id UniqueID) error {
	c.Lock()
	defer c.Unlock()

	if _, exists := c.nodes[id]; exists {
		return fmt.Errorf("RegisterNode: node %d already exists in cluster", id)
	}

	sessionJSON, err := json.Marshal(session)
	if err != nil {
		log.Debug("RegisterNode: marshal session error", zap.Int64("nodeID", id), zap.Any("address", session), zap.String("error", err.Error()))
		return err
	}
	key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
	if err = c.client.Save(key, string(sessionJSON)); err != nil {
		return err
	}

	node := newQueryNode(ctx, session.Address, id, c.client)
	c.nodes[id] = node
	log.Debug("RegisterNode: create a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address))

	// The goroutine uses the captured node and its own error variable. It
	// runs after the lock is released, so reading c.nodes there would race
	// with concurrent writers, and the entry may already have been removed.
	go func() {
		if startErr := node.start(); startErr != nil {
			log.Error("RegisterNode: start queryNode client failed", zap.Int64("nodeID", id), zap.String("error", startErr.Error()))
			return
		}
		log.Debug("RegisterNode: start queryNode success, print cluster MetaReplica info", zap.Int64("nodeID", id))
		c.printMeta()
	}()

	return nil
}
448

449
// getNodeByID returns the node registered under nodeID, or an error if the
// cluster has no such node.
func (c *queryNodeCluster) getNodeByID(nodeID int64) (Node, error) {
	c.RLock()
	node, ok := c.nodes[nodeID]
	c.RUnlock()

	if !ok {
		return nil, fmt.Errorf("GetNodeByID: query node %d not exist", nodeID)
	}
	return node, nil
}

460
// removeNodeInfo deletes the persisted session of nodeID from etcd. If the node
// is in the cluster, it then clears the node's info and drops it from the map.
// An unknown nodeID is not an error once the etcd key has been removed.
func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
	c.Lock()
	defer c.Unlock()

	if err := c.client.Remove(fmt.Sprintf("%s/%d", queryNodeInfoPrefix, nodeID)); err != nil {
		return err
	}

	node, ok := c.nodes[nodeID]
	if !ok {
		return nil
	}
	if err := node.clearNodeInfo(); err != nil {
		return err
	}
	delete(c.nodes, nodeID)
	log.Debug("RemoveNodeInfo: delete nodeInfo in cluster MetaReplica and etcd", zap.Int64("nodeID", nodeID))
	return nil
}

482 483 484
func (c *queryNodeCluster) stopNode(nodeID int64) {
	if node, ok := c.nodes[nodeID]; ok {
		node.stop()
485
		log.Debug("StopNode: queryNode offline", zap.Int64("nodeID", nodeID))
486 487 488
	}
}

489
func (c *queryNodeCluster) onServiceNodes() (map[int64]Node, error) {
490 491
	c.RLock()
	defer c.RUnlock()
492

493
	return c.getOnServiceNodes()
494 495
}

496 497
// getOnServiceNodes builds a new map containing only the nodes whose
// isOnService reports true. It reads c.nodes without locking, so callers in
// this file hold the cluster lock while calling it. It returns an error rather
// than an empty map when no node is on service.
func (c *queryNodeCluster) getOnServiceNodes() (map[int64]Node, error) {
	alive := make(map[int64]Node)
	for id, n := range c.nodes {
		if !n.isOnService() {
			continue
		}
		alive[id] = n
	}

	if len(alive) == 0 {
		return nil, errors.New("GetOnServiceNodes: no queryNode is alive")
	}
	return alive, nil
}

// isOnService reports whether query node nodeID is on service. It returns an
// error if nodeID is not part of the cluster.
func (c *queryNodeCluster) isOnService(nodeID int64) (bool, error) {
	// Only reads the node map, so a shared lock is enough. This matches
	// getNodeByID and printMeta, which also call node methods under RLock.
	c.RLock()
	defer c.RUnlock()

	if node, ok := c.nodes[nodeID]; ok {
		return node.isOnService(), nil
	}
	return false, fmt.Errorf("IsOnService: query node %d not exist", nodeID)
}

func (c *queryNodeCluster) printMeta() {
522 523
	c.RLock()
	defer c.RUnlock()
524

525 526
	for id, node := range c.nodes {
		if node.isOnService() {
527 528 529
			collectionInfos := node.showCollections()
			for _, info := range collectionInfos {
				log.Debug("PrintMeta: query coordinator cluster info: collectionInfo", zap.Int64("nodeID", id), zap.Int64("collectionID", info.CollectionID), zap.Any("info", info))
530 531
			}

532 533 534
			queryChannelInfos := node.showWatchedQueryChannels()
			for _, info := range queryChannelInfos {
				log.Debug("PrintMeta: query coordinator cluster info: watchedQueryChannelInfo", zap.Int64("nodeID", id), zap.Int64("collectionID", info.CollectionID), zap.Any("info", info))
535 536 537 538
			}
		}
	}
}
539 540 541 542 543 544 545 546 547 548

// getCollectionInfosByID returns the collection infos held by query node
// nodeID, or nil if nodeID is not part of the cluster.
func (c *queryNodeCluster) getCollectionInfosByID(ctx context.Context, nodeID int64) []*querypb.CollectionInfo {
	c.RLock()
	defer c.RUnlock()

	node, ok := c.nodes[nodeID]
	if !ok {
		return nil
	}
	return node.showCollections()
}