// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/golang/protobuf/proto"
	"go.etcd.io/etcd/api/v3/mvccpb"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/allocator"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

const (
	handoffSegmentPrefix = "querycoord-handoff"
)

// Timestamp is an alias for typeutil.Timestamp
type Timestamp = typeutil.Timestamp

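// queryChannelInfo records an allocated pair of channels: one carrying query requests and one carrying query results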
type queryChannelInfo struct {
	requestChannel  string
	responseChannel string
}

// QueryCoord is the coordinator of queryNodes
type QueryCoord struct {
	loopCtx    context.Context
	loopCancel context.CancelFunc
	loopWg     sync.WaitGroup
	kvClient   *etcdkv.EtcdKV

	initOnce sync.Once

	queryCoordID uint64
	meta         Meta
	cluster      Cluster
	newNodeFn    newQueryNodeFn
	scheduler    *TaskScheduler
	idAllocator  func() (UniqueID, error)
	indexChecker *IndexChecker

	metricsCacheManager *metricsinfo.MetricsCacheManager

	dataCoordClient  types.DataCoord
	rootCoordClient  types.RootCoord
	indexCoordClient types.IndexCoord

	session   *sessionutil.Session
	eventChan <-chan *sessionutil.SessionEvent

	stateCode  atomic.Value
	enableGrpc bool

	msFactory msgstream.Factory
}

// Register registers the query coordinator's session with etcd
func (qc *QueryCoord) Register() error {
	log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
	qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
	qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
	Params.NodeID = uint64(qc.session.ServerID)
	Params.SetLogger(typeutil.UniqueID(-1))
	return nil
}

// Init initializes the query coordinator's etcd client, id allocator, meta, cluster, task scheduler and index checker
func (qc *QueryCoord) Init() error {
	log.Debug("query coordinator start init")
	// connect etcd
	connectEtcdFn := func() error {
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
		if err != nil {
			return err
		}
		qc.kvClient = etcdKV
		return nil
	}
	var initError error
	qc.initOnce.Do(func() {
		log.Debug("query coordinator try to connect etcd")
		initError = retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
		if initError != nil {
			log.Debug("query coordinator failed to connect to etcd", zap.Error(initError))
			return
		}
		log.Debug("query coordinator connected to etcd successfully")

		// init id allocator
		var idAllocatorKV *etcdkv.EtcdKV
		idAllocatorKV, initError = tsoutil.NewTSOKVBase(Params.EtcdEndpoints, Params.KvRootPath, "queryCoordTaskID")
		if initError != nil {
			return
		}
		idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV)
		initError = idAllocator.Initialize()
		if initError != nil {
			log.Debug("query coordinator idAllocator initialize failed", zap.Error(initError))
			return
		}
		qc.idAllocator = func() (UniqueID, error) {
			return idAllocator.AllocOne()
		}

		// init meta
		qc.meta, initError = newMeta(qc.loopCtx, qc.kvClient, qc.msFactory, qc.idAllocator)
		if initError != nil {
			log.Error("query coordinator init meta failed", zap.Error(initError))
			return
		}

		// init cluster
		qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session)
		if initError != nil {
			log.Error("query coordinator init cluster failed", zap.Error(initError))
			return
		}

		// init task scheduler
		qc.scheduler, initError = NewTaskScheduler(qc.loopCtx, qc.meta, qc.cluster, qc.kvClient, qc.rootCoordClient, qc.dataCoordClient, qc.indexCoordClient, qc.idAllocator)
		if initError != nil {
			log.Error("query coordinator init task scheduler failed", zap.Error(initError))
			return
		}

		// init index checker
		qc.indexChecker, initError = newIndexChecker(qc.loopCtx, qc.kvClient, qc.meta, qc.cluster, qc.scheduler, qc.rootCoordClient, qc.indexCoordClient, qc.dataCoordClient)
		if initError != nil {
			log.Error("query coordinator init index checker failed", zap.Error(initError))
			return
		}

		qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
	})
	if initError == nil {
		log.Debug("query coordinator init success")
	}
	return initError
}

// Start function starts the goroutines to watch the meta and node updates
func (qc *QueryCoord) Start() error {
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024,
	}
	err := qc.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	qc.scheduler.Start()
	log.Debug("start scheduler ...")

	qc.indexChecker.start()
	log.Debug("start index checker ...")

	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()

	qc.UpdateStateCode(internalpb.StateCode_Healthy)

	qc.loopWg.Add(1)
	go qc.watchNodeLoop()

	qc.loopWg.Add(1)
	go qc.watchHandoffSegmentLoop()

	if Params.AutoBalance {
		qc.loopWg.Add(1)
		go qc.loadBalanceSegmentLoop()
	}

	go qc.session.LivenessCheck(qc.loopCtx, func() {
		log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("Server Id", qc.session.ServerID))
		if err := qc.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		syscall.Kill(syscall.Getpid(), syscall.SIGINT)
	})

	return nil
}

// Stop function stops watching the meta and node updates
func (qc *QueryCoord) Stop() error {
	qc.UpdateStateCode(internalpb.StateCode_Abnormal)

	qc.scheduler.Close()
	log.Debug("close scheduler ...")
	qc.indexChecker.close()
	log.Debug("close index checker ...")
	qc.loopCancel()

	qc.loopWg.Wait()
	qc.session.Revoke(time.Second)
	return nil
}

// UpdateStateCode updates the state code of the query coordinator, e.g. healthy or abnormal
func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) {
	qc.stateCode.Store(code)
}

// NewQueryCoord creates a QueryCoord object.
func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) {
	rand.Seed(time.Now().UnixNano())
	queryChannels := make([]*queryChannelInfo, 0)
	channelID := len(queryChannels)
	searchPrefix := Params.SearchChannelPrefix
	searchResultPrefix := Params.SearchResultChannelPrefix
	allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
	allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)

	queryChannels = append(queryChannels, &queryChannelInfo{
		requestChannel:  allocatedQueryChannel,
		responseChannel: allocatedQueryResultChannel,
	})

	ctx1, cancel := context.WithCancel(ctx)
	service := &QueryCoord{
		loopCtx:    ctx1,
		loopCancel: cancel,
		msFactory:  factory,
		newNodeFn:  newQueryNode,
	}

	service.UpdateStateCode(internalpb.StateCode_Abnormal)
	log.Debug("query coordinator", zap.Any("queryChannels", queryChannels))
	return service, nil
}
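
// A minimal wiring sketch (illustrative only: rootCoord, dataCoord and
// indexCoord stand for ready-made coordinator clients, and error handling
// is elided). The clients must be set before Init is called, since the task
// scheduler and index checker are built from them:
//
//	qc, _ := NewQueryCoord(ctx, factory)
//	_ = qc.SetRootCoord(rootCoord)
//	_ = qc.SetDataCoord(dataCoord)
//	_ = qc.SetIndexCoord(indexCoord)
//	_ = qc.Register()
//	_ = qc.Init()
//	_ = qc.Start()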

// SetRootCoord sets root coordinator's client
func (qc *QueryCoord) SetRootCoord(rootCoord types.RootCoord) error {
	if rootCoord == nil {
		return errors.New("null root coordinator interface")
	}

	qc.rootCoordClient = rootCoord
	return nil
}

// SetDataCoord sets data coordinator's client
func (qc *QueryCoord) SetDataCoord(dataCoord types.DataCoord) error {
	if dataCoord == nil {
		return errors.New("null data coordinator interface")
	}

	qc.dataCoordClient = dataCoord
	return nil
}

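// SetIndexCoord sets index coordinator's client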
func (qc *QueryCoord) SetIndexCoord(indexCoord types.IndexCoord) error {
	if indexCoord == nil {
		return errors.New("null index coordinator interface")
	}

	qc.indexCoordClient = indexCoord
	return nil
}

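// watchNodeLoop handles queryNode up/down events: newly registered queryNodes are added to the cluster,
// and when a queryNode goes down a loadBalance task is triggered to reschedule the segments it served.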
func (qc *QueryCoord) watchNodeLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
	defer cancel()
	defer qc.loopWg.Done()
	log.Debug("query coordinator start watch node loop")

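	// at loop start, trigger a loadBalance task for any queryNodes that went offline while QueryCoord was not watching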
	offlineNodes, err := qc.cluster.offlineNodes()
	if err == nil {
		offlineNodeIDs := make([]int64, 0)
		for id := range offlineNodes {
			offlineNodeIDs = append(offlineNodeIDs, id)
		}
		loadBalanceSegment := &querypb.LoadBalanceRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_LoadBalanceSegments,
				SourceID: qc.session.ServerID,
			},
			SourceNodeIDs: offlineNodeIDs,
		}

		baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_nodeDown)
		loadBalanceTask := &loadBalanceTask{
			baseTask:           baseTask,
			LoadBalanceRequest: loadBalanceSegment,
			rootCoord:          qc.rootCoordClient,
			dataCoord:          qc.dataCoordClient,
			indexCoord:         qc.indexCoordClient,
			cluster:            qc.cluster,
			meta:               qc.meta,
		}
		// TODO: handle enqueue error
		qc.scheduler.Enqueue(loadBalanceTask)
		log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
	}

	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1, nil)
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-qc.eventChan:
			if !ok {
				return
			}
			switch event.EventType {
			case sessionutil.SessionAddEvent:
				serverID := event.Session.ServerID
				log.Debug("start adding a queryNode to cluster", zap.Any("nodeID", serverID))
				err := qc.cluster.registerNode(ctx, event.Session, serverID, disConnect)
				if err != nil {
					log.Error("query node failed to register", zap.Int64("nodeID", serverID), zap.Error(err))
				}
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()
			case sessionutil.SessionDelEvent:
				serverID := event.Session.ServerID
				log.Debug("get a del event after queryNode down", zap.Int64("nodeID", serverID))
				nodeExist := qc.cluster.hasNode(serverID)
				if !nodeExist {
					log.Error("queryNode does not exist", zap.Int64("nodeID", serverID))
					continue
				}

				qc.cluster.stopNode(serverID)
				loadBalanceSegment := &querypb.LoadBalanceRequest{
					Base: &commonpb.MsgBase{
						MsgType:  commonpb.MsgType_LoadBalanceSegments,
						SourceID: qc.session.ServerID,
					},
					SourceNodeIDs: []int64{serverID},
					BalanceReason: querypb.TriggerCondition_nodeDown,
				}

				baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_nodeDown)
				loadBalanceTask := &loadBalanceTask{
					baseTask:           baseTask,
					LoadBalanceRequest: loadBalanceSegment,
					rootCoord:          qc.rootCoordClient,
					dataCoord:          qc.dataCoordClient,
					indexCoord:         qc.indexCoordClient,
					cluster:            qc.cluster,
					meta:               qc.meta,
				}
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()
				// TODO: handle enqueue error
				qc.scheduler.Enqueue(loadBalanceTask)
				log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
			}
		}
	}
}

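// watchHandoffSegmentLoop watches the handoff segment prefix in etcd: valid handoff requests are enqueued
// to the index checker, while requests whose collection/partition is not loaded (or when autoHandoff is
// disabled) are removed from etcd.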
func (qc *QueryCoord) watchHandoffSegmentLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)

	defer cancel()
	defer qc.loopWg.Done()
	log.Debug("query coordinator start watch segment loop")

	watchChan := qc.kvClient.WatchWithRevision(handoffSegmentPrefix, qc.indexChecker.revision+1)

	for {
		select {
		case <-ctx.Done():
			return
		case resp := <-watchChan:
			for _, event := range resp.Events {
				segmentInfo := &querypb.SegmentInfo{}
				err := proto.Unmarshal(event.Kv.Value, segmentInfo)
				if err != nil {
					log.Error("watchHandoffSegmentLoop: unmarshal failed", zap.Any("error", err.Error()))
					continue
				}
				switch event.Type {
				case mvccpb.PUT:
					if Params.AutoHandoff && qc.indexChecker.verifyHandoffReqValid(segmentInfo) {
						qc.indexChecker.enqueueHandoffReq(segmentInfo)
						log.Debug("watchHandoffSegmentLoop: enqueue a handoff request to index checker", zap.Any("segment info", segmentInfo))
					} else {
						log.Debug("watchHandoffSegmentLoop: collection/partition has not been loaded or autoHandoff is disabled, removing the request from etcd", zap.Any("segmentInfo", segmentInfo))
						buildQuerySegmentPath := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID)
						err = qc.kvClient.Remove(buildQuerySegmentPath)
						if err != nil {
							log.Error("watchHandoffSegmentLoop: remove handoff segment from etcd failed", zap.Error(err))
							panic(err)
						}
					}
				default:
					// do nothing
				}
			}
		}
	}
}

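// loadBalanceSegmentLoop periodically collects the memory usage of online queryNodes; when a node exceeds
// OverloadedMemoryThresholdPercentage or the usage gap between nodes exceeds MemoryUsageMaxDifferencePercentage,
// it schedules loadBalance tasks that move sealed segments from the most loaded node to the least loaded one.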
func (qc *QueryCoord) loadBalanceSegmentLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
	defer cancel()
	defer qc.loopWg.Done()
	log.Debug("query coordinator start load balance segment loop")

	timer := time.NewTicker(time.Duration(Params.BalanceIntervalSeconds) * time.Second)

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			onlineNodes, err := qc.cluster.onlineNodes()
			if err != nil {
				log.Warn("loadBalanceSegmentLoop: there are no online query nodes to balance")
				continue
			}
			// get mem info of online nodes from cluster
			nodeID2MemUsageRate := make(map[int64]float64)
			nodeID2MemUsage := make(map[int64]uint64)
			nodeID2TotalMem := make(map[int64]uint64)
			nodeID2SegmentInfos := make(map[int64]map[UniqueID]*querypb.SegmentInfo)
			onlineNodeIDs := make([]int64, 0)
			for nodeID := range onlineNodes {
				nodeInfo, err := qc.cluster.getNodeInfoByID(nodeID)
				if err != nil {
					log.Warn("loadBalanceSegmentLoop: get node info from query node failed", zap.Int64("nodeID", nodeID), zap.Error(err))
					delete(onlineNodes, nodeID)
					continue
				}

				updateSegmentInfoDone := true
				latestSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
				segmentInfos := qc.meta.getSegmentInfosByNode(nodeID)
				for _, segmentInfo := range segmentInfos {
					latestInfo, err := qc.cluster.getSegmentInfoByID(ctx, segmentInfo.SegmentID)
					if err != nil {
						log.Warn("loadBalanceSegmentLoop: get segment info from query node failed", zap.Int64("nodeID", nodeID), zap.Error(err))
						delete(onlineNodes, nodeID)
						updateSegmentInfoDone = false
						break
					}
					latestSegmentInfos[segmentInfo.SegmentID] = latestInfo
				}
				if updateSegmentInfoDone {
					nodeID2MemUsageRate[nodeID] = nodeInfo.(*queryNode).memUsageRate
					nodeID2MemUsage[nodeID] = nodeInfo.(*queryNode).memUsage
					nodeID2TotalMem[nodeID] = nodeInfo.(*queryNode).totalMem
					onlineNodeIDs = append(onlineNodeIDs, nodeID)
					nodeID2SegmentInfos[nodeID] = latestSegmentInfos
				}
				}
			}
			log.Debug("loadBalanceSegmentLoop: memory usage rate of all online query nodes", zap.Any("mem rate", nodeID2MemUsageRate))
			if len(onlineNodeIDs) <= 1 {
				log.Warn("loadBalanceSegmentLoop: there are too few online query nodes to balance", zap.Int64s("onlineNodeIDs", onlineNodeIDs))
				continue
			}

			// check which nodes need balance and determine which segments on these nodes need to be migrated to other nodes
			memoryInsufficient := false
			loadBalanceTasks := make([]*loadBalanceTask, 0)
			for {
				var selectedSegmentInfo *querypb.SegmentInfo
				sort.Slice(onlineNodeIDs, func(i, j int) bool {
					return nodeID2MemUsageRate[onlineNodeIDs[i]] > nodeID2MemUsageRate[onlineNodeIDs[j]]
				})

				// the memory usage rate of the source node is the highest among the online query nodes
				sourceNodeID := onlineNodeIDs[0]
				dstNodeID := onlineNodeIDs[len(onlineNodeIDs)-1]
				memUsageRateDiff := nodeID2MemUsageRate[sourceNodeID] - nodeID2MemUsageRate[dstNodeID]
				// if the source node's memory usage rate exceeds OverloadedMemoryThresholdPercentage (e.g. 90%),
				// or the max memUsageRateDiff exceeds MemoryUsageMaxDifferencePercentage (e.g. 30%),
				// then migrate segments from the source node to other query nodes
				if nodeID2MemUsageRate[sourceNodeID] > Params.OverloadedMemoryThresholdPercentage ||
					memUsageRateDiff > Params.MemoryUsageMaxDifferencePercentage {
					segmentInfos := nodeID2SegmentInfos[sourceNodeID]
					// select the segment that needs balance on the source node
					selectedSegmentInfo, err = chooseSegmentToBalance(sourceNodeID, dstNodeID, segmentInfos, nodeID2MemUsage, nodeID2TotalMem, nodeID2MemUsageRate)
					if err == nil && selectedSegmentInfo != nil {
						req := &querypb.LoadBalanceRequest{
							Base: &commonpb.MsgBase{
								MsgType: commonpb.MsgType_LoadBalanceSegments,
							},
							BalanceReason:    querypb.TriggerCondition_loadBalance,
							SourceNodeIDs:    []UniqueID{sourceNodeID},
							DstNodeIDs:       []UniqueID{dstNodeID},
							SealedSegmentIDs: []UniqueID{selectedSegmentInfo.SegmentID},
						}
						baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_loadBalance)
						balanceTask := &loadBalanceTask{
							baseTask:           baseTask,
							LoadBalanceRequest: req,
							rootCoord:          qc.rootCoordClient,
							dataCoord:          qc.dataCoordClient,
528
							indexCoord:         qc.indexCoordClient,
529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608
							cluster:            qc.cluster,
							meta:               qc.meta,
						}
						loadBalanceTasks = append(loadBalanceTasks, balanceTask)
						nodeID2MemUsage[sourceNodeID] -= uint64(selectedSegmentInfo.MemSize)
						nodeID2MemUsage[dstNodeID] += uint64(selectedSegmentInfo.MemSize)
						nodeID2MemUsageRate[sourceNodeID] = float64(nodeID2MemUsage[sourceNodeID]) / float64(nodeID2TotalMem[sourceNodeID])
						nodeID2MemUsageRate[dstNodeID] = float64(nodeID2MemUsage[dstNodeID]) / float64(nodeID2TotalMem[dstNodeID])
						delete(nodeID2SegmentInfos[sourceNodeID], selectedSegmentInfo.SegmentID)
						nodeID2SegmentInfos[dstNodeID][selectedSegmentInfo.SegmentID] = selectedSegmentInfo
						continue
					}
				}
				if err != nil {
					// not enough memory on the query nodes to balance; the proxy should be notified to stop inserting
					memoryInsufficient = true
				}
				// if memoryInsufficient == false:
				// every query node's memory usage rate is below OverloadedMemoryThresholdPercentage,
				// and the max memUsageRateDiff is within MemoryUsageMaxDifferencePercentage,
				// so this balance round is done
				break
			}
			if !memoryInsufficient {
				for _, t := range loadBalanceTasks {
					qc.scheduler.Enqueue(t)
					log.Debug("loadBalanceSegmentLoop: enqueue a loadBalance task", zap.Any("task", t))
					err = t.waitToFinish()
					if err != nil {
						// if failed, wait for next balance loop
						// it may be that the collection/partition of the balanced segment has been released
						// it also may be other abnormal errors
						log.Error("loadBalanceSegmentLoop: balance task execute failed", zap.Any("task", t))
					} else {
						log.Debug("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t))
					}
				}
				log.Debug("loadBalanceSegmentLoop: load balance Done in this loop", zap.Any("tasks", loadBalanceTasks))
			} else {
				// not enough memory on the query nodes to balance
				// TODO(xige-16): notify the proxy to stop inserting data
				log.Error("loadBalanceSegmentLoop: query node has insufficient memory, stop inserting data")
			}
		}
	}
}

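// chooseSegmentToBalance selects, among the source node's segments, the one whose move to the dst node
// best narrows the memory usage rate gap between the two nodes; it returns an error when every candidate
// would push the dst node over OverloadedMemoryThresholdPercentage.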
func chooseSegmentToBalance(sourceNodeID int64, dstNodeID int64,
	segmentInfos map[UniqueID]*querypb.SegmentInfo,
	nodeID2MemUsage map[int64]uint64,
	nodeID2TotalMem map[int64]uint64,
	nodeID2MemUsageRate map[int64]float64) (*querypb.SegmentInfo, error) {
	memoryInsufficient := true
	minMemDiffPercentage := 1.0
	var selectedSegmentInfo *querypb.SegmentInfo
	for _, info := range segmentInfos {
		dstNodeMemUsageAfterBalance := nodeID2MemUsage[dstNodeID] + uint64(info.MemSize)
		dstNodeMemUsageRateAfterBalance := float64(dstNodeMemUsageAfterBalance) / float64(nodeID2TotalMem[dstNodeID])
		// if the dst node's memory usage rate would exceed OverloadedMemoryThresholdPercentage after balance, the segment can't be moved to it
		if dstNodeMemUsageRateAfterBalance < Params.OverloadedMemoryThresholdPercentage {
			memoryInsufficient = false
			sourceNodeMemUsageAfterBalance := nodeID2MemUsage[sourceNodeID] - uint64(info.MemSize)
			sourceNodeMemUsageRateAfterBalance := float64(sourceNodeMemUsageAfterBalance) / float64(nodeID2TotalMem[sourceNodeID])
			// assume all query nodes have the same memory capacity;
			// if the memUsageRateDiff between the two nodes does not become smaller after balance, there is no need to balance
			diffBeforeBalance := nodeID2MemUsageRate[sourceNodeID] - nodeID2MemUsageRate[dstNodeID]
			diffAfterBalance := dstNodeMemUsageRateAfterBalance - sourceNodeMemUsageRateAfterBalance
			if diffAfterBalance < diffBeforeBalance {
				// keep the segment that brings the two nodes closest together after the move
				if math.Abs(diffAfterBalance) < minMemDiffPercentage {
					minMemDiffPercentage = math.Abs(diffAfterBalance)
					selectedSegmentInfo = info
				}
			}
		}
	}

	if memoryInsufficient {
		return nil, errors.New("all query nodes have insufficient memory")
	}

	return selectedSegmentInfo, nil
}