// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querynode

import (
	"context"
	"fmt"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/metrics"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/timerecord"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

38
// GetComponentStates returns information about whether the node is healthy
39 40 41 42 43 44
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	stats := &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
45 46 47
	code, ok := node.stateCode.Load().(internalpb.StateCode)
	if !ok {
		errMsg := "unexpected error in type assertion"
48 49
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
50
			Reason:    errMsg,
51
		}
G
godchen 已提交
52
		return stats, nil
53 54 55 56
	}
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
57 58
	}
	info := &internalpb.ComponentInfo{
59
		NodeID:    nodeID,
60 61 62 63
		Role:      typeutil.QueryNodeRole,
		StateCode: code,
	}
	stats.State = info
64
	log.Debug("Get QueryNode component state done", zap.Any("stateCode", info.StateCode))
65 66 67
	return stats, nil
}

68 69
// GetTimeTickChannel returns the time tick channel
// TimeTickChannel contains many time tick messages, which will be sent by query nodes
70 71 72 73 74 75
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
76
		Value: Params.CommonCfg.QueryCoordTimeTick,
77 78 79
	}, nil
}

80
// GetStatisticsChannel returns the statistics channel
81
// Statistics channel contains statistics infos of query nodes, such as segment infos, memory infos
82 83 84 85 86 87
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
88
		Value: Params.CommonCfg.QueryNodeStats,
89 90 91
	}, nil
}

G
godchen 已提交
92
// WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
93
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
94 95
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
96
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
97 98 99 100
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
101
		return status, nil
102
	}
103 104 105 106 107 108 109
	dct := &watchDmChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
110 111
	}

112 113
	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
114 115
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
116
			Reason:    err.Error(),
117
		}
X
Xiaofan 已提交
118
		log.Warn(err.Error())
G
godchen 已提交
119
		return status, nil
120
	}
X
Xiaofan 已提交
121
	log.Info("watchDmChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.Int64("replicaID", in.GetReplicaID()))
122
	waitFunc := func() (*commonpb.Status, error) {
123
		err = dct.WaitToFinish()
124
		if err != nil {
125 126 127 128
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
X
Xiaofan 已提交
129
			log.Warn(err.Error())
G
godchen 已提交
130
			return status, nil
131
		}
X
Xiaofan 已提交
132
		log.Info("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
133 134 135
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
136
	}
137 138

	return waitFunc()
139 140
}

G
godchen 已提交
141
// WatchDeltaChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
142
func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.WatchDeltaChannelsRequest) (*commonpb.Status, error) {
143 144
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
145
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
146 147 148 149
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
150
		return status, nil
151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166
	}
	dct := &watchDeltaChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
X
Xiaofan 已提交
167
		log.Warn(err.Error())
G
godchen 已提交
168
		return status, nil
169
	}
X
Xiaofan 已提交
170 171

	log.Info("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
172 173 174 175 176 177 178 179

	waitFunc := func() (*commonpb.Status, error) {
		err = dct.WaitToFinish()
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
X
Xiaofan 已提交
180
			log.Warn(err.Error())
G
godchen 已提交
181
			return status, nil
182
		}
X
Xiaofan 已提交
183 184

		log.Info("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
185 186 187 188 189 190
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
	}

	return waitFunc()
191 192
}

193
// LoadSegments load historical data into query node, historical data can be vector data or index
194
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
195 196
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
197
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
198 199 200 201
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
202
		return status, nil
203
	}
204 205 206 207 208 209 210 211 212
	dct := &loadSegmentsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

213 214 215 216
	segmentIDs := make([]UniqueID, 0, len(in.GetInfos()))
	for _, info := range in.Infos {
		segmentIDs = append(segmentIDs, info.SegmentID)
	}
217 218 219 220 221 222
	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
X
Xiaofan 已提交
223
		log.Warn(err.Error())
G
godchen 已提交
224
		return status, nil
225
	}
226

X
Xiaofan 已提交
227
	log.Info("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
228

229
	waitFunc := func() (*commonpb.Status, error) {
230 231
		err = dct.WaitToFinish()
		if err != nil {
232 233 234 235
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
X
Xiaofan 已提交
236
			log.Warn(err.Error())
G
godchen 已提交
237
			return status, nil
238
		}
X
Xiaofan 已提交
239
		log.Info("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
240 241 242
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
243
	}
244 245

	return waitFunc()
246 247
}

G
godchen 已提交
248
// ReleaseCollection clears all data related to this collection on the querynode
249
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
250 251
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
252
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
253 254 255 256
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
257
		return status, nil
258
	}
259 260 261 262 263 264 265 266 267 268 269 270 271 272 273
	dct := &releaseCollectionTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
X
Xiaofan 已提交
274
		log.Warn(err.Error())
G
godchen 已提交
275
		return status, nil
276
	}
X
Xiaofan 已提交
277
	log.Info("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID))
278

279
	func() {
280 281
		err = dct.WaitToFinish()
		if err != nil {
X
Xiaofan 已提交
282
			log.Warn(err.Error())
283
			return
284
		}
X
Xiaofan 已提交
285
		log.Info("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID))
286
	}()
287 288 289 290 291 292 293

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

294
// ReleasePartitions clears all data related to this partition on the querynode
295
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
296 297
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
298
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
299 300 301 302
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
303
		return status, nil
304
	}
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319
	dct := &releasePartitionsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
X
Xiaofan 已提交
320
		log.Warn(err.Error())
G
godchen 已提交
321
		return status, nil
322
	}
X
Xiaofan 已提交
323
	log.Info("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
324

325
	func() {
326 327
		err = dct.WaitToFinish()
		if err != nil {
X
Xiaofan 已提交
328
			log.Warn(err.Error())
329
			return
330
		}
X
Xiaofan 已提交
331
		log.Info("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
332
	}()
333 334 335 336 337 338 339

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

340
// ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
341
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
342 343
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
344
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
345 346 347 348
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
349
		return status, nil
350
	}
351

352 353 354 355 356 357 358 359 360 361 362
	collection, err := node.metaReplica.getCollectionByID(in.CollectionID)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    fmt.Sprintf("cannot find collection %d when ReleaseSegments", in.CollectionID),
		}
		return status, nil
	}

	collection.Lock()
	defer collection.Unlock()
363
	for _, id := range in.SegmentIDs {
364 365 366 367 368 369 370 371 372
		switch in.GetScope() {
		case queryPb.DataScope_Streaming:
			node.metaReplica.removeSegment(id, segmentTypeGrowing)
		case queryPb.DataScope_Historical:
			node.metaReplica.removeSegment(id, segmentTypeSealed)
		case queryPb.DataScope_All:
			node.metaReplica.removeSegment(id, segmentTypeSealed)
			node.metaReplica.removeSegment(id, segmentTypeGrowing)
		}
373
	}
X
xige-16 已提交
374

375
	log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs), zap.String("Scope", in.GetScope().String()))
376 377 378
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
379 380
}

381
// GetSegmentInfo returns segment information of the collection on the queryNode, and the information includes memSize, numRow, indexName, indexID ...
382
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
383 384
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
X
Xiaofan 已提交
385
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
386 387 388 389 390 391
		res := &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
G
godchen 已提交
392
		return res, nil
393
	}
394 395 396 397 398 399
	var segmentInfos []*queryPb.SegmentInfo

	segmentIDs := make(map[int64]struct{})
	for _, segmentID := range in.GetSegmentIDs() {
		segmentIDs[segmentID] = struct{}{}
	}
400

401 402
	infos := node.metaReplica.getSegmentInfosByColID(in.CollectionID)
	segmentInfos = append(segmentInfos, filterSegmentInfo(infos, segmentIDs)...)
403

404 405 406 407
	return &queryPb.GetSegmentInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
408
		Infos: segmentInfos,
409 410
	}, nil
}
411

412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
// filterSegmentInfo returns segment info which segment id in segmentIDs map
func filterSegmentInfo(segmentInfos []*queryPb.SegmentInfo, segmentIDs map[int64]struct{}) []*queryPb.SegmentInfo {
	if len(segmentIDs) == 0 {
		return segmentInfos
	}
	filtered := make([]*queryPb.SegmentInfo, 0, len(segmentIDs))
	for _, info := range segmentInfos {
		_, ok := segmentIDs[info.GetSegmentID()]
		if !ok {
			continue
		}
		filtered = append(filtered, info)
	}
	return filtered
}

428
// isHealthy checks if QueryNode is healthy
429 430 431 432 433
func (node *QueryNode) isHealthy() bool {
	code := node.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

434
// Search performs replica search tasks.
435
func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (*internalpb.SearchResults, error) {
436
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel).Inc()
437 438 439 440 441
	failRet := &internalpb.SearchResults{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
442 443 444 445 446 447

	defer func() {
		if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
			metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.FailLabel).Inc()
		}
	}()
448
	if !node.isHealthy() {
449 450
		failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
		return failRet, nil
451 452
	}

453
	msgID := req.GetReq().GetBase().GetMsgID()
454
	log.Debug("Received SearchRequest",
455 456
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
457 458 459 460
		zap.String("vChannel", req.GetDmlChannel()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
461 462

	if node.queryShardService == nil {
463 464
		failRet.Status.Reason = "queryShardService is nil"
		return failRet, nil
465 466 467 468
	}

	qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
	if err != nil {
469 470 471 472
		log.Warn("Search failed, failed to get query shard",
			zap.Int64("msgID", msgID),
			zap.String("dml channel", req.GetDmlChannel()),
			zap.Error(err))
473 474 475
		failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
		failRet.Status.Reason = err.Error()
		return failRet, nil
476 477
	}

478 479 480 481 482 483
	log.Debug("start do search",
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
		zap.String("vChannel", req.GetDmlChannel()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()))
	tr := timerecord.NewTimeRecorder("")
484

485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503
	if req.FromShardLeader {
		historicalTask, err2 := newSearchTask(ctx, req)
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}
		historicalTask.QS = qs
		historicalTask.DataScope = querypb.DataScope_Historical
		err2 = node.scheduler.AddReadTask(ctx, historicalTask)
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}

		err2 = historicalTask.WaitToFinish()
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}
504

505 506 507
		tr.Elapse(fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
			msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

508 509 510 511 512 513 514 515
		failRet.Status.ErrorCode = commonpb.ErrorCode_Success
		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.SearchLabel).Observe(float64(historicalTask.queueDur.Milliseconds()))
		metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.SearchLabel).Observe(float64(historicalTask.reduceDur.Milliseconds()))
		latency := tr.ElapseSpan()
		metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
		metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
516 517 518 519 520 521
		return historicalTask.Ret, nil
	}

	//from Proxy
	cluster, ok := qs.clusterService.getShardCluster(req.GetDmlChannel())
	if !ok {
522
		failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
523 524 525 526 527 528 529 530 531 532 533
		failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", req.GetDmlChannel())
		return failRet, nil
	}

	searchCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	var results []*internalpb.SearchResults
	var streamingResult *internalpb.SearchResults
	var errCluster error

534 535 536 537
	withStreaming := func(ctx context.Context) error {
		streamingTask, err := newSearchTask(searchCtx, req)
		if err != nil {
			return err
538 539 540
		}
		streamingTask.QS = qs
		streamingTask.DataScope = querypb.DataScope_Streaming
541 542 543
		err = node.scheduler.AddReadTask(searchCtx, streamingTask)
		if err != nil {
			return err
544
		}
545 546 547
		err = streamingTask.WaitToFinish()
		if err != nil {
			return err
548
		}
549 550 551 552
		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.SearchLabel).Observe(float64(streamingTask.queueDur.Milliseconds()))
		metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.SearchLabel).Observe(float64(streamingTask.reduceDur.Milliseconds()))
553
		streamingResult = streamingTask.Ret
554
		return nil
555 556
	}

557 558 559 560 561
	// shard leader dispatches request to its shard cluster
	results, errCluster = cluster.Search(searchCtx, req, withStreaming)
	if errCluster != nil {
		log.Warn("search cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
		failRet.Status.Reason = errCluster.Error()
562 563
		return failRet, nil
	}
564 565 566 567

	tr.Elapse(fmt.Sprintf("start reduce search result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

568 569 570 571 572 573
	results = append(results, streamingResult)
	ret, err2 := reduceSearchResults(results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
	if err2 != nil {
		failRet.Status.Reason = err2.Error()
		return failRet, nil
	}
574

575 576 577
	tr.Elapse(fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

578 579 580 581
	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
	latency := tr.ElapseSpan()
	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
582 583
	metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetNq()))
	metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetTopk()))
584
	return ret, nil
585 586 587
}

// Query performs replica query tasks.
588
func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*internalpb.RetrieveResults, error) {
589
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel).Inc()
590 591 592 593 594
	failRet := &internalpb.RetrieveResults{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}
595 596 597 598 599 600

	defer func() {
		if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
			metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.FailLabel).Inc()
		}
	}()
601
	if !node.isHealthy() {
602 603
		failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
		return failRet, nil
604
	}
605

606
	msgID := req.GetReq().GetBase().GetMsgID()
607
	log.Debug("Received QueryRequest",
608 609
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
610 611 612 613
		zap.String("vChannel", req.GetDmlChannel()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
614 615

	if node.queryShardService == nil {
616 617 618 619
		failRet.Status.Reason = "queryShardService is nil"
		return failRet, nil
	}

620 621
	qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
	if err != nil {
622
		log.Warn("Query failed, failed to get query shard", zap.Int64("msgID", msgID), zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
623 624 625 626
		failRet.Status.Reason = err.Error()
		return failRet, nil
	}

627 628 629 630 631 632
	log.Debug("start do query",
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
		zap.String("vChannel", req.GetDmlChannel()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()))
	tr := timerecord.NewTimeRecorder("")
633

634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649
	if req.FromShardLeader {
		// construct a queryTask
		queryTask := newQueryTask(ctx, req)
		queryTask.QS = qs
		queryTask.DataScope = querypb.DataScope_Historical
		err2 := node.scheduler.AddReadTask(ctx, queryTask)
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}

		err2 = queryTask.WaitToFinish()
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}
650 651 652 653

		tr.Elapse(fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
			msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

654 655 656 657 658 659 660 661
		failRet.Status.ErrorCode = commonpb.ErrorCode_Success
		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.QueryLabel).Observe(float64(queryTask.queueDur.Milliseconds()))
		metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.QueryLabel).Observe(float64(queryTask.reduceDur.Milliseconds()))
		latency := tr.ElapseSpan()
		metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel).Observe(float64(latency.Milliseconds()))
		metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
662
		return queryTask.Ret, nil
663 664
	}

665 666
	cluster, ok := qs.clusterService.getShardCluster(req.GetDmlChannel())
	if !ok {
667
		failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
668 669 670 671 672 673 674 675 676 677 678
		failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", req.GetDmlChannel())
		return failRet, nil
	}

	// add cancel when error occurs
	queryCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	var results []*internalpb.RetrieveResults
	var streamingResult *internalpb.RetrieveResults

679
	withStreaming := func(ctx context.Context) error {
680 681 682
		streamingTask := newQueryTask(queryCtx, req)
		streamingTask.DataScope = querypb.DataScope_Streaming
		streamingTask.QS = qs
683 684 685 686
		err := node.scheduler.AddReadTask(queryCtx, streamingTask)

		if err != nil {
			return err
687
		}
688 689 690
		err = streamingTask.WaitToFinish()
		if err != nil {
			return err
691
		}
692 693 694 695
		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.QueryLabel).Observe(float64(streamingTask.queueDur.Milliseconds()))
		metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.QueryLabel).Observe(float64(streamingTask.reduceDur.Milliseconds()))
696
		streamingResult = streamingTask.Ret
697
		return nil
698 699
	}

700 701 702 703 704 705
	var errCluster error
	// shard leader dispatches request to its shard cluster
	results, errCluster = cluster.Query(queryCtx, req, withStreaming)
	if errCluster != nil {
		log.Warn("failed to query cluster", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
		failRet.Status.Reason = errCluster.Error()
706 707
		return failRet, nil
	}
708 709 710 711

	tr.Elapse(fmt.Sprintf("start reduce query result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

712 713 714 715 716 717
	results = append(results, streamingResult)
	ret, err2 := mergeInternalRetrieveResults(results)
	if err2 != nil {
		failRet.Status.Reason = err2.Error()
		return failRet, nil
	}
718 719 720 721

	tr.Elapse(fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

722 723 724 725
	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
	latency := tr.ElapseSpan()
	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel).Observe(float64(latency.Milliseconds()))
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
726
	return ret, nil
727 728
}

729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
// SyncReplicaSegments syncs replica node & segments states
func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) {
	if !node.isHealthy() {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
		}, nil
	}

	log.Debug("Received SyncReplicaSegments request", zap.String("vchannelName", req.GetVchannelName()))

	err := node.ShardClusterService.SyncReplicaSegments(req.GetVchannelName(), req.GetReplicaSegments())
	if err != nil {
		log.Warn("failed to sync replica semgents,", zap.String("vchannel", req.GetVchannelName()), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug("SyncReplicaSegments Done", zap.String("vchannel", req.GetVchannelName()))

	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}

G
godchen 已提交
754
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
755
// TODO(dragondriver): cache the Metrics and set a retention to the cache
756 757 758
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if !node.isHealthy() {
		log.Warn("QueryNode.GetMetrics failed",
X
Xiaofan 已提交
759
			zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
760
			zap.String("req", req.Request),
X
Xiaofan 已提交
761
			zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))
762 763 764 765

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
X
Xiaofan 已提交
766
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
767 768 769 770 771 772 773 774
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryNode.GetMetrics failed to parse metric type",
X
Xiaofan 已提交
775
			zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
776 777 778 779 780 781 782 783 784 785 786 787 788 789
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	if metricType == metricsinfo.SystemInfoMetrics {
		metrics, err := getSystemInfoMetrics(ctx, req, node)
X
Xiaofan 已提交
790 791
		if err != nil {
			log.Warn("QueryNode.GetMetrics failed",
X
Xiaofan 已提交
792
				zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
X
Xiaofan 已提交
793 794 795 796
				zap.String("req", req.Request),
				zap.String("metric_type", metricType),
				zap.Error(err))
		}
797

G
godchen 已提交
798
		return metrics, nil
799 800 801
	}

	log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
X
Xiaofan 已提交
802
		zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
803 804 805 806 807 808 809 810 811 812 813
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}