// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package querycoord

import (
	"context"
	"errors"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
)

// GetComponentStates returns information about whether the coord is healthy
func (qc *QueryCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	serviceComponentInfo := &internalpb.ComponentInfo{
		NodeID:    Params.QueryCoordID,
		StateCode: qc.stateCode.Load().(internalpb.StateCode),
	}

	//subComponentInfos, err := qs.cluster.GetComponentInfos(ctx)
	//if err != nil {
	//	return &internalpb.ComponentStates{
	//		Status: &commonpb.Status{
	//			ErrorCode: commonpb.ErrorCode_UnexpectedError,
	//			Reason:    err.Error(),
	//		},
	//	}, err
	//}
	return &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		State: serviceComponentInfo,
		//SubcomponentStates: subComponentInfos,
	}, nil
}

// GetTimeTickChannel returns the time tick channel
// TimeTickChannel contains many time tick messages, which have been sent by query nodes
func (qc *QueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.TimeTickChannelName,
	}, nil
}

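// GetStatisticsChannel returns the statistics channel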
func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.StatsChannelName,
	}, nil
}

// ShowCollections returns all the collections that have been loaded
func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
	dbID := req.DbID
	log.Debug("show collection start", zap.Int64("dbID", dbID))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("show collection end with query coordinator not healthy")
		return &querypb.ShowCollectionsResponse{
			Status: status,
		}, err
	}
	collectionInfos := qc.meta.showCollections()
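	// index the loaded collections by ID so both response paths below can look up in-memory percentages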
	ID2collectionInfo := make(map[UniqueID]*querypb.CollectionInfo)
	inMemoryCollectionIDs := make([]UniqueID, 0)
	for _, info := range collectionInfos {
		ID2collectionInfo[info.CollectionID] = info
		inMemoryCollectionIDs = append(inMemoryCollectionIDs, info.CollectionID)
	}
	inMemoryPercentages := make([]int64, 0)
	if len(req.CollectionIDs) == 0 {
		for _, id := range inMemoryCollectionIDs {
			inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
		}
		log.Debug("show collection end", zap.Int64s("collections", inMemoryCollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
		return &querypb.ShowCollectionsResponse{
			Status:              status,
			CollectionIDs:       inMemoryCollectionIDs,
			InMemoryPercentages: inMemoryPercentages,
		}, nil
	}
	for _, id := range req.CollectionIDs {
		if _, ok := ID2collectionInfo[id]; !ok {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			err := errors.New("collection has not been loaded to memory or load failed")
			status.Reason = err.Error()
			return &querypb.ShowCollectionsResponse{
				Status: status,
			}, err
		}
		inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
	}
	log.Debug("show collection end", zap.Int64s("collections", req.CollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
	return &querypb.ShowCollectionsResponse{
		Status:              status,
		CollectionIDs:       req.CollectionIDs,
		InMemoryPercentages: inMemoryPercentages,
	}, nil
}

// LoadCollection loads all the sealed segments of this collection to queryNodes, and assigns watchDmChannelRequest to queryNodes
func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
	collectionID := req.CollectionID
	//schema := req.Schema
	log.Debug("LoadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
		zap.Stringer("schema", req.Schema))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("load collection end with query coordinator not healthy")
		return status, err
	}

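	// loading runs asynchronously on the task scheduler: enqueue a trigger task, then block until it finishes so the caller sees the final status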
	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
	loadCollectionTask := &LoadCollectionTask{
		BaseTask:              baseTask,
		LoadCollectionRequest: req,
		rootCoord:             qc.rootCoordClient,
		dataCoord:             qc.dataCoordClient,
		cluster:               qc.cluster,
		meta:                  qc.meta,
	}
	err := qc.scheduler.Enqueue(loadCollectionTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	err = loadCollectionTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	return status, nil
}

// ReleaseCollection clears all data related to this collection on the querynode
func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
	//dbID := req.DbID
	collectionID := req.CollectionID
	log.Debug("ReleaseCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("release collection end with query coordinator not healthy")
		return status, err
	}

	hasCollection := qc.meta.hasCollection(collectionID)
	if !hasCollection {
		log.Warn("release collection end, the collection has not been loaded", zap.Int64("collectionID", collectionID))
		return status, nil
	}

	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
	releaseCollectionTask := &ReleaseCollectionTask{
		BaseTask:                 baseTask,
		ReleaseCollectionRequest: req,
		cluster:                  qc.cluster,
		meta:                     qc.meta,
		rootCoord:                qc.rootCoordClient,
	}
	err := qc.scheduler.Enqueue(releaseCollectionTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	err = releaseCollectionTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	log.Debug("ReleaseCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	//qc.MetaReplica.printMeta()
	//qc.cluster.printMeta()
	return status, nil
}

// ShowPartitions returns all the partitions that have been loaded
func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
	collectionID := req.CollectionID
	log.Debug("show partitions start", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("show partitions end with query coordinator not healthy")
		return &querypb.ShowPartitionsResponse{
			Status: status,
		}, err
	}

	partitionStates, err := qc.meta.showPartitions(collectionID)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return &querypb.ShowPartitionsResponse{
			Status: status,
		}, err
	}
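	// index the partition states by ID so both response paths below can look up in-memory percentages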
	ID2PartitionState := make(map[UniqueID]*querypb.PartitionStates)
	inMemoryPartitionIDs := make([]UniqueID, 0)
	for _, state := range partitionStates {
		ID2PartitionState[state.PartitionID] = state
		inMemoryPartitionIDs = append(inMemoryPartitionIDs, state.PartitionID)
	}
	inMemoryPercentages := make([]int64, 0)
	if len(req.PartitionIDs) == 0 {
		for _, id := range inMemoryPartitionIDs {
			inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
		}
		log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", inMemoryPartitionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
		return &querypb.ShowPartitionsResponse{
			Status:              status,
			PartitionIDs:        inMemoryPartitionIDs,
			InMemoryPercentages: inMemoryPercentages,
		}, nil
	}
	for _, id := range req.PartitionIDs {
		if _, ok := ID2PartitionState[id]; !ok {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			err := errors.New("partition has not been loaded to memory or load failed")
			status.Reason = err.Error()
			return &querypb.ShowPartitionsResponse{
				Status: status,
			}, err
		}
		inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
	}

	log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))

	return &querypb.ShowPartitionsResponse{
		Status:              status,
		PartitionIDs:        req.PartitionIDs,
		InMemoryPercentages: inMemoryPercentages,
	}, nil
}

// LoadPartitions loads all the sealed segments of the given partitions to queryNodes, and assigns watchDmChannelRequest to queryNodes
func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
	collectionID := req.CollectionID
	partitionIDs := req.PartitionIDs
	log.Debug("LoadPartitionsRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("load partition end with query coordinator not healthy")
		return status, err
	}

	if len(partitionIDs) == 0 {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("partitionIDs are empty")
		status.Reason = err.Error()
		log.Debug("LoadPartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
		return status, err
	}

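	// if the collection is already loaded, narrow the request down to the partitions actually missing from memory:
	// for a loadCollection-type load these are the partitions that were explicitly released,
	// otherwise they are the partitions that have never been loaded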
	hasCollection := qc.meta.hasCollection(collectionID)
	if hasCollection {
		partitionIDsToLoad := make([]UniqueID, 0)
		loadType, _ := qc.meta.getLoadType(collectionID)
		if loadType == querypb.LoadType_loadCollection {
			for _, partitionID := range partitionIDs {
				hasReleasePartition := qc.meta.hasReleasePartition(collectionID, partitionID)
				if hasReleasePartition {
					partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
				}
			}
		} else {
			for _, partitionID := range partitionIDs {
				hasPartition := qc.meta.hasPartition(collectionID, partitionID)
				if !hasPartition {
					partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
				}
			}
		}

		if len(partitionIDsToLoad) == 0 {
			log.Debug("LoadPartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
			return status, nil
		}
		req.PartitionIDs = partitionIDsToLoad
	}

	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
	loadPartitionTask := &LoadPartitionTask{
		BaseTask:              baseTask,
		LoadPartitionsRequest: req,
		dataCoord:             qc.dataCoordClient,
		cluster:               qc.cluster,
		meta:                  qc.meta,
	}
	err := qc.scheduler.Enqueue(loadPartitionTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	err = loadPartitionTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		log.Debug("LoadPartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
		return status, err
	}

	log.Debug("LoadPartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
	return status, nil
}

// ReleasePartitions clears all data related to the given partitions on the querynode
func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
	//dbID := req.DbID
	collectionID := req.CollectionID
	partitionIDs := req.PartitionIDs
	log.Debug("ReleasePartitionsRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("release partitions end with query coordinator not healthy")
		return status, err
	}

	hasCollection := qc.meta.hasCollection(collectionID)
	if !hasCollection {
		log.Warn("release partitions end, the collection has not been loaded", zap.Int64("collectionID", collectionID))
		return status, nil
	}

	if len(partitionIDs) == 0 {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("partitionIDs are empty")
		status.Reason = err.Error()
		log.Debug("ReleasePartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
		return status, err
	}

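	// only release the partitions that the meta is tracking; unknown partition IDs are skipped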
	toReleasedPartitions := make([]UniqueID, 0)
	for _, id := range partitionIDs {
		hasPartition := qc.meta.hasPartition(collectionID, id)
		if hasPartition {
			toReleasedPartitions = append(toReleasedPartitions, id)
		}
	}
	if len(toReleasedPartitions) == 0 {
		log.Warn("release partitions end, none of the given partitions have been loaded", zap.Int64s("partitionIDs", partitionIDs))
		return status, nil
	}

	req.PartitionIDs = toReleasedPartitions
	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
	releasePartitionTask := &ReleasePartitionTask{
		BaseTask:                 baseTask,
		ReleasePartitionsRequest: req,
		cluster:                  qc.cluster,
	}
	err := qc.scheduler.Enqueue(releasePartitionTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	err = releasePartitionTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}
	log.Debug("ReleasePartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
	//qc.MetaReplica.printMeta()
	//qc.cluster.printMeta()
	return status, nil
}

// CreateQueryChannel assigns a unique query channel and result channel to the specified collection
func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("createQueryChannel end with query coordinator not healthy")
		return &querypb.CreateQueryChannelResponse{
			Status: status,
		}, err
	}

	collectionID := req.CollectionID
	queryChannel, queryResultChannel, err := qc.meta.getQueryChannel(collectionID)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		log.Debug("createQueryChannel end with error")
		return &querypb.CreateQueryChannelResponse{
			Status: status,
		}, err
	}

	return &querypb.CreateQueryChannelResponse{
		Status:         status,
		RequestChannel: queryChannel,
		ResultChannel:  queryResultChannel,
	}, nil
}

// GetPartitionStates returns the state of each given partition, which may be notExist, notPresent, onDisk, partitionInMemory, inMemory, partitionInGPU or inGPU
func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("getPartitionStates end with query coordinator not healthy")
		return &querypb.GetPartitionStatesResponse{
			Status: status,
		}, err
	}

	partitionIDs := req.PartitionIDs
	partitionStates := make([]*querypb.PartitionStates, 0)
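	// collect the state of every requested partition; an unknown partition fails the whole request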
	for _, partitionID := range partitionIDs {
		res, err := qc.meta.getPartitionStatesByID(req.CollectionID, partitionID)
		if err != nil {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			status.Reason = err.Error()
			return &querypb.GetPartitionStatesResponse{
				Status: status,
			}, err
		}
		partitionState := &querypb.PartitionStates{
			PartitionID: partitionID,
			State:       res.State,
		}
		partitionStates = append(partitionStates, partitionState)
	}

	return &querypb.GetPartitionStatesResponse{
		Status:                status,
		PartitionDescriptions: partitionStates,
	}, nil
}

// GetSegmentInfo returns information about all the segments on the queryNodes, including memSize, numRows, indexName and indexID
func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("getSegmentInfo end with query coordinator not healthy")
		return &querypb.GetSegmentInfoResponse{
			Status: status,
		}, err
	}

	totalMemSize := int64(0)
	totalNumRows := int64(0)
	// TODO: get segment infos from MetaReplica
	//segmentIDs := req.SegmentIDs
	//segmentInfos, err := qs.MetaReplica.getSegmentInfos(segmentIDs)
	segmentInfos, err := qc.cluster.getSegmentInfo(ctx, req)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return &querypb.GetSegmentInfoResponse{
			Status: status,
		}, err
	}
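	// aggregate row count and memory usage over the returned segments for the debug log below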
	for _, info := range segmentInfos {
		totalNumRows += info.NumRows
		totalMemSize += info.MemSize
	}
	log.Debug("getSegmentInfo", zap.Int64("num rows", totalNumRows), zap.Int64("memory size", totalMemSize))
	return &querypb.GetSegmentInfoResponse{
		Status: status,
		Infos:  segmentInfos,
	}, nil
}

func (qc *QueryCoord) isHealthy() bool {
	code := qc.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

// GetMetrics returns all the queryCoord's metrics
func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("QueryCoord.GetMetrics",
		zap.Int64("node_id", Params.QueryCoordID),
		zap.String("req", req.Request))

	if !qc.isHealthy() {
		log.Warn("QueryCoord.GetMetrics failed",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.Error(errQueryCoordIsUnhealthy(Params.QueryCoordID)))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgQueryCoordIsUnhealthy(Params.QueryCoordID),
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryCoord.GetMetrics failed to parse metric type",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, err
	}

	log.Debug("QueryCoord.GetMetrics",
		zap.String("metric_type", metricType))

	if metricType == metricsinfo.SystemInfoMetrics {
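		// serve system info metrics from the cache when possible; otherwise recompute them and refresh the cache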
		ret, err := qc.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

		metrics, err := getSystemInfoMetrics(ctx, req, qc)

		log.Debug("QueryCoord.GetMetrics",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

		qc.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

		return metrics, err
	}
	err = errors.New(metricsinfo.MsgUnimplementedMetric)
	log.Debug("QueryCoord.GetMetrics failed",
		zap.Int64("node_id", Params.QueryCoordID),
		zap.String("req", req.Request),
		zap.String("metric_type", metricType),
		zap.Error(err))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		},
		Response: "",
	}, err
}