// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querynode

import (
	"context"
21
	"fmt"
22 23 24

	"go.uber.org/zap"

25
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
26 27 28 29
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
30
	"github.com/milvus-io/milvus/internal/proto/querypb"
X
Xiangyu Wang 已提交
31
	queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
32
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/util/typeutil"
34 35
)

36
// GetComponentStates returns information about whether the node is healthy
37 38 39 40 41 42
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	stats := &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
43 44 45
	code, ok := node.stateCode.Load().(internalpb.StateCode)
	if !ok {
		errMsg := "unexpected error in type assertion"
46 47
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
48
			Reason:    errMsg,
49
		}
G
godchen 已提交
50
		return stats, nil
51 52 53 54
	}
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
55 56
	}
	info := &internalpb.ComponentInfo{
57
		NodeID:    nodeID,
58 59 60 61
		Role:      typeutil.QueryNodeRole,
		StateCode: code,
	}
	stats.State = info
62
	log.Debug("Get QueryNode component state done", zap.Any("stateCode", info.StateCode))
63 64 65
	return stats, nil
}

66 67
// GetTimeTickChannel returns the time tick channel
// TimeTickChannel contains many time tick messages, which will be sent by query nodes
68 69 70 71 72 73
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
74
		Value: Params.CommonCfg.QueryCoordTimeTick,
75 76 77
	}, nil
}

78
// GetStatisticsChannel returns the statistics channel
79
// Statistics channel contains statistics infos of query nodes, such as segment infos, memory infos
80 81 82 83 84 85
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
86
		Value: Params.CommonCfg.QueryNodeStats,
87 88 89
	}, nil
}

90
// AddQueryChannel watch queryChannel of the collection to receive query message
91
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
92 93
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
94
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
95 96 97 98
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
99
		return status, nil
100
	}
101 102 103 104 105 106 107
	dct := &addQueryChannelTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
108
	}
109

110
	err := node.scheduler.queue.Enqueue(dct)
111 112 113 114
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
115
		}
X
Xiaofan 已提交
116
		log.Warn(err.Error())
G
godchen 已提交
117
		return status, nil
118
	}
X
Xiaofan 已提交
119
	log.Info("addQueryChannelTask Enqueue done",
120 121 122 123
		zap.Int64("collectionID", in.CollectionID),
		zap.String("queryChannel", in.QueryChannel),
		zap.String("queryResultChannel", in.QueryResultChannel),
	)
124

125 126 127 128 129 130
	waitFunc := func() (*commonpb.Status, error) {
		err = dct.WaitToFinish()
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
131
			}
X
Xiaofan 已提交
132
			log.Warn(err.Error())
G
godchen 已提交
133
			return status, nil
134
		}
X
Xiaofan 已提交
135
		log.Info("addQueryChannelTask WaitToFinish done",
136 137 138 139 140
			zap.Int64("collectionID", in.CollectionID),
			zap.String("queryChannel", in.QueryChannel),
			zap.String("queryResultChannel", in.QueryResultChannel),
		)

141 142 143
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
144
	}
145

146
	return waitFunc()
147 148
}

149
// RemoveQueryChannel remove queryChannel of the collection to stop receiving query message
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
	// if node.searchService == nil || node.searchService.searchMsgStream == nil {
	// 	errMsg := "null search service or null search result message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
	// if !ok {
	// 	errMsg := "type assertion failed for search message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
	// if !ok {
	// 	errMsg := "type assertion failed for search result message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// // remove request channel
	// consumeChannels := []string{in.RequestChannelID}
	// consumeSubName := Params.MsgChannelSubName
	// // TODO: searchStream.RemovePulsarConsumers(producerChannels)
	// searchStream.AsConsumer(consumeChannels, consumeSubName)

	// // remove result channel
	// producerChannels := []string{in.ResultChannelID}
	// // TODO: resultStream.RemovePulsarProducer(producerChannels)
	// resultStream.AsProducer(producerChannels)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

G
godchen 已提交
200
// WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
201
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
202 203
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
204
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
205 206 207 208
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
209
		return status, nil
210
	}
211 212 213 214 215 216 217
	dct := &watchDmChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
218 219
	}

220 221
	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
222 223
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
224
			Reason:    err.Error(),
225
		}
X
Xiaofan 已提交
226
		log.Warn(err.Error())
G
godchen 已提交
227
		return status, nil
228
	}
X
Xiaofan 已提交
229
	log.Info("watchDmChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.Int64("replicaID", in.GetReplicaID()))
230
	waitFunc := func() (*commonpb.Status, error) {
231
		err = dct.WaitToFinish()
232
		if err != nil {
233 234 235 236
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
X
Xiaofan 已提交
237
			log.Warn(err.Error())
G
godchen 已提交
238
			return status, nil
239
		}
X
Xiaofan 已提交
240
		log.Info("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
241 242 243
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
244
	}
245 246

	return waitFunc()
247 248
}

G
godchen 已提交
249
// WatchDeltaChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
250
func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.WatchDeltaChannelsRequest) (*commonpb.Status, error) {
251 252
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
253
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
254 255 256 257
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
258
		return status, nil
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274
	}
	dct := &watchDeltaChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
X
Xiaofan 已提交
275
		log.Warn(err.Error())
G
godchen 已提交
276
		return status, nil
277
	}
X
Xiaofan 已提交
278 279

	log.Info("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
280 281 282 283 284 285 286 287

	waitFunc := func() (*commonpb.Status, error) {
		err = dct.WaitToFinish()
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
X
Xiaofan 已提交
288
			log.Warn(err.Error())
G
godchen 已提交
289
			return status, nil
290
		}
X
Xiaofan 已提交
291 292

		log.Info("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
293 294 295 296 297 298
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
	}

	return waitFunc()
299 300
}

301
// LoadSegments load historical data into query node, historical data can be vector data or index
302
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
303 304
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
305
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
306 307 308 309
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
310
		return status, nil
311
	}
312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
	dct := &loadSegmentsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
X
Xiaofan 已提交
327
		log.Warn(err.Error())
G
godchen 已提交
328
		return status, nil
329
	}
330 331 332 333
	segmentIDs := make([]UniqueID, 0)
	for _, info := range in.Infos {
		segmentIDs = append(segmentIDs, info.SegmentID)
	}
X
Xiaofan 已提交
334
	log.Info("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
335

336
	waitFunc := func() (*commonpb.Status, error) {
337 338
		err = dct.WaitToFinish()
		if err != nil {
339 340 341 342
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
X
Xiaofan 已提交
343
			log.Warn(err.Error())
G
godchen 已提交
344
			return status, nil
345
		}
X
Xiaofan 已提交
346
		log.Info("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
347 348 349
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
350
	}
351 352

	return waitFunc()
353 354
}

G
godchen 已提交
355
// ReleaseCollection clears all data related to this collection on the querynode
356
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
357 358
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
359
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
360 361 362 363
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
364
		return status, nil
365
	}
366 367 368 369 370 371 372 373 374 375 376 377 378 379 380
	dct := &releaseCollectionTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
X
Xiaofan 已提交
381
		log.Warn(err.Error())
G
godchen 已提交
382
		return status, nil
383
	}
X
Xiaofan 已提交
384
	log.Info("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID))
385

386
	func() {
387 388
		err = dct.WaitToFinish()
		if err != nil {
X
Xiaofan 已提交
389
			log.Warn(err.Error())
390
			return
391
		}
X
Xiaofan 已提交
392
		log.Info("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID))
393
	}()
394 395 396 397 398 399 400

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

401
// ReleasePartitions clears all data related to this partition on the querynode
402
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
403 404
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
405
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
406 407 408 409
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
410
		return status, nil
411
	}
412 413 414 415 416 417 418 419 420 421 422 423 424 425 426
	dct := &releasePartitionsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
X
Xiaofan 已提交
427
		log.Warn(err.Error())
G
godchen 已提交
428
		return status, nil
429
	}
X
Xiaofan 已提交
430
	log.Info("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
431

432
	func() {
433 434
		err = dct.WaitToFinish()
		if err != nil {
X
Xiaofan 已提交
435
			log.Warn(err.Error())
436
			return
437
		}
X
Xiaofan 已提交
438
		log.Info("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
439
	}()
440 441 442 443 444 445 446

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

447
// ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
448
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
449 450
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
451
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
452 453 454 455
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
456
		return status, nil
457
	}
458 459 460
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
461
	// collection lock is not needed since we guarantee not query/search will be dispatch from leader
462
	for _, id := range in.SegmentIDs {
463 464 465 466 467 468 469 470
		err := node.historical.replica.removeSegment(id)
		if err != nil {
			// not return, try to release all segments
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			status.Reason = err.Error()
		}
		err = node.streaming.replica.removeSegment(id)
		if err != nil {
471 472
			// not return, try to release all segments
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
473
			status.Reason = err.Error()
474 475
		}
	}
X
xige-16 已提交
476

X
Xiaofan 已提交
477
	log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))
478 479 480
	return status, nil
}

481
// GetSegmentInfo returns segment information of the collection on the queryNode, and the information includes memSize, numRow, indexName, indexID ...
482
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
483 484
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
485
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
486 487 488 489 490 491
		res := &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
G
godchen 已提交
492
		return res, nil
493
	}
494 495 496 497 498 499
	var segmentInfos []*queryPb.SegmentInfo

	segmentIDs := make(map[int64]struct{})
	for _, segmentID := range in.GetSegmentIDs() {
		segmentIDs[segmentID] = struct{}{}
	}
500

501
	// get info from historical
502
	historicalSegmentInfos, err := node.historical.replica.getSegmentInfosByColID(in.CollectionID)
503
	if err != nil {
X
Xiaofan 已提交
504
		log.Warn("GetSegmentInfo: get historical segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
505 506 507 508 509 510
		res := &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
G
godchen 已提交
511
		return res, nil
512
	}
513
	segmentInfos = append(segmentInfos, filterSegmentInfo(historicalSegmentInfos, segmentIDs)...)
514

515
	// get info from streaming
516
	streamingSegmentInfos, err := node.streaming.replica.getSegmentInfosByColID(in.CollectionID)
517
	if err != nil {
X
Xiaofan 已提交
518
		log.Warn("GetSegmentInfo: get streaming segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
519 520 521 522 523 524
		res := &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
G
godchen 已提交
525
		return res, nil
526
	}
527
	segmentInfos = append(segmentInfos, filterSegmentInfo(streamingSegmentInfos, segmentIDs)...)
528

529 530 531 532
	return &queryPb.GetSegmentInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
533
		Infos: segmentInfos,
534 535
	}, nil
}
536

537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552
// filterSegmentInfo returns segment info which segment id in segmentIDs map
func filterSegmentInfo(segmentInfos []*queryPb.SegmentInfo, segmentIDs map[int64]struct{}) []*queryPb.SegmentInfo {
	if len(segmentIDs) == 0 {
		return segmentInfos
	}
	filtered := make([]*queryPb.SegmentInfo, 0, len(segmentIDs))
	for _, info := range segmentInfos {
		_, ok := segmentIDs[info.GetSegmentID()]
		if !ok {
			continue
		}
		filtered = append(filtered, info)
	}
	return filtered
}

553
// isHealthy checks if QueryNode is healthy
554 555 556 557 558
func (node *QueryNode) isHealthy() bool {
	code := node.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

559
// Search performs replica search tasks.
560
func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (*internalpb.SearchResults, error) {
561 562 563 564
	if !node.isHealthy() {
		return &internalpb.SearchResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
Xiaofan 已提交
565
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582
			},
		}, nil
	}

	log.Debug("Received SearchRequest", zap.String("vchannel", req.GetDmlChannel()), zap.Int64s("segmentIDs", req.GetSegmentIDs()))

	if node.queryShardService == nil {
		return &internalpb.SearchResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    "queryShardService is nil",
			},
		}, nil
	}

	qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
	if err != nil {
583
		log.Warn("Search failed, failed to get query shard", zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
584 585
		return &internalpb.SearchResults{
			Status: &commonpb.Status{
586 587
				// NotShardLeader will make proxy refresh the shard leader cache
				ErrorCode: commonpb.ErrorCode_NotShardLeader,
588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605
				Reason:    err.Error(),
			},
		}, nil
	}

	results, err := qs.search(ctx, req)
	if err != nil {
		log.Warn("QueryService failed to search", zap.String("vchannel", req.GetDmlChannel()), zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Error(err))
		return &internalpb.SearchResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	log.Debug("Search Shard Done", zap.String("vchannel", req.GetDmlChannel()), zap.Int64s("segmentIDs", req.GetSegmentIDs()))

	return results, err
606 607 608
}

// Query performs replica query tasks.
609
func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*internalpb.RetrieveResults, error) {
610 611 612 613
	if !node.isHealthy() {
		return &internalpb.RetrieveResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
Xiaofan 已提交
614
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630
			},
		}, nil
	}
	log.Debug("Received QueryRequest", zap.String("vchannel", req.GetDmlChannel()), zap.Int64s("segmentIDs", req.GetSegmentIDs()))

	if node.queryShardService == nil {
		return &internalpb.RetrieveResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    "queryShardService is nil",
			},
		}, nil
	}

	qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
	if err != nil {
631
		log.Warn("Query failed, failed to get query shard", zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652
		return &internalpb.RetrieveResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}

	results, err := qs.query(ctx, req)
	if err != nil {
		log.Warn("QueryService failed to query", zap.String("vchannel", req.GetDmlChannel()), zap.Int64s("segmentIDs", req.GetSegmentIDs()), zap.Error(err))
		return &internalpb.RetrieveResults{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}, nil
	}
	log.Debug("Query Shard Done", zap.String("vchannel", req.GetDmlChannel()), zap.Int64s("segmentIDs", req.GetSegmentIDs()))

	return results, nil
653 654
}

655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679
// SyncReplicaSegments syncs replica node & segments states
func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) {
	if !node.isHealthy() {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
		}, nil
	}

	log.Debug("Received SyncReplicaSegments request", zap.String("vchannelName", req.GetVchannelName()))

	err := node.ShardClusterService.SyncReplicaSegments(req.GetVchannelName(), req.GetReplicaSegments())
	if err != nil {
		log.Warn("failed to sync replica semgents,", zap.String("vchannel", req.GetVchannelName()), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug("SyncReplicaSegments Done", zap.String("vchannel", req.GetVchannelName()))

	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}

G
godchen 已提交
680
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
681
// TODO(dragondriver): cache the Metrics and set a retention to the cache
682 683 684
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if !node.isHealthy() {
		log.Warn("QueryNode.GetMetrics failed",
X
Xiaofan 已提交
685
			zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
686
			zap.String("req", req.Request),
X
Xiaofan 已提交
687
			zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))
688 689 690 691

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
Xiaofan 已提交
692
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
693 694 695 696 697 698 699 700
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryNode.GetMetrics failed to parse metric type",
X
Xiaofan 已提交
701
			zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
702 703 704 705 706 707 708 709 710 711 712 713 714 715
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	if metricType == metricsinfo.SystemInfoMetrics {
		metrics, err := getSystemInfoMetrics(ctx, req, node)
X
Xiaofan 已提交
716 717
		if err != nil {
			log.Warn("QueryNode.GetMetrics failed",
X
Xiaofan 已提交
718
				zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
X
Xiaofan 已提交
719 720 721 722
				zap.String("req", req.Request),
				zap.String("metric_type", metricType),
				zap.Error(err))
		}
723

G
godchen 已提交
724
		return metrics, nil
725 726 727
	}

	log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
X
Xiaofan 已提交
728
		zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
729 730 731 732 733 734 735 736 737 738 739
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}