// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package querycoord

import (
	"context"
	"errors"
	"fmt"
	"math"
	"math/rand"
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/golang/protobuf/proto"
	"go.etcd.io/etcd/api/v3/mvccpb"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/allocator"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
)

const (
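	// keys under this prefix follow the layout
	// handoffSegmentPrefix/collectionID/partitionID/segmentID,
	// matching the path built in watchHandoffSegmentLoop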
	handoffSegmentPrefix = "querycoord-handoff"
)

// Timestamp is an alias for typeutil.Timestamp (a uint64)
type Timestamp = typeutil.Timestamp

type queryChannelInfo struct {
	requestChannel  string
	responseChannel string
}

// QueryCoord is the coordinator of queryNodes
type QueryCoord struct {
	loopCtx    context.Context
	loopCancel context.CancelFunc
	loopWg     sync.WaitGroup
	kvClient   *etcdkv.EtcdKV

	initOnce sync.Once

	queryCoordID uint64
	meta         Meta
	cluster      Cluster
	newNodeFn    newQueryNodeFn
	scheduler    *TaskScheduler
	idAllocator  func() (UniqueID, error)
	indexChecker *IndexChecker

	metricsCacheManager *metricsinfo.MetricsCacheManager

	dataCoordClient  types.DataCoord
	rootCoordClient  types.RootCoord
	indexCoordClient types.IndexCoord

	session   *sessionutil.Session
	eventChan <-chan *sessionutil.SessionEvent

	stateCode  atomic.Value
	enableGrpc bool

	msFactory msgstream.Factory
}

// Register registers the query coord service at etcd
func (qc *QueryCoord) Register() error {
	qc.session.Register()
	go qc.session.LivenessCheck(qc.loopCtx, func() {
		log.Error("Query Coord disconnected from etcd, process will exit", zap.Int64("Server Id", qc.session.ServerID))
		if err := qc.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		syscall.Kill(syscall.Getpid(), syscall.SIGINT)
	})
	return nil
}

func (qc *QueryCoord) initSession() error {
	qc.session = sessionutil.NewSession(qc.loopCtx, Params.MetaRootPath, Params.EtcdEndpoints)
	if qc.session == nil {
		return fmt.Errorf("session is nil, the etcd client connection may have failed")
	}
	qc.session.Init(typeutil.QueryCoordRole, Params.Address, true)
	Params.NodeID = uint64(qc.session.ServerID)
	Params.SetLogger(qc.session.ServerID)
	return nil
}

// Init function initializes the QueryCoord's session, etcd client, id allocator, meta, cluster, task scheduler and index checker
func (qc *QueryCoord) Init() error {
	log.Debug("query coord session info", zap.String("metaPath", Params.MetaRootPath), zap.Strings("etcdEndPoints", Params.EtcdEndpoints), zap.String("address", Params.Address))
	log.Debug("query coordinator start init")
	// connect etcd
	connectEtcdFn := func() error {
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
		if err != nil {
			return err
		}
		qc.kvClient = etcdKV
		return nil
	}
	var initError error
	qc.initOnce.Do(func() {
		err := qc.initSession()
		if err != nil {
			log.Error("QueryCoord init session failed", zap.Error(err))
			initError = err
			return
		}
		log.Debug("QueryCoord try to connect etcd")
		initError = retry.Do(qc.loopCtx, connectEtcdFn, retry.Attempts(300))
		if initError != nil {
			log.Debug("query coordinator try to connect etcd failed", zap.Error(initError))
			return
		}
		log.Debug("query coordinator try to connect etcd success")

		// init id allocator
		var idAllocatorKV *etcdkv.EtcdKV
		idAllocatorKV, initError = tsoutil.NewTSOKVBase(Params.EtcdEndpoints, Params.KvRootPath, "queryCoordTaskID")
		if initError != nil {
			return
		}
		idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV)
		initError = idAllocator.Initialize()
		if initError != nil {
			log.Debug("query coordinator idAllocator initialize failed", zap.Error(initError))
			return
		}
		qc.idAllocator = func() (UniqueID, error) {
			return idAllocator.AllocOne()
		}

		// init meta
		qc.meta, initError = newMeta(qc.loopCtx, qc.kvClient, qc.msFactory, qc.idAllocator)
		if initError != nil {
			log.Error("query coordinator init meta failed", zap.Error(initError))
			return
		}

		// init cluster
		qc.cluster, initError = newQueryNodeCluster(qc.loopCtx, qc.meta, qc.kvClient, qc.newNodeFn, qc.session)
		if initError != nil {
			log.Error("query coordinator init cluster failed", zap.Error(initError))
			return
		}

		// init task scheduler
		qc.scheduler, initError = NewTaskScheduler(qc.loopCtx, qc.meta, qc.cluster, qc.kvClient, qc.rootCoordClient, qc.dataCoordClient, qc.indexCoordClient, qc.idAllocator)
		if initError != nil {
			log.Error("query coordinator init task scheduler failed", zap.Error(initError))
			return
		}

		// init index checker
		qc.indexChecker, initError = newIndexChecker(qc.loopCtx, qc.kvClient, qc.meta, qc.cluster, qc.scheduler, qc.rootCoordClient, qc.indexCoordClient, qc.dataCoordClient)
		if initError != nil {
			log.Error("query coordinator init index checker failed", zap.Error(initError))
			return
		}

		qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
	})
	if initError == nil {
		log.Debug("query coordinator init success")
	}
	return initError
}

// Start function starts the task scheduler, the index checker, and the loops watching node events, handoff segments and segment load balance
func (qc *QueryCoord) Start() error {
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err := qc.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	qc.scheduler.Start()
	log.Debug("start scheduler ...")

	qc.indexChecker.start()
	log.Debug("start index checker ...")

	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()

	qc.loopWg.Add(1)
	go qc.watchNodeLoop()

	qc.loopWg.Add(1)
	go qc.watchHandoffSegmentLoop()

	if Params.AutoBalance {
		qc.loopWg.Add(1)
		go qc.loadBalanceSegmentLoop()
	}

	qc.UpdateStateCode(internalpb.StateCode_Healthy)

	return nil
}

// Stop function closes the task scheduler and the index checker, stops all the watch loops, and revokes the session
func (qc *QueryCoord) Stop() error {
	qc.UpdateStateCode(internalpb.StateCode_Abnormal)

	qc.scheduler.Close()
	log.Debug("close scheduler ...")
	qc.indexChecker.close()
	log.Debug("close index checker ...")
	qc.loopCancel()

	qc.loopWg.Wait()
	qc.session.Revoke(time.Second)
	return nil
}

// UpdateStateCode updates the state code of the coordinator, e.g. healthy or abnormal
func (qc *QueryCoord) UpdateStateCode(code internalpb.StateCode) {
	qc.stateCode.Store(code)
}

// NewQueryCoord creates a QueryCoord object.
func NewQueryCoord(ctx context.Context, factory msgstream.Factory) (*QueryCoord, error) {
	rand.Seed(time.Now().UnixNano())
	queryChannels := make([]*queryChannelInfo, 0)
	channelID := len(queryChannels)
	searchPrefix := Params.SearchChannelPrefix
	searchResultPrefix := Params.SearchResultChannelPrefix
	allocatedQueryChannel := searchPrefix + "-" + strconv.FormatInt(int64(channelID), 10)
	allocatedQueryResultChannel := searchResultPrefix + "-" + strconv.FormatInt(int64(channelID), 10)

	queryChannels = append(queryChannels, &queryChannelInfo{
		requestChannel:  allocatedQueryChannel,
		responseChannel: allocatedQueryResultChannel,
	})

	ctx1, cancel := context.WithCancel(ctx)
	service := &QueryCoord{
		loopCtx:    ctx1,
		loopCancel: cancel,
		msFactory:  factory,
		newNodeFn:  newQueryNode,
	}

	service.UpdateStateCode(internalpb.StateCode_Abnormal)
	log.Debug("query coordinator", zap.Any("queryChannels", queryChannels))
	return service, nil
}
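
// startQueryCoordSketch is a minimal, illustrative wiring of the lifecycle
// above; the function itself is hypothetical and only shows one plausible
// call order derived from this file. The client setters must run before
// Init, which hands the clients to the task scheduler and index checker;
// Register must run after Init, because it registers the session that
// Init creates.
func startQueryCoordSketch(ctx context.Context, factory msgstream.Factory,
	rootCoord types.RootCoord, dataCoord types.DataCoord, indexCoord types.IndexCoord) (*QueryCoord, error) {
	qc, err := NewQueryCoord(ctx, factory)
	if err != nil {
		return nil, err
	}
	// inject the coordinator clients before Init
	if err = qc.SetRootCoord(rootCoord); err != nil {
		return nil, err
	}
	if err = qc.SetDataCoord(dataCoord); err != nil {
		return nil, err
	}
	if err = qc.SetIndexCoord(indexCoord); err != nil {
		return nil, err
	}
	if err = qc.Init(); err != nil {
		return nil, err
	}
	// Register needs the session created in Init
	if err = qc.Register(); err != nil {
		return nil, err
	}
	return qc, qc.Start()
}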

// SetRootCoord sets the root coordinator's client
func (qc *QueryCoord) SetRootCoord(rootCoord types.RootCoord) error {
	if rootCoord == nil {
		return errors.New("null RootCoord interface")
	}

	qc.rootCoordClient = rootCoord
	return nil
}

// SetDataCoord sets the data coordinator's client
func (qc *QueryCoord) SetDataCoord(dataCoord types.DataCoord) error {
	if dataCoord == nil {
		return errors.New("null DataCoord interface")
	}

	qc.dataCoordClient = dataCoord
	return nil
}

// SetIndexCoord sets the index coordinator's client
func (qc *QueryCoord) SetIndexCoord(indexCoord types.IndexCoord) error {
	if indexCoord == nil {
		return errors.New("null IndexCoord interface")
	}

	qc.indexCoordClient = indexCoord
	return nil
}

// watchNodeLoop first triggers a load-balance task for segments on QueryNodes
// that went offline while QueryCoord was down, then watches QueryNode session
// events: new nodes are registered to the cluster, and a nodeDown
// load-balance task is triggered when a node goes offline
func (qc *QueryCoord) watchNodeLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
	defer cancel()
	defer qc.loopWg.Done()
	log.Debug("QueryCoord start watch node loop")

	offlineNodes, err := qc.cluster.offlineNodes()
	if err == nil {
		offlineNodeIDs := make([]int64, 0)
		for id := range offlineNodes {
			offlineNodeIDs = append(offlineNodeIDs, id)
		}
		loadBalanceSegment := &querypb.LoadBalanceRequest{
			Base: &commonpb.MsgBase{
				MsgType:  commonpb.MsgType_LoadBalanceSegments,
				SourceID: qc.session.ServerID,
			},
			SourceNodeIDs: offlineNodeIDs,
		}

		baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_NodeDown)
		loadBalanceTask := &loadBalanceTask{
			baseTask:           baseTask,
			LoadBalanceRequest: loadBalanceSegment,
			rootCoord:          qc.rootCoordClient,
			dataCoord:          qc.dataCoordClient,
			indexCoord:         qc.indexCoordClient,
			cluster:            qc.cluster,
			meta:               qc.meta,
		}
		// TODO: handle enqueue error
		qc.scheduler.Enqueue(loadBalanceTask)
		log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
	}

	qc.eventChan = qc.session.WatchServices(typeutil.QueryNodeRole, qc.cluster.getSessionVersion()+1, nil)
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-qc.eventChan:
			if !ok {
				return
			}
			switch event.EventType {
			case sessionutil.SessionAddEvent:
				serverID := event.Session.ServerID
				log.Debug("start adding a QueryNode to cluster", zap.Any("nodeID", serverID))
				err := qc.cluster.registerNode(ctx, event.Session, serverID, disConnect)
				if err != nil {
					log.Error("QueryCoord failed to register a QueryNode", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
				}
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()
			case sessionutil.SessionDelEvent:
				serverID := event.Session.ServerID
				log.Debug("get a del event after QueryNode down", zap.Int64("nodeID", serverID))
				nodeExist := qc.cluster.hasNode(serverID)
				if !nodeExist {
					log.Error("QueryNode does not exist", zap.Int64("nodeID", serverID))
					continue
				}

				qc.cluster.stopNode(serverID)
				loadBalanceSegment := &querypb.LoadBalanceRequest{
					Base: &commonpb.MsgBase{
						MsgType:  commonpb.MsgType_LoadBalanceSegments,
						SourceID: qc.session.ServerID,
					},
					SourceNodeIDs: []int64{serverID},
					BalanceReason: querypb.TriggerCondition_NodeDown,
				}

				baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_NodeDown)
				loadBalanceTask := &loadBalanceTask{
					baseTask:           baseTask,
					LoadBalanceRequest: loadBalanceSegment,
					rootCoord:          qc.rootCoordClient,
					dataCoord:          qc.dataCoordClient,
					indexCoord:         qc.indexCoordClient,
					cluster:            qc.cluster,
					meta:               qc.meta,
				}
				qc.metricsCacheManager.InvalidateSystemInfoMetrics()
				// TODO: handle enqueue error
				qc.scheduler.Enqueue(loadBalanceTask)
				log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
			}
		}
	}
}

// watchHandoffSegmentLoop watches etcd keys under handoffSegmentPrefix and
// hands valid handoff requests to the index checker; invalid requests are
// removed from etcd
func (qc *QueryCoord) watchHandoffSegmentLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)

	defer cancel()
	defer qc.loopWg.Done()
	log.Debug("QueryCoord start watch handoff segment loop")

	watchChan := qc.kvClient.WatchWithRevision(handoffSegmentPrefix, qc.indexChecker.revision+1)

	for {
		select {
		case <-ctx.Done():
			return
		case resp := <-watchChan:
			for _, event := range resp.Events {
				segmentInfo := &querypb.SegmentInfo{}
				err := proto.Unmarshal(event.Kv.Value, segmentInfo)
				if err != nil {
					log.Error("watchHandoffSegmentLoop: unmarshal failed", zap.Any("error", err.Error()))
					continue
				}
				switch event.Type {
				case mvccpb.PUT:
					if Params.AutoHandoff && qc.indexChecker.verifyHandoffReqValid(segmentInfo) {
						qc.indexChecker.enqueueHandoffReq(segmentInfo)
						log.Debug("watchHandoffSegmentLoop: enqueue a handoff request to index checker", zap.Any("segment info", segmentInfo))
					} else {
						log.Debug("watchHandoffSegmentLoop: collection/partition has not been loaded or autoHandoff is disabled, remove req from etcd", zap.Any("segmentInfo", segmentInfo))
						buildQuerySegmentPath := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID)
						err = qc.kvClient.Remove(buildQuerySegmentPath)
						if err != nil {
							log.Error("watchHandoffSegmentLoop: remove handoff segment from etcd failed", zap.Error(err))
							panic(err)
						}
					}
				default:
					// do nothing
				}
			}
		}
	}
}

// loadBalanceSegmentLoop periodically compares the memory usage of online
// QueryNodes and migrates segments from the most loaded node to the least
// loaded one
func (qc *QueryCoord) loadBalanceSegmentLoop() {
	ctx, cancel := context.WithCancel(qc.loopCtx)
	defer cancel()
	defer qc.loopWg.Done()
	log.Debug("QueryCoord start load balance segment loop")

	timer := time.NewTicker(time.Duration(Params.BalanceIntervalSeconds) * time.Second)

	for {
		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			onlineNodes, err := qc.cluster.onlineNodes()
			if err != nil {
				log.Warn("loadBalanceSegmentLoop: there are no online QueryNodes to balance")
				continue
			}
			// get mem info of online nodes from cluster
			nodeID2MemUsageRate := make(map[int64]float64)
			nodeID2MemUsage := make(map[int64]uint64)
			nodeID2TotalMem := make(map[int64]uint64)
			nodeID2SegmentInfos := make(map[int64]map[UniqueID]*querypb.SegmentInfo)
			onlineNodeIDs := make([]int64, 0)
			for nodeID := range onlineNodes {
				nodeInfo, err := qc.cluster.getNodeInfoByID(nodeID)
				if err != nil {
					log.Warn("loadBalanceSegmentLoop: get node info from query node failed", zap.Int64("nodeID", nodeID), zap.Error(err))
					delete(onlineNodes, nodeID)
					continue
				}

				updateSegmentInfoDone := true
				latestSegmentInfos := make(map[UniqueID]*querypb.SegmentInfo)
				segmentInfos := qc.meta.getSegmentInfosByNode(nodeID)
				for _, segmentInfo := range segmentInfos {
					latestInfo, err := qc.cluster.getSegmentInfoByID(ctx, segmentInfo.SegmentID)
					if err != nil {
						log.Warn("loadBalanceSegmentLoop: get segment info from QueryNode failed", zap.Int64("nodeID", nodeID), zap.Error(err))
						delete(onlineNodes, nodeID)
						updateSegmentInfoDone = false
						break
					}
					latestSegmentInfos[segmentInfo.SegmentID] = latestInfo
				}
				if updateSegmentInfoDone {
					nodeID2MemUsageRate[nodeID] = nodeInfo.(*queryNode).memUsageRate
					nodeID2MemUsage[nodeID] = nodeInfo.(*queryNode).memUsage
					nodeID2TotalMem[nodeID] = nodeInfo.(*queryNode).totalMem
					onlineNodeIDs = append(onlineNodeIDs, nodeID)
					nodeID2SegmentInfos[nodeID] = latestSegmentInfos
				}
			}
			log.Debug("loadBalanceSegmentLoop: memory usage rate of all online query nodes", zap.Any("mem rate", nodeID2MemUsageRate))
			if len(onlineNodeIDs) <= 1 {
				log.Warn("loadBalanceSegmentLoop: there are too few online query nodes to balance", zap.Int64s("onlineNodeIDs", onlineNodeIDs))
				continue
			}

			// check which nodes need balance and determine which segments on these nodes need to be migrated to other nodes
			memoryInsufficient := false
			loadBalanceTasks := make([]*loadBalanceTask, 0)
			for {
				var selectedSegmentInfo *querypb.SegmentInfo
				sort.Slice(onlineNodeIDs, func(i, j int) bool {
					return nodeID2MemUsageRate[onlineNodeIDs[i]] > nodeID2MemUsageRate[onlineNodeIDs[j]]
				})

				// the memoryUsageRate of the sourceNode is the highest among all online query nodes
				sourceNodeID := onlineNodeIDs[0]
				dstNodeID := onlineNodeIDs[len(onlineNodeIDs)-1]
				memUsageRateDiff := nodeID2MemUsageRate[sourceNodeID] - nodeID2MemUsageRate[dstNodeID]
				// if the memoryUsageRate of the source node is greater than 90%, and the max memUsageDiff is greater than 30%,
				// then migrate the segments on source node to other query nodes
				if nodeID2MemUsageRate[sourceNodeID] > Params.OverloadedMemoryThresholdPercentage ||
					memUsageRateDiff > Params.MemoryUsageMaxDifferencePercentage {
					segmentInfos := nodeID2SegmentInfos[sourceNodeID]
					// select the segment that needs balance on the source node
					selectedSegmentInfo, err = chooseSegmentToBalance(sourceNodeID, dstNodeID, segmentInfos, nodeID2MemUsage, nodeID2TotalMem, nodeID2MemUsageRate)
					if err == nil && selectedSegmentInfo != nil {
						req := &querypb.LoadBalanceRequest{
							Base: &commonpb.MsgBase{
								MsgType: commonpb.MsgType_LoadBalanceSegments,
							},
							BalanceReason:    querypb.TriggerCondition_LoadBalance,
							SourceNodeIDs:    []UniqueID{sourceNodeID},
							DstNodeIDs:       []UniqueID{dstNodeID},
							SealedSegmentIDs: []UniqueID{selectedSegmentInfo.SegmentID},
						}
						baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_LoadBalance)
						balanceTask := &loadBalanceTask{
							baseTask:           baseTask,
							LoadBalanceRequest: req,
							rootCoord:          qc.rootCoordClient,
							dataCoord:          qc.dataCoordClient,
							indexCoord:         qc.indexCoordClient,
							cluster:            qc.cluster,
							meta:               qc.meta,
						}
						loadBalanceTasks = append(loadBalanceTasks, balanceTask)
						nodeID2MemUsage[sourceNodeID] -= uint64(selectedSegmentInfo.MemSize)
						nodeID2MemUsage[dstNodeID] += uint64(selectedSegmentInfo.MemSize)
						nodeID2MemUsageRate[sourceNodeID] = float64(nodeID2MemUsage[sourceNodeID]) / float64(nodeID2TotalMem[sourceNodeID])
						nodeID2MemUsageRate[dstNodeID] = float64(nodeID2MemUsage[dstNodeID]) / float64(nodeID2TotalMem[dstNodeID])
						delete(nodeID2SegmentInfos[sourceNodeID], selectedSegmentInfo.SegmentID)
						nodeID2SegmentInfos[dstNodeID][selectedSegmentInfo.SegmentID] = selectedSegmentInfo
						continue
					}
				}
				if err != nil {
					// not enough memory on the query nodes to balance; notify the proxy to stop inserts
					memoryInsufficient = true
				}
				// if memoryInsufficient == false, every query node's memoryUsageRate is
				// less than 90% and the max memUsageDiff is less than 30%, so this
				// balance loop is done
				break
			}
			if !memoryInsufficient {
				for _, t := range loadBalanceTasks {
					qc.scheduler.Enqueue(t)
					log.Debug("loadBalanceSegmentLoop: enqueue a loadBalance task", zap.Any("task", t))
					err = t.waitToFinish()
					if err != nil {
						// if the task failed, wait for the next balance loop;
						// the collection/partition of the balanced segment may have been
						// released, or some other abnormal error occurred
						log.Error("loadBalanceSegmentLoop: balance task execute failed", zap.Error(err), zap.Any("task", t))
					} else {
						log.Debug("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t))
					}
				}
				log.Debug("loadBalanceSegmentLoop: load balance Done in this loop", zap.Any("tasks", loadBalanceTasks))
			} else {
				// not enough memory on the query nodes to balance; notify the proxy to stop inserts
				// TODO: xige-16
				log.Error("loadBalanceSegmentLoop: QueryNode has insufficient memory, stop inserting data")
			}
		}
	}
}

func chooseSegmentToBalance(sourceNodeID int64, dstNodeID int64,
	segmentInfos map[UniqueID]*querypb.SegmentInfo,
	nodeID2MemUsage map[int64]uint64,
	nodeID2TotalMem map[int64]uint64,
	nodeID2MemUsageRate map[int64]float64) (*querypb.SegmentInfo, error) {
	memoryInsufficient := true
	minMemDiffPercentage := 1.0
	var selectedSegmentInfo *querypb.SegmentInfo
	for _, info := range segmentInfos {
		dstNodeMemUsageAfterBalance := nodeID2MemUsage[dstNodeID] + uint64(info.MemSize)
		dstNodeMemUsageRateAfterBalance := float64(dstNodeMemUsageAfterBalance) / float64(nodeID2TotalMem[dstNodeID])
		// if the memUsageRate of dstNode would exceed OverloadedMemoryThresholdPercentage after balance, the segment can't be moved there
		if dstNodeMemUsageRateAfterBalance < Params.OverloadedMemoryThresholdPercentage {
			memoryInsufficient = false
			sourceNodeMemUsageAfterBalance := nodeID2MemUsage[sourceNodeID] - uint64(info.MemSize)
			sourceNodeMemUsageRateAfterBalance := float64(sourceNodeMemUsageAfterBalance) / float64(nodeID2TotalMem[sourceNodeID])
			// assume all query nodes have the same memory capacity
			// if the memUsageRateDiff between the two nodes does not become smaller after balance, there is no need for balance
			diffBeforeBalance := nodeID2MemUsageRate[sourceNodeID] - nodeID2MemUsageRate[dstNodeID]
			diffAfterBalance := dstNodeMemUsageRateAfterBalance - sourceNodeMemUsageRateAfterBalance
			if diffAfterBalance < diffBeforeBalance {
				// prefer the segment whose migration leaves the two nodes closest in memory usage
				if math.Abs(diffAfterBalance) < minMemDiffPercentage {
					minMemDiffPercentage = math.Abs(diffAfterBalance)
					selectedSegmentInfo = info
				}
			}
		}
	}

	if memoryInsufficient {
		return nil, errors.New("all QueryNodes have insufficient memory")
	}

	return selectedSegmentInfo, nil
}
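
// chooseSegmentToBalanceSketch is an illustrative, hypothetical example of
// the selection above; every ID and size below is made up. Both nodes have
// 100 GB of memory, the source sits at 80% usage and the destination at 40%.
// Moving the 10 GB segment leaves them at 70% and 50%, so the usage-rate gap
// shrinks from 0.4 to 0.2 and the segment is a valid candidate, provided 50%
// stays below Params.OverloadedMemoryThresholdPercentage.
func chooseSegmentToBalanceSketch() (*querypb.SegmentInfo, error) {
	const gb = uint64(1) << 30
	sourceNodeID, dstNodeID := int64(1), int64(2)
	segmentInfos := map[UniqueID]*querypb.SegmentInfo{
		100: {SegmentID: 100, MemSize: int64(10 * gb)},
	}
	nodeID2MemUsage := map[int64]uint64{sourceNodeID: 80 * gb, dstNodeID: 40 * gb}
	nodeID2TotalMem := map[int64]uint64{sourceNodeID: 100 * gb, dstNodeID: 100 * gb}
	nodeID2MemUsageRate := map[int64]float64{sourceNodeID: 0.8, dstNodeID: 0.4}
	return chooseSegmentToBalance(sourceNodeID, dstNodeID, segmentInfos,
		nodeID2MemUsage, nodeID2TotalMem, nodeID2MemUsageRate)
}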