// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querynode

import (
	"context"
	"fmt"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/metrics"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/timerecord"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

// GetComponentStates returns information about whether the node is healthy
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	stats := &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
	code, ok := node.stateCode.Load().(internalpb.StateCode)
	if !ok {
		errMsg := "unexpected error in type assertion"
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    errMsg,
		}
		return stats, nil
	}
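	// report NotRegisteredID until the node's session has been registered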
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
	}
	info := &internalpb.ComponentInfo{
		NodeID:    nodeID,
		Role:      typeutil.QueryNodeRole,
		StateCode: code,
	}
	stats.State = info
	log.Debug("Get QueryNode component state done", zap.Any("stateCode", info.StateCode))
	return stats, nil
}

// GetTimeTickChannel returns the time tick channel
// TimeTickChannel contains many time tick messages, which will be sent by query nodes
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.CommonCfg.QueryCoordTimeTick,
	}, nil
}

// GetStatisticsChannel returns the statistics channel
// The statistics channel contains the statistics of query nodes, such as segment info and memory usage
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.CommonCfg.QueryNodeStats,
	}, nil
}

// WatchDmChannels creates consumers on dmChannels to receive incremental data, which is an important part of real-time query
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, nil
	}
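	// wrap the request into a watchDmChannelsTask and hand it to the task scheduler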
	dct := &watchDmChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, nil
	}
	log.Info("watchDmChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()), zap.Int64("replicaID", in.GetReplicaID()))
	waitFunc := func() (*commonpb.Status, error) {
		err = dct.WaitToFinish()
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
			log.Warn(err.Error())
			return status, nil
		}
		log.Info("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
	}

	return waitFunc()
}

// WatchDeltaChannels creates consumers on deltaChannels to receive delta data, which is an important part of real-time query
func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.WatchDeltaChannelsRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, nil
	}
	dct := &watchDeltaChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, nil
	}

	log.Info("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))

	waitFunc := func() (*commonpb.Status, error) {
		err = dct.WaitToFinish()
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
			log.Warn(err.Error())
			return status, nil
		}

		log.Info("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
	}

	return waitFunc()
}

// LoadSegments loads historical data into the query node; historical data can be vector data or index
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, nil
	}
	dct := &loadSegmentsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	segmentIDs := make([]UniqueID, 0, len(in.GetInfos()))
	for _, info := range in.Infos {
		segmentIDs = append(segmentIDs, info.SegmentID)
	}
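	// enqueue the load task; its completion is awaited below so the RPC returns a final status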
	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, nil
	}

	log.Info("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))

	waitFunc := func() (*commonpb.Status, error) {
		err = dct.WaitToFinish()
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
			log.Warn(err.Error())
			return status, nil
		}
		log.Info("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.GetNodeID()))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
	}

	return waitFunc()
}

// ReleaseCollection clears all data related to this collection on the querynode
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, nil
	}
	dct := &releaseCollectionTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, nil
	}
	log.Info("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID))

	func() {
		err = dct.WaitToFinish()
		if err != nil {
			log.Warn(err.Error())
			return
		}
		log.Info("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID))
	}()

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

// ReleasePartitions clears all data related to the given partitions on the querynode
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, nil
	}
	dct := &releasePartitionsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		log.Warn(err.Error())
		return status, nil
	}
	log.Info("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))

	func() {
		err = dct.WaitToFinish()
		if err != nil {
			log.Warn(err.Error())
			return
		}
		log.Info("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
	}()

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

// ReleaseSegments removes the specified segments from the query node according to the segmentIDs, partitionIDs, and collectionID
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
		return status, nil
	}

	// collection lock is not needed since we guarantee no query/search will be dispatched from the leader
	for _, id := range in.SegmentIDs {
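		// the requested data scope decides whether the growing copy, the sealed copy, or both are removed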
		switch in.GetScope() {
		case queryPb.DataScope_Streaming:
			node.metaReplica.removeSegment(id, segmentTypeGrowing)
		case queryPb.DataScope_Historical:
			node.metaReplica.removeSegment(id, segmentTypeSealed)
		case queryPb.DataScope_All:
			node.metaReplica.removeSegment(id, segmentTypeSealed)
			node.metaReplica.removeSegment(id, segmentTypeGrowing)
		}
	}

	log.Info("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs), zap.String("Scope", in.GetScope().String()))
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}, nil
}

// GetSegmentInfo returns segment information of the collection on the queryNode; the information includes memSize, numRow, indexName, indexID ...
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.GetNodeID())
		res := &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
		return res, nil
	}
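	// collect the requested segment IDs into a set; an empty set means all segments of the collection are returned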
	var segmentInfos []*queryPb.SegmentInfo

	segmentIDs := make(map[int64]struct{})
	for _, segmentID := range in.GetSegmentIDs() {
		segmentIDs[segmentID] = struct{}{}
	}

	infos := node.metaReplica.getSegmentInfosByColID(in.CollectionID)
	segmentInfos = append(segmentInfos, filterSegmentInfo(infos, segmentIDs)...)

	return &queryPb.GetSegmentInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		Infos: segmentInfos,
	}, nil
}

// filterSegmentInfo returns the segment infos whose segment IDs are in the given segmentIDs map
func filterSegmentInfo(segmentInfos []*queryPb.SegmentInfo, segmentIDs map[int64]struct{}) []*queryPb.SegmentInfo {
	if len(segmentIDs) == 0 {
		return segmentInfos
	}
	filtered := make([]*queryPb.SegmentInfo, 0, len(segmentIDs))
	for _, info := range segmentInfos {
		_, ok := segmentIDs[info.GetSegmentID()]
		if !ok {
			continue
		}
		filtered = append(filtered, info)
	}
	return filtered
}

// isHealthy checks if QueryNode is healthy
func (node *QueryNode) isHealthy() bool {
	code := node.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

// Search performs replica search tasks.
func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (*internalpb.SearchResults, error) {
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.TotalLabel).Inc()
	failRet := &internalpb.SearchResults{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}

	defer func() {
		if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
			metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.FailLabel).Inc()
		}
	}()
	if !node.isHealthy() {
		failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
		return failRet, nil
	}

	msgID := req.GetReq().GetBase().GetMsgID()
	log.Debug("Received SearchRequest",
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
		zap.String("vChannel", req.GetDmlChannel()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))

	if node.queryShardService == nil {
		failRet.Status.Reason = "queryShardService is nil"
		return failRet, nil
	}

	qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
	if err != nil {
		log.Warn("Search failed, failed to get query shard",
			zap.Int64("msgID", msgID),
			zap.String("dml channel", req.GetDmlChannel()),
			zap.Error(err))
		failRet.Status.ErrorCode = commonpb.ErrorCode_NotShardLeader
		failRet.Status.Reason = err.Error()
		return failRet, nil
	}

	log.Debug("start do search",
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
		zap.String("vChannel", req.GetDmlChannel()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()))
	tr := timerecord.NewTimeRecorder("")

	if req.FromShardLeader {
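		// the request was forwarded by the shard leader, so only this node's historical (sealed) segments are searched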
		historicalTask, err2 := newSearchTask(ctx, req)
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}
		historicalTask.QS = qs
		historicalTask.DataScope = querypb.DataScope_Historical
		err2 = node.scheduler.AddReadTask(ctx, historicalTask)
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}

		err2 = historicalTask.WaitToFinish()
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}

		tr.Elapse(fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
			msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

		failRet.Status.ErrorCode = commonpb.ErrorCode_Success
		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.SearchLabel).Observe(float64(historicalTask.queueDur.Milliseconds()))
		metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.SearchLabel).Observe(float64(historicalTask.reduceDur.Milliseconds()))
		latency := tr.ElapseSpan()
		metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
		metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
		return historicalTask.Ret, nil
	}

	// the request comes directly from the proxy; this node acts as shard leader and dispatches to its shard cluster
	cluster, ok := qs.clusterService.getShardCluster(req.GetDmlChannel())
	if !ok {
		failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", req.GetDmlChannel())
		return failRet, nil
	}

	searchCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	var results []*internalpb.SearchResults
	var streamingResult *internalpb.SearchResults
	var errCluster error

	withStreaming := func(ctx context.Context) error {
		streamingTask, err := newSearchTask(searchCtx, req)
		if err != nil {
			return err
527 528 529
		}
		streamingTask.QS = qs
		streamingTask.DataScope = querypb.DataScope_Streaming
		err = node.scheduler.AddReadTask(searchCtx, streamingTask)
		if err != nil {
			return err
		}
		err = streamingTask.WaitToFinish()
		if err != nil {
			return err
		}
		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.SearchLabel).Observe(float64(streamingTask.queueDur.Milliseconds()))
		metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.SearchLabel).Observe(float64(streamingTask.reduceDur.Milliseconds()))
		streamingResult = streamingTask.Ret
		return nil
	}

	// shard leader dispatches request to its shard cluster
	results, errCluster = cluster.Search(searchCtx, req, withStreaming)
	if errCluster != nil {
		log.Warn("search cluster failed", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
		failRet.Status.Reason = errCluster.Error()
		return failRet, nil
	}

	tr.Elapse(fmt.Sprintf("start reduce search result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

	results = append(results, streamingResult)
	ret, err2 := reduceSearchResults(results, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
	if err2 != nil {
		failRet.Status.Reason = err2.Error()
		return failRet, nil
	}

	tr.Elapse(fmt.Sprintf("do search done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
	latency := tr.ElapseSpan()
	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel).Observe(float64(latency.Milliseconds()))
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.SearchLabel, metrics.SuccessLabel).Inc()
	metrics.QueryNodeSearchNQ.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetNq()))
	metrics.QueryNodeSearchTopK.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Observe(float64(req.Req.GetTopk()))
	return ret, nil
}

// Query performs replica query tasks.
func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*internalpb.RetrieveResults, error) {
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel).Inc()
	failRet := &internalpb.RetrieveResults{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
		},
	}

	defer func() {
		if failRet.Status.ErrorCode != commonpb.ErrorCode_Success {
			metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.FailLabel).Inc()
		}
	}()
	if !node.isHealthy() {
		failRet.Status.Reason = msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())
		return failRet, nil
	}

	msgID := req.GetReq().GetBase().GetMsgID()
	log.Debug("Received QueryRequest",
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
		zap.String("vChannel", req.GetDmlChannel()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()),
		zap.Uint64("guaranteeTimestamp", req.GetReq().GetGuaranteeTimestamp()),
		zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))

	if node.queryShardService == nil {
		failRet.Status.Reason = "queryShardService is nil"
		return failRet, nil
	}

	qs, err := node.queryShardService.getQueryShard(req.GetDmlChannel())
	if err != nil {
		log.Warn("Query failed, failed to get query shard", zap.Int64("msgID", msgID), zap.String("dml channel", req.GetDmlChannel()), zap.Error(err))
		failRet.Status.Reason = err.Error()
		return failRet, nil
	}

	log.Debug("start do query",
		zap.Int64("msgID", msgID),
		zap.Bool("fromShardLeader", req.GetFromShardLeader()),
		zap.String("vChannel", req.GetDmlChannel()),
		zap.Int64s("segmentIDs", req.GetSegmentIDs()))
	tr := timerecord.NewTimeRecorder("")

	if req.FromShardLeader {
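		// the request was forwarded by the shard leader, so only this node's historical (sealed) segments are queried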
		// construct a queryTask
		queryTask := newQueryTask(ctx, req)
		queryTask.QS = qs
		queryTask.DataScope = querypb.DataScope_Historical
		err2 := node.scheduler.AddReadTask(ctx, queryTask)
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}

		err2 = queryTask.WaitToFinish()
		if err2 != nil {
			failRet.Status.Reason = err2.Error()
			return failRet, nil
		}

		tr.Elapse(fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
			msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

		failRet.Status.ErrorCode = commonpb.ErrorCode_Success
		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.QueryLabel).Observe(float64(queryTask.queueDur.Milliseconds()))
		metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.QueryLabel).Observe(float64(queryTask.reduceDur.Milliseconds()))
		latency := tr.ElapseSpan()
		metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel).Observe(float64(latency.Milliseconds()))
		metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
		return queryTask.Ret, nil
	}

	cluster, ok := qs.clusterService.getShardCluster(req.GetDmlChannel())
	if !ok {
		failRet.Status.Reason = fmt.Sprintf("channel %s leader is not here", req.GetDmlChannel())
		return failRet, nil
	}

	// add cancel when error occurs
	queryCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	var results []*internalpb.RetrieveResults
	var streamingResult *internalpb.RetrieveResults

	withStreaming := func(ctx context.Context) error {
		streamingTask := newQueryTask(queryCtx, req)
		streamingTask.DataScope = querypb.DataScope_Streaming
		streamingTask.QS = qs
		err := node.scheduler.AddReadTask(queryCtx, streamingTask)

		if err != nil {
			return err
		}
		err = streamingTask.WaitToFinish()
		if err != nil {
			return err
		}
		metrics.QueryNodeSQLatencyInQueue.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.QueryLabel).Observe(float64(streamingTask.queueDur.Milliseconds()))
		metrics.QueryNodeReduceLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()),
			metrics.QueryLabel).Observe(float64(streamingTask.reduceDur.Milliseconds()))
		streamingResult = streamingTask.Ret
		return nil
	}

	var errCluster error
	// shard leader dispatches request to its shard cluster
	results, errCluster = cluster.Query(queryCtx, req, withStreaming)
	if errCluster != nil {
		log.Warn("failed to query cluster", zap.Int64("msgID", msgID), zap.Int64("collectionID", req.Req.GetCollectionID()), zap.Error(errCluster))
		failRet.Status.Reason = errCluster.Error()
		return failRet, nil
	}

	tr.Elapse(fmt.Sprintf("start reduce query result, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

	results = append(results, streamingResult)
	ret, err2 := mergeInternalRetrieveResults(results)
	if err2 != nil {
		failRet.Status.Reason = err2.Error()
		return failRet, nil
	}

	tr.Elapse(fmt.Sprintf("do query done, msgID = %d, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
		msgID, req.GetFromShardLeader(), req.GetDmlChannel(), req.GetSegmentIDs()))

	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
	latency := tr.ElapseSpan()
	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel).Observe(float64(latency.Milliseconds()))
	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID()), metrics.QueryLabel, metrics.SuccessLabel).Inc()
	return ret, nil
}

// SyncReplicaSegments syncs replica node & segments states
func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) {
	if !node.isHealthy() {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
		}, nil
	}

	log.Debug("Received SyncReplicaSegments request", zap.String("vchannelName", req.GetVchannelName()))

	err := node.ShardClusterService.SyncReplicaSegments(req.GetVchannelName(), req.GetReplicaSegments())
	if err != nil {
		log.Warn("failed to sync replica semgents,", zap.String("vchannel", req.GetVchannelName()), zap.Error(err))
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}, nil
	}

	log.Debug("SyncReplicaSegments Done", zap.String("vchannel", req.GetVchannelName()))

	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}

// GetMetrics returns system info of the query node, such as total memory, memory usage, cpu usage ...
// TODO(dragondriver): cache the Metrics and set a retention to the cache
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if !node.isHealthy() {
		log.Warn("QueryNode.GetMetrics failed",
			zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
			zap.String("req", req.Request),
			zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID())))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.GetNodeID()),
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryNode.GetMetrics failed to parse metric type",
			zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	if metricType == metricsinfo.SystemInfoMetrics {
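		// only SystemInfoMetrics is implemented; other metric types fall through to the unimplemented response below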
		metrics, err := getSystemInfoMetrics(ctx, req, node)
		if err != nil {
			log.Warn("QueryNode.GetMetrics failed",
				zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
				zap.String("req", req.Request),
				zap.String("metric_type", metricType),
				zap.Error(err))
		}

		return metrics, nil
	}

	log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
		zap.Int64("node_id", Params.QueryNodeCfg.GetNodeID()),
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}