impl.go 19.5 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

17 18 19 20
package querynode

import (
	"context"
21
	"fmt"
22 23 24

	"go.uber.org/zap"

25
	"github.com/milvus-io/milvus/internal/common"
X
Xiangyu Wang 已提交
26 27 28 29 30
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	queryPb "github.com/milvus-io/milvus/internal/proto/querypb"
31
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
33 34
)

35
// GetComponentStates returns information about whether the node is healthy
36
func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
37
	log.Debug("Get QueryNode component states")
38 39 40 41 42
	stats := &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
	}
43 44 45
	code, ok := node.stateCode.Load().(internalpb.StateCode)
	if !ok {
		errMsg := "unexpected error in type assertion"
46 47
		stats.Status = &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
48
			Reason:    errMsg,
49
		}
G
godchen 已提交
50
		return stats, nil
51 52 53 54
	}
	nodeID := common.NotRegisteredID
	if node.session != nil && node.session.Registered() {
		nodeID = node.session.ServerID
55 56
	}
	info := &internalpb.ComponentInfo{
57
		NodeID:    nodeID,
58 59 60 61
		Role:      typeutil.QueryNodeRole,
		StateCode: code,
	}
	stats.State = info
62
	log.Debug("Get QueryNode component state done", zap.Any("stateCode", info.StateCode))
63 64 65
	return stats, nil
}

66 67
// GetTimeTickChannel returns the time tick channel
// TimeTickChannel contains many time tick messages, which will be sent by query nodes
68 69 70 71 72 73
func (node *QueryNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
74
		Value: Params.CommonCfg.QueryCoordTimeTick,
75 76 77
	}, nil
}

78
// GetStatisticsChannel returns the statistics channel
79
// Statistics channel contains statistics infos of query nodes, such as segment infos, memory infos
80 81 82 83 84 85
func (node *QueryNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
86
		Value: Params.CommonCfg.QueryNodeStats,
87 88 89
	}, nil
}

90
// AddQueryChannel watch queryChannel of the collection to receive query message
91
func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQueryChannelRequest) (*commonpb.Status, error) {
92 93
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
94
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.QueryNodeID)
95 96 97 98
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
99
		return status, nil
100
	}
101 102 103 104 105 106 107
	dct := &addQueryChannelTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
108
	}
109

110
	err := node.scheduler.queue.Enqueue(dct)
111 112 113 114
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
115
		}
116
		log.Error(err.Error())
G
godchen 已提交
117
		return status, nil
118
	}
119 120 121 122 123
	log.Debug("addQueryChannelTask Enqueue done",
		zap.Int64("collectionID", in.CollectionID),
		zap.String("queryChannel", in.QueryChannel),
		zap.String("queryResultChannel", in.QueryResultChannel),
	)
124

125 126 127 128 129 130
	waitFunc := func() (*commonpb.Status, error) {
		err = dct.WaitToFinish()
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
131
			}
132
			log.Error(err.Error())
G
godchen 已提交
133
			return status, nil
134
		}
135 136 137 138 139 140
		log.Debug("addQueryChannelTask WaitToFinish done",
			zap.Int64("collectionID", in.CollectionID),
			zap.String("queryChannel", in.QueryChannel),
			zap.String("queryResultChannel", in.QueryResultChannel),
		)

141 142 143
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
144
	}
145

146
	return waitFunc()
147 148
}

149
// RemoveQueryChannel remove queryChannel of the collection to stop receiving query message
150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
func (node *QueryNode) RemoveQueryChannel(ctx context.Context, in *queryPb.RemoveQueryChannelRequest) (*commonpb.Status, error) {
	// if node.searchService == nil || node.searchService.searchMsgStream == nil {
	// 	errMsg := "null search service or null search result message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// searchStream, ok := node.searchService.searchMsgStream.(*pulsarms.PulsarMsgStream)
	// if !ok {
	// 	errMsg := "type assertion failed for search message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// resultStream, ok := node.searchService.searchResultMsgStream.(*pulsarms.PulsarMsgStream)
	// if !ok {
	// 	errMsg := "type assertion failed for search result message stream"
	// 	status := &commonpb.Status{
	// 		ErrorCode: commonpb.ErrorCode_UnexpectedError,
	// 		Reason:    errMsg,
	// 	}

	// 	return status, errors.New(errMsg)
	// }

	// // remove request channel
	// consumeChannels := []string{in.RequestChannelID}
	// consumeSubName := Params.MsgChannelSubName
	// // TODO: searchStream.RemovePulsarConsumers(producerChannels)
	// searchStream.AsConsumer(consumeChannels, consumeSubName)

	// // remove result channel
	// producerChannels := []string{in.ResultChannelID}
	// // TODO: resultStream.RemovePulsarProducer(producerChannels)
	// resultStream.AsProducer(producerChannels)

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

G
godchen 已提交
200
// WatchDmChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
201
func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmChannelsRequest) (*commonpb.Status, error) {
202 203
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
204
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.QueryNodeID)
205 206 207 208
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
209
		return status, nil
210
	}
211 212 213 214 215 216 217
	dct := &watchDmChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
218 219
	}

220 221
	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
222 223
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
224
			Reason:    err.Error(),
225
		}
226
		log.Error(err.Error())
G
godchen 已提交
227
		return status, nil
228
	}
229
	log.Debug("watchDmChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.QueryNodeID))
230

231
	waitFunc := func() (*commonpb.Status, error) {
232
		err = dct.WaitToFinish()
233
		if err != nil {
234 235 236 237
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
238
			log.Error(err.Error())
G
godchen 已提交
239
			return status, nil
240
		}
241
		log.Debug("watchDmChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.QueryNodeID))
242 243 244
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
245
	}
246 247

	return waitFunc()
248 249
}

G
godchen 已提交
250
// WatchDeltaChannels create consumers on dmChannels to receive Incremental data,which is the important part of real-time query
251
func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.WatchDeltaChannelsRequest) (*commonpb.Status, error) {
252 253
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
254
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.QueryNodeID)
255 256 257 258
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
259
		return status, nil
260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275
	}
	dct := &watchDeltaChannelsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
276
		log.Error(err.Error())
G
godchen 已提交
277
		return status, nil
278
	}
279
	log.Debug("watchDeltaChannelsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.QueryNodeID))
280 281 282 283 284 285 286 287

	waitFunc := func() (*commonpb.Status, error) {
		err = dct.WaitToFinish()
		if err != nil {
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
288
			log.Error(err.Error())
G
godchen 已提交
289
			return status, nil
290
		}
291
		log.Debug("watchDeltaChannelsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64("nodeID", Params.QueryNodeCfg.QueryNodeID))
292 293 294 295 296 297
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
	}

	return waitFunc()
298 299
}

300
// LoadSegments load historical data into query node, historical data can be vector data or index
301
func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegmentsRequest) (*commonpb.Status, error) {
302 303
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
304
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.QueryNodeID)
305 306 307 308
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
309
		return status, nil
310
	}
311 312 313 314 315 316 317 318 319 320 321 322 323 324 325
	dct := &loadSegmentsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
326
		log.Error(err.Error())
G
godchen 已提交
327
		return status, nil
328
	}
329 330 331 332
	segmentIDs := make([]UniqueID, 0)
	for _, info := range in.Infos {
		segmentIDs = append(segmentIDs, info.SegmentID)
	}
X
xige-16 已提交
333
	log.Debug("loadSegmentsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.QueryNodeID))
334

335
	waitFunc := func() (*commonpb.Status, error) {
336 337
		err = dct.WaitToFinish()
		if err != nil {
338 339 340 341
			status := &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			}
342
			log.Error(err.Error())
G
godchen 已提交
343
			return status, nil
344
		}
X
xige-16 已提交
345
		log.Debug("loadSegmentsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", segmentIDs), zap.Int64("nodeID", Params.QueryNodeCfg.QueryNodeID))
346 347 348
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		}, nil
349
	}
350 351

	return waitFunc()
352 353
}

G
godchen 已提交
354
// ReleaseCollection clears all data related to this collection on the querynode
355
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.ReleaseCollectionRequest) (*commonpb.Status, error) {
356 357
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
358
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.QueryNodeID)
359 360 361 362
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
363
		return status, nil
364
	}
365 366 367 368 369 370 371 372 373 374 375 376 377 378 379
	dct := &releaseCollectionTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
380
		log.Error(err.Error())
G
godchen 已提交
381
		return status, nil
382
	}
X
xige-16 已提交
383
	log.Debug("releaseCollectionTask Enqueue done", zap.Int64("collectionID", in.CollectionID))
384

385
	func() {
386 387
		err = dct.WaitToFinish()
		if err != nil {
388
			log.Error(err.Error())
389
			return
390
		}
X
xige-16 已提交
391
		log.Debug("releaseCollectionTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID))
392
	}()
393 394 395 396 397 398 399

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

400
// ReleasePartitions clears all data related to this partition on the querynode
401
func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.ReleasePartitionsRequest) (*commonpb.Status, error) {
402 403
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
404
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.QueryNodeID)
405 406 407 408
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
409
		return status, nil
410
	}
411 412 413 414 415 416 417 418 419 420 421 422 423 424 425
	dct := &releasePartitionsTask{
		baseTask: baseTask{
			ctx:  ctx,
			done: make(chan error),
		},
		req:  in,
		node: node,
	}

	err := node.scheduler.queue.Enqueue(dct)
	if err != nil {
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
426
		log.Error(err.Error())
G
godchen 已提交
427
		return status, nil
428
	}
X
xige-16 已提交
429
	log.Debug("releasePartitionsTask Enqueue done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
430

431
	func() {
432 433
		err = dct.WaitToFinish()
		if err != nil {
434
			log.Error(err.Error())
435
			return
436
		}
X
xige-16 已提交
437
		log.Debug("releasePartitionsTask WaitToFinish done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("partitionIDs", in.PartitionIDs))
438
	}()
439 440 441 442 443 444 445

	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	return status, nil
}

446
// ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
447
func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseSegmentsRequest) (*commonpb.Status, error) {
448 449
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
450
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.QueryNodeID)
451 452 453 454
		status := &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		}
G
godchen 已提交
455
		return status, nil
456
	}
457 458 459 460
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	for _, id := range in.SegmentIDs {
461 462 463 464 465 466 467 468
		err := node.historical.replica.removeSegment(id)
		if err != nil {
			// not return, try to release all segments
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			status.Reason = err.Error()
		}
		err = node.streaming.replica.removeSegment(id)
		if err != nil {
469 470
			// not return, try to release all segments
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
471
			status.Reason = err.Error()
472 473
		}
	}
X
xige-16 已提交
474 475

	log.Debug("release segments done", zap.Int64("collectionID", in.CollectionID), zap.Int64s("segmentIDs", in.SegmentIDs))
476 477 478
	return status, nil
}

479
// GetSegmentInfo returns segment information of the collection on the queryNode, and the information includes memSize, numRow, indexName, indexID ...
480
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmentInfoRequest) (*queryPb.GetSegmentInfoResponse, error) {
481 482
	code := node.stateCode.Load().(internalpb.StateCode)
	if code != internalpb.StateCode_Healthy {
483
		err := fmt.Errorf("query node %d is not ready", Params.QueryNodeCfg.QueryNodeID)
484 485 486 487 488 489
		res := &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
G
godchen 已提交
490
		return res, nil
491
	}
492 493 494 495 496 497
	var segmentInfos []*queryPb.SegmentInfo

	segmentIDs := make(map[int64]struct{})
	for _, segmentID := range in.GetSegmentIDs() {
		segmentIDs[segmentID] = struct{}{}
	}
498

499
	// get info from historical
500
	historicalSegmentInfos, err := node.historical.replica.getSegmentInfosByColID(in.CollectionID)
501
	if err != nil {
502
		log.Debug("GetSegmentInfo: get historical segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
503 504 505 506 507 508
		res := &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
G
godchen 已提交
509
		return res, nil
510
	}
511
	segmentInfos = append(segmentInfos, filterSegmentInfo(historicalSegmentInfos, segmentIDs)...)
512

513
	// get info from streaming
514
	streamingSegmentInfos, err := node.streaming.replica.getSegmentInfosByColID(in.CollectionID)
515
	if err != nil {
516
		log.Debug("GetSegmentInfo: get streaming segmentInfo failed", zap.Int64("collectionID", in.CollectionID), zap.Error(err))
517 518 519 520 521 522
		res := &queryPb.GetSegmentInfoResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
		}
G
godchen 已提交
523
		return res, nil
524
	}
525
	segmentInfos = append(segmentInfos, filterSegmentInfo(streamingSegmentInfos, segmentIDs)...)
526

527 528 529 530
	return &queryPb.GetSegmentInfoResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
531
		Infos: segmentInfos,
532 533
	}, nil
}
534

535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550
// filterSegmentInfo keeps the entries of segmentInfos whose segment ID is a key of
// segmentIDs, preserving their order. When segmentIDs is empty no filtering is done
// and the input slice itself is returned.
func filterSegmentInfo(segmentInfos []*queryPb.SegmentInfo, segmentIDs map[int64]struct{}) []*queryPb.SegmentInfo {
	if len(segmentIDs) == 0 {
		return segmentInfos
	}

	kept := make([]*queryPb.SegmentInfo, 0, len(segmentIDs))
	for i := range segmentInfos {
		if _, wanted := segmentIDs[segmentInfos[i].GetSegmentID()]; wanted {
			kept = append(kept, segmentInfos[i])
		}
	}
	return kept
}

551
// isHealthy checks if QueryNode is healthy
552 553 554 555 556
func (node *QueryNode) isHealthy() bool {
	code := node.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

G
godchen 已提交
557
// GetMetrics return system infos of the query node, such as total memory, memory usage, cpu usage ...
558
// TODO(dragondriver): cache the Metrics and set a retention to the cache
559 560 561
func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	if !node.isHealthy() {
		log.Warn("QueryNode.GetMetrics failed",
562
			zap.Int64("node_id", Params.QueryNodeCfg.QueryNodeID),
563
			zap.String("req", req.Request),
564
			zap.Error(errQueryNodeIsUnhealthy(Params.QueryNodeCfg.QueryNodeID)))
565 566 567 568

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
569
				Reason:    msgQueryNodeIsUnhealthy(Params.QueryNodeCfg.QueryNodeID),
570 571 572 573 574 575 576 577
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryNode.GetMetrics failed to parse metric type",
578
			zap.Int64("node_id", Params.QueryNodeCfg.QueryNodeID),
579 580 581 582 583 584 585 586 587 588 589 590 591 592
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	if metricType == metricsinfo.SystemInfoMetrics {
		metrics, err := getSystemInfoMetrics(ctx, req, node)
X
Xiaofan 已提交
593 594 595 596 597 598 599
		if err != nil {
			log.Warn("QueryNode.GetMetrics failed",
				zap.Int64("node_id", Params.QueryNodeCfg.QueryNodeID),
				zap.String("req", req.Request),
				zap.String("metric_type", metricType),
				zap.Error(err))
		}
600

G
godchen 已提交
601
		return metrics, nil
602 603 604
	}

	log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
605
		zap.Int64("node_id", Params.QueryNodeCfg.QueryNodeID),
606 607 608 609 610 611 612 613 614 615 616
		zap.String("req", req.Request),
		zap.String("metric_type", metricType))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    metricsinfo.MsgUnimplementedMetric,
		},
		Response: "",
	}, nil
}