// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package querycoord

import (
	"context"
	"errors"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
)

// GetComponentStates returns information about whether QueryCoord is healthy
func (qc *QueryCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
	serviceComponentInfo := &internalpb.ComponentInfo{
		NodeID:    Params.QueryCoordID,
		StateCode: qc.stateCode.Load().(internalpb.StateCode),
	}

	//subComponentInfos, err := qs.cluster.GetComponentInfos(ctx)
	//if err != nil {
	//	return &internalpb.ComponentStates{
	//		Status: &commonpb.Status{
	//			ErrorCode: commonpb.ErrorCode_UnexpectedError,
	//			Reason:    err.Error(),
	//		},
	//	}, err
	//}
	return &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		State: serviceComponentInfo,
		//SubcomponentStates: subComponentInfos,
	}, nil
}

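// GetTimeTickChannel returns the name of QueryCoord's time tick channel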
func (qc *QueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.TimeTickChannelName,
	}, nil
}

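// GetStatisticsChannel returns the name of QueryCoord's statistics channel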
func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.StatsChannelName,
	}, nil
}

// ShowCollections returns all the collections that have been loaded
func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
	dbID := req.DbID
	log.Debug("show collection start", zap.Int64("dbID", dbID))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("show collection end with query coordinator not healthy")
		return &querypb.ShowCollectionsResponse{
			Status: status,
		}, err
	}
	collectionInfos := qc.meta.showCollections()
	ID2collectionInfo := make(map[UniqueID]*querypb.CollectionInfo)
	inMemoryCollectionIDs := make([]UniqueID, 0)
	for _, info := range collectionInfos {
		ID2collectionInfo[info.CollectionID] = info
		inMemoryCollectionIDs = append(inMemoryCollectionIDs, info.CollectionID)
	}
	inMemoryPercentages := make([]int64, 0)
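	// an empty CollectionIDs list means the client asks for all loaded collections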
	if len(req.CollectionIDs) == 0 {
		for _, id := range inMemoryCollectionIDs {
			inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
		}
		log.Debug("show collection end", zap.Int64s("collections", inMemoryCollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
		return &querypb.ShowCollectionsResponse{
			Status:              status,
			CollectionIDs:       inMemoryCollectionIDs,
			InMemoryPercentages: inMemoryPercentages,
		}, nil
	}
	for _, id := range req.CollectionIDs {
		if _, ok := ID2collectionInfo[id]; !ok {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			err := errors.New("collection has not been loaded to memory or load failed")
			status.Reason = err.Error()
			return &querypb.ShowCollectionsResponse{
				Status: status,
			}, err
		}
		inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
	}
	log.Debug("show collection end", zap.Int64s("collections", req.CollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
	return &querypb.ShowCollectionsResponse{
		Status:              status,
		CollectionIDs:       req.CollectionIDs,
		InMemoryPercentages: inMemoryPercentages,
	}, nil
}

// LoadCollection loads all the sealed segments of this collection to queryNodes, and assigns watchDmChannelRequest to queryNodes
func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
	collectionID := req.CollectionID
	//schema := req.Schema
	log.Debug("LoadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
		zap.Stringer("schema", req.Schema))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("load collection end with query coordinator not healthy")
		return status, err
	}

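	// wrap the request in a task, hand it to the scheduler, and wait synchronously for it to finish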
	loadCollectionTask := &LoadCollectionTask{
		BaseTask: BaseTask{
			ctx:              qc.loopCtx,
			Condition:        NewTaskCondition(qc.loopCtx),
			triggerCondition: querypb.TriggerCondition_grpcRequest,
		},
		LoadCollectionRequest: req,
		rootCoord:             qc.rootCoordClient,
		dataCoord:             qc.dataCoordClient,
		cluster:               qc.cluster,
		meta:                  qc.meta,
	}
	qc.scheduler.Enqueue([]task{loadCollectionTask})

	err := loadCollectionTask.WaitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	return status, nil
}

// ReleaseCollection clears all data related to this collection on the queryNodes
func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
	//dbID := req.DbID
	collectionID := req.CollectionID
	log.Debug("ReleaseCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("release collection end with query coordinator not healthy")
		return status, err
	}

	hasCollection := qc.meta.hasCollection(collectionID)
	if !hasCollection {
		log.Warn("release collection end, query coordinator doesn't have the collection", zap.Int64("collectionID", collectionID))
		return status, nil
	}

	releaseCollectionTask := &ReleaseCollectionTask{
		BaseTask: BaseTask{
			ctx:              qc.loopCtx,
			Condition:        NewTaskCondition(qc.loopCtx),
			triggerCondition: querypb.TriggerCondition_grpcRequest,
		},
		ReleaseCollectionRequest: req,
		cluster:                  qc.cluster,
		meta:                     qc.meta,
		rootCoord:                qc.rootCoordClient,
	}
	qc.scheduler.Enqueue([]task{releaseCollectionTask})

	err := releaseCollectionTask.WaitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}

	log.Debug("ReleaseCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	//qc.MetaReplica.printMeta()
	//qc.cluster.printMeta()
	return status, nil
}

// ShowPartitions returns all the partitions that have been loaded
func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
	collectionID := req.CollectionID
	log.Debug("show partitions start", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("show partition end with query coordinator not healthy")
		return &querypb.ShowPartitionsResponse{
			Status: status,
		}, err
	}

	partitionStates, err := qc.meta.showPartitions(collectionID)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return &querypb.ShowPartitionsResponse{
			Status: status,
		}, err
	}
	ID2PartitionState := make(map[UniqueID]*querypb.PartitionStates)
	inMemoryPartitionIDs := make([]UniqueID, 0)
	for _, state := range partitionStates {
		ID2PartitionState[state.PartitionID] = state
		inMemoryPartitionIDs = append(inMemoryPartitionIDs, state.PartitionID)
	}
	inMemoryPercentages := make([]int64, 0)
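	// an empty PartitionIDs list means the client asks for all loaded partitions of the collection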
	if len(req.PartitionIDs) == 0 {
		for _, id := range inMemoryPartitionIDs {
			inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
		}
		log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", inMemoryPartitionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
		return &querypb.ShowPartitionsResponse{
			Status:              status,
			PartitionIDs:        inMemoryPartitionIDs,
			InMemoryPercentages: inMemoryPercentages,
		}, nil
	}
	for _, id := range req.PartitionIDs {
		if _, ok := ID2PartitionState[id]; !ok {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			err := errors.New("partition has not been loaded to memory or load failed")
			status.Reason = err.Error()
			return &querypb.ShowPartitionsResponse{
				Status: status,
			}, err
		}
		inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
	}

	log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))

	return &querypb.ShowPartitionsResponse{
		Status:              status,
		PartitionIDs:        req.PartitionIDs,
		InMemoryPercentages: inMemoryPercentages,
	}, nil
}

// LoadPartitions loads all the sealed segments of these partitions to queryNodes, and assigns watchDmChannelRequest to queryNodes
func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
	collectionID := req.CollectionID
	partitionIDs := req.PartitionIDs
	log.Debug("LoadPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("load partition end with query coordinator not healthy")
		return status, err
	}

	if len(partitionIDs) == 0 {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("partitionIDs are empty")
		status.Reason = err.Error()
		log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
		return status, err
	}

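	// if QueryCoord already tracks this collection, narrow the request down to partitions that actually need loading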
	hasCollection := qc.meta.hasCollection(collectionID)
	if hasCollection {
		partitionIDsToLoad := make([]UniqueID, 0)
		loadType, _ := qc.meta.getLoadType(collectionID)
		if loadType == querypb.LoadType_loadCollection {
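			// the whole collection is loaded; only partitions that were explicitly released need to be loaded again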
			for _, partitionID := range partitionIDs {
				hasReleasePartition := qc.meta.hasReleasePartition(collectionID, partitionID)
				if hasReleasePartition {
					partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
				}
			}
		} else {
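			// the collection was loaded partition by partition; skip partitions that are already loaded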
			for _, partitionID := range partitionIDs {
				hasPartition := qc.meta.hasPartition(collectionID, partitionID)
				if !hasPartition {
					partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
				}
			}
		}

		if len(partitionIDsToLoad) == 0 {
			log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
			return status, nil
		}
		req.PartitionIDs = partitionIDsToLoad
	}

	loadPartitionTask := &LoadPartitionTask{
		BaseTask: BaseTask{
			ctx:              qc.loopCtx,
			Condition:        NewTaskCondition(qc.loopCtx),
			triggerCondition: querypb.TriggerCondition_grpcRequest,
		},
		LoadPartitionsRequest: req,
		dataCoord:             qc.dataCoordClient,
		cluster:               qc.cluster,
		meta:                  qc.meta,
	}
	qc.scheduler.Enqueue([]task{loadPartitionTask})

	err := loadPartitionTask.WaitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
		return status, err
	}

	log.Debug("LoadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
	return status, nil
}

// ReleasePartitions clears all data related to these partitions on the queryNodes
func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
	//dbID := req.DbID
	collectionID := req.CollectionID
	partitionIDs := req.PartitionIDs
	log.Debug("ReleasePartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("release partition end with query coordinator not healthy")
		return status, err
	}

	hasCollection := qc.meta.hasCollection(collectionID)
	if !hasCollection {
		log.Warn("release partitions end, query coordinator doesn't have the collection", zap.Int64("collectionID", collectionID))
		return status, nil
	}

	if len(partitionIDs) == 0 {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("partitionIDs are empty")
		status.Reason = err.Error()
		log.Debug("releasePartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID))
		return status, err
	}

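	// only release partitions that QueryCoord actually tracks; unknown partition IDs are skipped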
	toReleasedPartitions := make([]UniqueID, 0)
	for _, id := range partitionIDs {
		hasPartition := qc.meta.hasPartition(collectionID, id)
		if hasPartition {
			toReleasedPartitions = append(toReleasedPartitions, id)
		}
	}
	if len(toReleasedPartitions) == 0 {
		log.Warn("release partitions end, query coordinator doesn't have the partitions", zap.Int64s("partitionIDs", partitionIDs))
		return status, nil
	}

	req.PartitionIDs = toReleasedPartitions
	releasePartitionTask := &ReleasePartitionTask{
		BaseTask: BaseTask{
			ctx:              qc.loopCtx,
			Condition:        NewTaskCondition(qc.loopCtx),
			triggerCondition: querypb.TriggerCondition_grpcRequest,
		},
		ReleasePartitionsRequest: req,
		cluster:                  qc.cluster,
	}
	qc.scheduler.Enqueue([]task{releasePartitionTask})

	err := releasePartitionTask.WaitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, err
	}
	log.Debug("ReleasePartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
	//qc.MetaReplica.printMeta()
	//qc.cluster.printMeta()
	return status, nil
}

// CreateQueryChannel assigns a unique query channel and result channel to the specified collection
func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("createQueryChannel end with query coordinator not healthy")
		return &querypb.CreateQueryChannelResponse{
			Status: status,
		}, err
	}

	collectionID := req.CollectionID
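	// look up the query channel and result channel assigned to this collection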
	queryChannel, queryResultChannel, err := qc.meta.GetQueryChannel(collectionID)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		log.Debug("createQueryChannel end with error")
		return &querypb.CreateQueryChannelResponse{
			Status: status,
		}, err
	}

	return &querypb.CreateQueryChannelResponse{
		Status:         status,
		RequestChannel: queryChannel,
		ResultChannel:  queryResultChannel,
	}, nil
}

// GetPartitionStates returns the state of each given partition, including notExist, notPresent, onDisk, partitionInMemory, inMemory, partitionInGPU and inGPU
func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("getPartitionStates end with query coordinator not healthy")
		return &querypb.GetPartitionStatesResponse{
			Status: status,
		}, err
	}

	partitionIDs := req.PartitionIDs
	partitionStates := make([]*querypb.PartitionStates, 0)
	for _, partitionID := range partitionIDs {
		res, err := qc.meta.getPartitionStatesByID(req.CollectionID, partitionID)
		if err != nil {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			status.Reason = err.Error()
			return &querypb.GetPartitionStatesResponse{
				Status: status,
			}, err
		}
		partitionState := &querypb.PartitionStates{
			PartitionID: partitionID,
			State:       res.State,
		}
		partitionStates = append(partitionStates, partitionState)
	}

	return &querypb.GetPartitionStatesResponse{
		Status:                status,
		PartitionDescriptions: partitionStates,
	}, nil
}

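// GetSegmentInfo returns information about the segments loaded on the queryNodes, such as row count and memory size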
func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("getSegmentInfo end with query coordinator not healthy")
		return &querypb.GetSegmentInfoResponse{
			Status: status,
		}, err
	}

	totalMemSize := int64(0)
	totalNumRows := int64(0)
	//TODO::get segment infos from MetaReplica
	//segmentIDs := req.SegmentIDs
	//segmentInfos, err := qs.MetaReplica.getSegmentInfos(segmentIDs)
	segmentInfos, err := qc.cluster.getSegmentInfo(ctx, req)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return &querypb.GetSegmentInfoResponse{
			Status: status,
		}, err
	}
	for _, info := range segmentInfos {
		totalNumRows += info.NumRows
		totalMemSize += info.MemSize
	}
	log.Debug("getSegmentInfo", zap.Int64("num rows", totalNumRows), zap.Int64("memory size", totalMemSize))
	return &querypb.GetSegmentInfoResponse{
		Status: status,
		Infos:  segmentInfos,
	}, nil
}

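// isHealthy checks whether QueryCoord's state code is Healthy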
func (qc *QueryCoord) isHealthy() bool {
	code := qc.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

// GetMetrics returns all the queryCoord's metrics
func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("QueryCoord.GetMetrics",
		zap.Int64("node_id", Params.QueryCoordID),
		zap.String("req", req.Request))

	if !qc.isHealthy() {
		log.Warn("QueryCoord.GetMetrics failed",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.Error(errQueryCoordIsUnhealthy(Params.QueryCoordID)))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgQueryCoordIsUnhealthy(Params.QueryCoordID),
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryCoord.GetMetrics failed to parse metric type",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, err
	}

	log.Debug("QueryCoord.GetMetrics",
		zap.String("metric_type", metricType))

	if metricType == metricsinfo.SystemInfoMetrics {
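		// serve from the metrics cache when possible; fall back to recomputing below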
		ret, err := qc.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

		metrics, err := getSystemInfoMetrics(ctx, req, qc)

		log.Debug("QueryCoord.GetMetrics",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

		qc.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

		return metrics, err
	}
	err = errors.New(metricsinfo.MsgUnimplementedMetric)
	log.Debug("QueryCoord.GetMetrics failed",
		zap.Int64("node_id", Params.QueryCoordID),
		zap.String("req", req.Request),
		zap.String("metric_type", metricType),
		zap.Error(err))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		},
		Response: "",
	}, err
}