// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord

import (
	"context"
	"errors"
	"fmt"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/common"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
)

// GetComponentStates returns information about whether the coordinator is healthy
func (qc *QueryCoord) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
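	// Report the session's server ID once registration has completed; until then,
	// fall back to the NotRegisteredID sentinel.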
	nodeID := common.NotRegisteredID
	if qc.session != nil && qc.session.Registered() {
		nodeID = qc.session.ServerID
	}
	serviceComponentInfo := &internalpb.ComponentInfo{
		// NodeID:    Params.QueryCoordID, // will race with QueryCoord.Register()
		NodeID:    nodeID,
		StateCode: qc.stateCode.Load().(internalpb.StateCode),
	}

	//subComponentInfos, err := qs.cluster.GetComponentInfos(ctx)
	//if err != nil {
	//	return &internalpb.ComponentStates{
	//		Status: &commonpb.Status{
	//			ErrorCode: commonpb.ErrorCode_UnexpectedError,
	//			Reason:    err.Error(),
	//		},
	//	}, err
	//}
	return &internalpb.ComponentStates{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		State: serviceComponentInfo,
		//SubcomponentStates: subComponentInfos,
	}, nil
}

// GetTimeTickChannel returns the time tick channel
// TimeTickChannel contains the time tick messages sent by the query nodes
func (qc *QueryCoord) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.TimeTickChannelName,
	}, nil
}

// GetStatisticsChannel returns the statistics channel
// The statistics channel carries statistics of the query nodes, such as segment and memory information
func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
		Value: Params.StatsChannelName,
	}, nil
}

// ShowCollections returns all the collections that have been loaded
func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
	dbID := req.DbID
	log.Debug("show collection start", zap.Int64("dbID", dbID))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
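	// Reject the request if the coordinator has not reached the Healthy state.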
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("show collection end with query coordinator not healthy")
		return &querypb.ShowCollectionsResponse{
			Status: status,
		}, nil
	}
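	// Index the loaded collections by ID so the requested collection IDs can be validated.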
	collectionInfos := qc.meta.showCollections()
	ID2collectionInfo := make(map[UniqueID]*querypb.CollectionInfo)
	inMemoryCollectionIDs := make([]UniqueID, 0)
	for _, info := range collectionInfos {
		ID2collectionInfo[info.CollectionID] = info
		inMemoryCollectionIDs = append(inMemoryCollectionIDs, info.CollectionID)
	}
	inMemoryPercentages := make([]int64, 0)
	if len(req.CollectionIDs) == 0 {
		for _, id := range inMemoryCollectionIDs {
			inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
		}
		log.Debug("show collection end", zap.Int64s("collections", inMemoryCollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
		return &querypb.ShowCollectionsResponse{
			Status:              status,
			CollectionIDs:       inMemoryCollectionIDs,
			InMemoryPercentages: inMemoryPercentages,
		}, nil
	}
	for _, id := range req.CollectionIDs {
		if _, ok := ID2collectionInfo[id]; !ok {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			err := fmt.Errorf("collection %d has not been loaded to memory or load failed", id)
			status.Reason = err.Error()
			return &querypb.ShowCollectionsResponse{
				Status: status,
			}, nil
		}
		inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
	}
	log.Debug("show collection end", zap.Int64s("collections", req.CollectionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
	return &querypb.ShowCollectionsResponse{
		Status:              status,
		CollectionIDs:       req.CollectionIDs,
		InMemoryPercentages: inMemoryPercentages,
	}, nil
}

// LoadCollection loads all the sealed segments of this collection into the queryNodes, and assigns watchDmChannel requests to the queryNodes
func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
	collectionID := req.CollectionID
	//schema := req.Schema
	log.Debug("loadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
		zap.Stringer("schema", req.Schema))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("load collection end with query coordinator not healthy")
		return status, nil
	}

	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
	loadCollectionTask := &loadCollectionTask{
		baseTask:              baseTask,
		LoadCollectionRequest: req,
		rootCoord:             qc.rootCoordClient,
		dataCoord:             qc.dataCoordClient,
		indexCoord:            qc.indexCoordClient,
		cluster:               qc.cluster,
		meta:                  qc.meta,
	}
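	// The load runs asynchronously in the task scheduler; block until the trigger task
	// finishes so the final status can be returned to the caller.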
	err := qc.scheduler.Enqueue(loadCollectionTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}

	err = loadCollectionTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}

	log.Debug("loadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Any("status", status))
	return status, nil
}

// ReleaseCollection clears all the data related to this collection on the queryNodes
func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
	//dbID := req.DbID
	collectionID := req.CollectionID
	log.Debug("releaseCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("release collection end with query coordinator not healthy")
		return status, nil
	}

	hasCollection := qc.meta.hasCollection(collectionID)
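	// Releasing a collection that is not recorded in the meta is treated as a successful no-op.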
	if !hasCollection {
		log.Warn("release collection end, query coordinator has not loaded this collection", zap.Int64("collectionID", collectionID))
		return status, nil
	}

	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
	releaseCollectionTask := &releaseCollectionTask{
		baseTask:                 baseTask,
		ReleaseCollectionRequest: req,
		cluster:                  qc.cluster,
		meta:                     qc.meta,
		rootCoord:                qc.rootCoordClient,
	}
	err := qc.scheduler.Enqueue(releaseCollectionTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}

	err = releaseCollectionTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}

	log.Debug("releaseCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
	//qc.MetaReplica.printMeta()
	//qc.cluster.printMeta()
	return status, nil
}

// ShowPartitions returns all the partitions that have been loaded
func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
	collectionID := req.CollectionID
	log.Debug("show partitions start", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("show partition end with query coordinator not healthy")
		return &querypb.ShowPartitionsResponse{
			Status: status,
		}, nil
	}

	partitionStates, err := qc.meta.showPartitions(collectionID)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return &querypb.ShowPartitionsResponse{
			Status: status,
		}, nil
	}
	ID2PartitionState := make(map[UniqueID]*querypb.PartitionStates)
	inMemoryPartitionIDs := make([]UniqueID, 0)
	for _, state := range partitionStates {
		ID2PartitionState[state.PartitionID] = state
		inMemoryPartitionIDs = append(inMemoryPartitionIDs, state.PartitionID)
	}
	inMemoryPercentages := make([]int64, 0)
	if len(req.PartitionIDs) == 0 {
		for _, id := range inMemoryPartitionIDs {
			inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
		}
		log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", inMemoryPartitionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))
		return &querypb.ShowPartitionsResponse{
			Status:              status,
			PartitionIDs:        inMemoryPartitionIDs,
			InMemoryPercentages: inMemoryPercentages,
		}, nil
	}
	for _, id := range req.PartitionIDs {
		if _, ok := ID2PartitionState[id]; !ok {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			err := errors.New("partition has not been loaded to memory or load failed")
			status.Reason = err.Error()
			return &querypb.ShowPartitionsResponse{
				Status: status,
			}, nil
		}
		inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
	}

	log.Debug("show partitions end", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", req.PartitionIDs), zap.Int64s("inMemoryPercentage", inMemoryPercentages))

	return &querypb.ShowPartitionsResponse{
		Status:              status,
		PartitionIDs:        req.PartitionIDs,
		InMemoryPercentages: inMemoryPercentages,
	}, nil
}

// LoadPartitions loads all the sealed segments of the given partitions into the queryNodes, and assigns watchDmChannel requests to the queryNodes
func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
	collectionID := req.CollectionID
	partitionIDs := req.PartitionIDs
	log.Debug("loadPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("load partition end with query coordinator not healthy")
		return status, nil
	}

	if len(partitionIDs) == 0 {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("partitionIDs are empty")
		status.Reason = err.Error()
		log.Debug("loadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
		return status, nil
	}

	hasCollection := qc.meta.hasCollection(collectionID)
	if hasCollection {
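		// The collection may already be partially loaded. If it was loaded as a whole
		// collection, only partitions that have been explicitly released need to be
		// loaded again; otherwise, only partitions not yet recorded in the meta are loaded.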
		partitionIDsToLoad := make([]UniqueID, 0)
		loadType, _ := qc.meta.getLoadType(collectionID)
		if loadType == querypb.LoadType_loadCollection {
			for _, partitionID := range partitionIDs {
				hasReleasePartition := qc.meta.hasReleasePartition(collectionID, partitionID)
				if hasReleasePartition {
					partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
				}
			}
		} else {
			for _, partitionID := range partitionIDs {
				hasPartition := qc.meta.hasPartition(collectionID, partitionID)
				if !hasPartition {
					partitionIDsToLoad = append(partitionIDsToLoad, partitionID)
				}
			}
		}

		if len(partitionIDsToLoad) == 0 {
			log.Debug("loadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
			return status, nil
		}
		req.PartitionIDs = partitionIDsToLoad
	}

	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
	loadPartitionTask := &loadPartitionTask{
		baseTask:              baseTask,
		LoadPartitionsRequest: req,
		rootCoord:             qc.rootCoordClient,
		dataCoord:             qc.dataCoordClient,
		indexCoord:            qc.indexCoordClient,
		cluster:               qc.cluster,
		meta:                  qc.meta,
	}
	err := qc.scheduler.Enqueue(loadPartitionTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}

	err = loadPartitionTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		log.Debug("loadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
		return status, nil
	}

	log.Debug("loadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
	return status, nil
}

// ReleasePartitions clears all the data related to the given partitions on the queryNodes
func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.ReleasePartitionsRequest) (*commonpb.Status, error) {
	//dbID := req.DbID
	collectionID := req.CollectionID
	partitionIDs := req.PartitionIDs
	log.Debug("releasePartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("release partition end with query coordinator not healthy")
		return status, nil
	}

	hasCollection := qc.meta.hasCollection(collectionID)
	if !hasCollection {
		log.Warn("release partitions end, query coordinator has not loaded this collection", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
		return status, nil
	}

	if len(partitionIDs) == 0 {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("partitionIDs are empty")
		status.Reason = err.Error()
		log.Debug("releasePartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
		return status, nil
	}

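	// Only partitions that are actually recorded in the meta need to be released; the rest are skipped.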
	toReleasedPartitions := make([]UniqueID, 0)
	for _, id := range partitionIDs {
		hasPartition := qc.meta.hasPartition(collectionID, id)
		if hasPartition {
			toReleasedPartitions = append(toReleasedPartitions, id)
		}
	}
	if len(toReleasedPartitions) == 0 {
		log.Warn("release partitions end, none of the partitions have been loaded by query coordinator", zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
		return status, nil
	}

	req.PartitionIDs = toReleasedPartitions
	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_grpcRequest)
	releasePartitionTask := &releasePartitionTask{
		baseTask:                 baseTask,
		ReleasePartitionsRequest: req,
		cluster:                  qc.cluster,
	}
	err := qc.scheduler.Enqueue(releasePartitionTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}

	err = releasePartitionTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}
	log.Debug("releasePartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
	//qc.MetaReplica.printMeta()
	//qc.cluster.printMeta()
	return status, nil
}

// CreateQueryChannel assigns a unique query channel and result channel to the specified collection
func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.CreateQueryChannelRequest) (*querypb.CreateQueryChannelResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("createQueryChannel end with query coordinator not healthy")
		return &querypb.CreateQueryChannelResponse{
			Status: status,
		}, nil
	}

	collectionID := req.CollectionID
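	// Look up the query channel and query result channel assigned to this collection.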
	info, err := qc.meta.getQueryChannelInfoByID(collectionID)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		log.Debug("createQueryChannel end with error")
		return &querypb.CreateQueryChannelResponse{
			Status: status,
		}, nil
	}

	return &querypb.CreateQueryChannelResponse{
		Status:         status,
		RequestChannel: info.QueryChannelID,
		ResultChannel:  info.QueryResultChannelID,
	}, nil
}

// GetPartitionStates returns the states of the given partitions, including notExist, notPresent, onDisk, partitionInMemory, inMemory, partitionInGPU and inGPU
func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("getPartitionStates end with query coordinator not healthy")
		return &querypb.GetPartitionStatesResponse{
			Status: status,
		}, nil
	}

	partitionIDs := req.PartitionIDs
	partitionStates := make([]*querypb.PartitionStates, 0)
	for _, partitionID := range partitionIDs {
		res, err := qc.meta.getPartitionStatesByID(req.CollectionID, partitionID)
		if err != nil {
			status.ErrorCode = commonpb.ErrorCode_UnexpectedError
			status.Reason = err.Error()
			return &querypb.GetPartitionStatesResponse{
				Status: status,
			}, nil
		}
		partitionState := &querypb.PartitionStates{
			PartitionID: partitionID,
			State:       res.State,
		}
		partitionStates = append(partitionStates, partitionState)
	}

	return &querypb.GetPartitionStatesResponse{
		Status:                status,
		PartitionDescriptions: partitionStates,
	}, nil
}

// GetSegmentInfo returns information about the segments on the queryNodes, such as memSize, numRow, indexName and indexID
func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("getSegmentInfo end with query coordinator not healthy")
		return &querypb.GetSegmentInfoResponse{
			Status: status,
		}, nil
	}

	totalMemSize := int64(0)
	totalNumRows := int64(0)
	//TODO::get segment infos from MetaReplica
	//segmentIDs := req.SegmentIDs
	//segmentInfos, err := qs.MetaReplica.getSegmentInfos(segmentIDs)
	segmentInfos, err := qc.cluster.getSegmentInfo(ctx, req)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return &querypb.GetSegmentInfoResponse{
			Status: status,
		}, nil
	}
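	// Aggregate row count and memory usage over all returned segments for the debug log below.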
	for _, info := range segmentInfos {
		totalNumRows += info.NumRows
		totalMemSize += info.MemSize
	}
	log.Debug("getSegmentInfo", zap.Int64("num rows", totalNumRows), zap.Int64("memory size", totalMemSize))
	return &querypb.GetSegmentInfoResponse{
		Status: status,
		Infos:  segmentInfos,
	}, nil
}

// LoadBalance performs a load-balancing operation between the query nodes
func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
	log.Debug("LoadBalanceRequest received",
		zap.String("role", Params.RoleName),
		zap.Int64("msgID", req.Base.MsgID),
		zap.Any("req", req),
	)
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_Success,
	}
	if qc.stateCode.Load() != internalpb.StateCode_Healthy {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		err := errors.New("query coordinator is not healthy")
		status.Reason = err.Error()
		log.Debug("LoadBalance failed", zap.Error(err))
		return status, nil
	}

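	// Unlike the request handlers above, load balancing is scheduled with the
	// loadBalance trigger condition rather than grpcRequest.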
	baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_loadBalance)
	loadBalanceTask := &loadBalanceTask{
		baseTask:           baseTask,
		LoadBalanceRequest: req,
		rootCoord:          qc.rootCoordClient,
		dataCoord:          qc.dataCoordClient,
		indexCoord:         qc.indexCoordClient,
		cluster:            qc.cluster,
		meta:               qc.meta,
	}
	err := qc.scheduler.Enqueue(loadBalanceTask)
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}

	err = loadBalanceTask.waitToFinish()
	if err != nil {
		status.ErrorCode = commonpb.ErrorCode_UnexpectedError
		status.Reason = err.Error()
		return status, nil
	}
	log.Debug("LoadBalanceRequest completed",
		zap.String("role", Params.RoleName),
		zap.Int64("msgID", req.Base.MsgID),
		zap.Any("req", req),
	)
	return status, nil
}

func (qc *QueryCoord) isHealthy() bool {
	code := qc.stateCode.Load().(internalpb.StateCode)
	return code == internalpb.StateCode_Healthy
}

// GetMetrics returns all the queryCoord's metrics
func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
	log.Debug("QueryCoord.GetMetrics",
		zap.Int64("node_id", Params.QueryCoordID),
		zap.String("req", req.Request))

	if !qc.isHealthy() {
		log.Warn("QueryCoord.GetMetrics failed",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.Error(errQueryCoordIsUnhealthy(Params.QueryCoordID)))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    msgQueryCoordIsUnhealthy(Params.QueryCoordID),
			},
			Response: "",
		}, nil
	}

	metricType, err := metricsinfo.ParseMetricType(req.Request)
	if err != nil {
		log.Warn("QueryCoord.GetMetrics failed to parse metric type",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.Error(err))

		return &milvuspb.GetMetricsResponse{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UnexpectedError,
				Reason:    err.Error(),
			},
			Response: "",
		}, nil
	}

	log.Debug("QueryCoord.GetMetrics",
		zap.String("metric_type", metricType))

	if metricType == metricsinfo.SystemInfoMetrics {
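		// Serve system info metrics from the cache when it is valid; otherwise recompute
		// them and refresh the cache.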
		ret, err := qc.metricsCacheManager.GetSystemInfoMetrics()
		if err == nil && ret != nil {
			return ret, nil
		}
		log.Debug("failed to get system info metrics from cache, recompute instead",
			zap.Error(err))

		metrics, err := getSystemInfoMetrics(ctx, req, qc)

		log.Debug("QueryCoord.GetMetrics",
			zap.Int64("node_id", Params.QueryCoordID),
			zap.String("req", req.Request),
			zap.String("metric_type", metricType),
			zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
			zap.Error(err))

		qc.metricsCacheManager.UpdateSystemInfoMetrics(metrics)

		return metrics, nil
	}
	err = errors.New(metricsinfo.MsgUnimplementedMetric)
	log.Debug("QueryCoord.GetMetrics failed",
		zap.Int64("node_id", Params.QueryCoordID),
		zap.String("req", req.Request),
		zap.String("metric_type", metricType),
		zap.Error(err))

	return &milvuspb.GetMetricsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UnexpectedError,
			Reason:    err.Error(),
		},
		Response: "",
	}, nil
}