server.go 21.2 KB
Newer Older
Y
yah01 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

B
Bingyi Sun 已提交
17 18 19 20 21 22 23 24
package querycoordv2

import (
	"context"
	"fmt"
	"os"
	"sync"
	"syscall"
W
wei liu 已提交
25
	"time"
B
Bingyi Sun 已提交
26

27
	"github.com/cockroachdb/errors"
28 29 30 31
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/atomic"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
32

S
SimFG 已提交
33 34
	"github.com/milvus-io/milvus-proto/go-api/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/milvuspb"
B
Bingyi Sun 已提交
35 36 37 38 39 40 41 42 43 44 45 46
	"github.com/milvus-io/milvus/internal/allocator"
	"github.com/milvus-io/milvus/internal/kv"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/querycoordv2/balance"
	"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
	"github.com/milvus-io/milvus/internal/querycoordv2/dist"
	"github.com/milvus-io/milvus/internal/querycoordv2/job"
	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
	"github.com/milvus-io/milvus/internal/querycoordv2/observers"
	"github.com/milvus-io/milvus/internal/querycoordv2/params"
	"github.com/milvus-io/milvus/internal/querycoordv2/session"
	"github.com/milvus-io/milvus/internal/querycoordv2/task"
47
	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
B
Bingyi Sun 已提交
48 49 50
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
51 52 53 54 55 56 57
	"github.com/milvus-io/milvus/pkg/common"
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/timerecord"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
B
Bingyi Sun 已提交
58 59 60 61
)

var (
	// Only for re-export
E
Enwei Jiao 已提交
62
	Params = params.Params
B
Bingyi Sun 已提交
63 64 65 66 67 68
)

type Server struct {
	ctx                 context.Context
	cancel              context.CancelFunc
	wg                  sync.WaitGroup
69
	status              atomic.Int32
B
Bingyi Sun 已提交
70
	etcdCli             *clientv3.Client
E
Enwei Jiao 已提交
71
	address             string
B
Bingyi Sun 已提交
72 73 74 75 76 77
	session             *sessionutil.Session
	kv                  kv.MetaKv
	idAllocator         func() (int64, error)
	metricsCacheManager *metricsinfo.MetricsCacheManager

	// Coordinators
C
cai.zhang 已提交
78 79
	dataCoord types.DataCoord
	rootCoord types.RootCoord
B
Bingyi Sun 已提交
80 81 82 83 84 85 86 87 88

	// Meta
	store     meta.Store
	meta      *meta.Meta
	dist      *meta.DistributionManager
	targetMgr *meta.TargetManager
	broker    meta.Broker

	// Session
W
wayblink 已提交
89 90 91
	cluster          session.Cluster
	nodeMgr          *session.NodeManager
	queryNodeCreator session.QueryNodeCreator
B
Bingyi Sun 已提交
92 93 94 95 96 97

	// Schedulers
	jobScheduler  *job.Scheduler
	taskScheduler task.Scheduler

	// HeartBeat
98
	distController dist.Controller
B
Bingyi Sun 已提交
99 100 101 102 103 104 105

	// Checkers
	checkerController *checkers.CheckerController

	// Observers
	collectionObserver *observers.CollectionObserver
	leaderObserver     *observers.LeaderObserver
W
wei liu 已提交
106
	targetObserver     *observers.TargetObserver
W
wei liu 已提交
107 108
	replicaObserver    *observers.ReplicaObserver
	resourceObserver   *observers.ResourceObserver
B
Bingyi Sun 已提交
109

110 111
	balancer    balance.Balance
	balancerMap map[string]balance.Balance
112 113 114

	// Active-standby
	enableActiveStandBy bool
115
	activateFunc        func() error
W
wei liu 已提交
116 117

	nodeUpEventChan chan int64
W
wei liu 已提交
118
	notifyNodeUp    chan struct{}
B
Bingyi Sun 已提交
119 120
}

121
func NewQueryCoord(ctx context.Context) (*Server, error) {
B
Bingyi Sun 已提交
122 123
	ctx, cancel := context.WithCancel(ctx)
	server := &Server{
W
wei liu 已提交
124 125 126
		ctx:             ctx,
		cancel:          cancel,
		nodeUpEventChan: make(chan int64, 10240),
W
wei liu 已提交
127
		notifyNodeUp:    make(chan struct{}),
B
Bingyi Sun 已提交
128
	}
129
	server.UpdateStateCode(commonpb.StateCode_Abnormal)
W
wayblink 已提交
130
	server.queryNodeCreator = session.DefaultQueryNodeCreator
B
Bingyi Sun 已提交
131 132 133 134 135
	return server, nil
}

func (s *Server) Register() error {
	s.session.Register()
136
	if s.enableActiveStandBy {
137 138 139 140
		if err := s.session.ProcessActiveStandBy(s.activateFunc); err != nil {
			log.Error("failed to activate standby server", zap.Error(err))
			return err
		}
141
	}
142 143 144

	s.session.LivenessCheck(s.ctx, func() {
		log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.ServerID))
B
Bingyi Sun 已提交
145 146 147 148 149 150 151 152 153 154 155 156 157
		if err := s.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		if s.session.TriggerKill {
			if p, err := os.FindProcess(os.Getpid()); err == nil {
				p.Signal(syscall.SIGINT)
			}
		}
	})
	return nil
}

158
func (s *Server) initSession() error {
B
Bingyi Sun 已提交
159
	// Init QueryCoord session
160
	s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
B
Bingyi Sun 已提交
161 162 163
	if s.session == nil {
		return fmt.Errorf("failed to create session")
	}
E
Enwei Jiao 已提交
164
	s.session.Init(typeutil.QueryCoordRole, s.address, true, true)
165
	s.enableActiveStandBy = Params.QueryCoordCfg.EnableActiveStandby.GetAsBool()
166
	s.session.SetEnableActiveStandBy(s.enableActiveStandBy)
167 168 169 170 171 172 173 174 175 176 177 178 179
	return nil
}

func (s *Server) Init() error {
	log.Info("QueryCoord start init",
		zap.String("meta-root-path", Params.EtcdCfg.MetaRootPath.GetValue()),
		zap.String("address", s.address))

	if err := s.initSession(); err != nil {
		return err
	}

	if s.enableActiveStandBy {
180
		s.activateFunc = func() error {
181 182
			log.Info("QueryCoord switch from standby to active, activating")
			if err := s.initQueryCoord(); err != nil {
183 184
				log.Error("QueryCoord init failed", zap.Error(err))
				return err
185 186
			}
			if err := s.startQueryCoord(); err != nil {
187 188
				log.Error("QueryCoord init failed", zap.Error(err))
				return err
189 190
			}
			log.Info("QueryCoord startup success")
191
			return nil
192 193 194 195 196
		}
		s.UpdateStateCode(commonpb.StateCode_StandBy)
		log.Info("QueryCoord enter standby mode successfully")
		return nil
	}
B
Bingyi Sun 已提交
197

198 199 200 201
	return s.initQueryCoord()
}

func (s *Server) initQueryCoord() error {
202 203
	s.UpdateStateCode(commonpb.StateCode_Initializing)
	log.Info("QueryCoord", zap.Any("State", commonpb.StateCode_Initializing))
B
Bingyi Sun 已提交
204
	// Init KV
205
	etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
B
Bingyi Sun 已提交
206
	s.kv = etcdKV
X
Xiaofan 已提交
207
	log.Info("query coordinator try to connect etcd success")
B
Bingyi Sun 已提交
208 209

	// Init ID allocator
210
	idAllocatorKV := tsoutil.NewTSOKVBase(s.etcdCli, Params.EtcdCfg.KvRootPath.GetValue(), "querycoord-id-allocator")
B
Bingyi Sun 已提交
211 212 213 214 215 216 217 218 219 220 221 222 223 224
	idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV)
	err := idAllocator.Initialize()
	if err != nil {
		log.Error("query coordinator id allocator initialize failed", zap.Error(err))
		return err
	}
	s.idAllocator = func() (int64, error) {
		return idAllocator.AllocOne()
	}

	// Init metrics cache manager
	s.metricsCacheManager = metricsinfo.NewMetricsCacheManager()

	// Init meta
W
wei liu 已提交
225
	s.nodeMgr = session.NewNodeManager()
B
Bingyi Sun 已提交
226 227 228 229 230
	err = s.initMeta()
	if err != nil {
		return err
	}
	// Init session
X
Xiaofan 已提交
231
	log.Info("init session")
W
wayblink 已提交
232
	s.cluster = session.NewCluster(s.nodeMgr, s.queryNodeCreator)
B
Bingyi Sun 已提交
233 234

	// Init schedulers
X
Xiaofan 已提交
235
	log.Info("init schedulers")
B
Bingyi Sun 已提交
236 237 238 239 240 241 242 243 244 245 246 247
	s.jobScheduler = job.NewScheduler()
	s.taskScheduler = task.NewScheduler(
		s.ctx,
		s.meta,
		s.dist,
		s.targetMgr,
		s.broker,
		s.cluster,
		s.nodeMgr,
	)

	// Init heartbeat
X
Xiaofan 已提交
248
	log.Info("init dist controller")
B
Bingyi Sun 已提交
249 250 251 252 253 254 255 256
	s.distController = dist.NewDistController(
		s.cluster,
		s.nodeMgr,
		s.dist,
		s.targetMgr,
		s.taskScheduler,
	)

257 258 259 260 261 262 263 264 265 266 267 268 269 270 271
	// Init balancer map and balancer
	log.Info("init all available balancer")
	s.balancerMap = make(map[string]balance.Balance)
	s.balancerMap[balance.RoundRobinBalancerName] = balance.NewRoundRobinBalancer(s.taskScheduler, s.nodeMgr)
	s.balancerMap[balance.RowCountBasedBalancerName] = balance.NewRowCountBasedBalancer(s.taskScheduler,
		s.nodeMgr, s.dist, s.meta, s.targetMgr)
	s.balancerMap[balance.ScoreBasedBalancerName] = balance.NewScoreBasedBalancer(s.taskScheduler,
		s.nodeMgr, s.dist, s.meta, s.targetMgr)
	if balancer, ok := s.balancerMap[params.Params.QueryCoordCfg.Balancer.GetValue()]; ok {
		s.balancer = balancer
		log.Info("use config balancer", zap.String("balancer", params.Params.QueryCoordCfg.Balancer.GetValue()))
	} else {
		s.balancer = s.balancerMap[balance.RowCountBasedBalancerName]
		log.Info("use rowCountBased auto balancer")
	}
B
Bingyi Sun 已提交
272 273

	// Init checker controller
X
Xiaofan 已提交
274
	log.Info("init checker controller")
B
Bingyi Sun 已提交
275 276 277 278 279
	s.checkerController = checkers.NewCheckerController(
		s.meta,
		s.dist,
		s.targetMgr,
		s.balancer,
280
		s.nodeMgr,
B
Bingyi Sun 已提交
281 282 283 284 285 286
		s.taskScheduler,
	)

	// Init observers
	s.initObserver()

287 288 289
	// Init load status cache
	meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()

B
Bingyi Sun 已提交
290 291 292 293 294
	log.Info("QueryCoord init success")
	return err
}

func (s *Server) initMeta() error {
295 296
	record := timerecord.NewTimeRecorder("querycoord")

X
Xiaofan 已提交
297
	log.Info("init meta")
B
Bingyi Sun 已提交
298
	s.store = meta.NewMetaStore(s.kv)
W
wei liu 已提交
299
	s.meta = meta.NewMeta(s.idAllocator, s.store, s.nodeMgr)
B
Bingyi Sun 已提交
300

301 302 303 304 305
	s.broker = meta.NewCoordinatorBroker(
		s.dataCoord,
		s.rootCoord,
	)

X
Xiaofan 已提交
306
	log.Info("recover meta...")
307
	err := s.meta.CollectionManager.Recover(s.broker)
B
Bingyi Sun 已提交
308 309 310 311
	if err != nil {
		log.Error("failed to recover collections")
		return err
	}
312 313
	collections := s.meta.GetAll()
	log.Info("recovering collections...", zap.Int64s("collections", collections))
314 315 316 317

	// We really update the metric after observers think the collection loaded.
	metrics.QueryCoordNumCollections.WithLabelValues().Set(0)

318
	metrics.QueryCoordNumPartitions.WithLabelValues().Set(float64(len(s.meta.GetAllPartitions())))
Y
yah01 已提交
319

320
	err = s.meta.ReplicaManager.Recover(collections)
B
Bingyi Sun 已提交
321 322 323 324 325
	if err != nil {
		log.Error("failed to recover replicas")
		return err
	}

W
wei liu 已提交
326 327 328 329 330 331
	err = s.meta.ResourceManager.Recover()
	if err != nil {
		log.Error("failed to recover resource groups")
		return err
	}

B
Bingyi Sun 已提交
332 333 334 335 336
	s.dist = &meta.DistributionManager{
		SegmentDistManager: meta.NewSegmentDistManager(),
		ChannelDistManager: meta.NewChannelDistManager(),
		LeaderViewManager:  meta.NewLeaderViewManager(),
	}
W
wei liu 已提交
337
	s.targetMgr = meta.NewTargetManager(s.broker, s.meta)
338
	log.Info("QueryCoord server initMeta done", zap.Duration("duration", record.ElapseSpan()))
B
Bingyi Sun 已提交
339 340 341 342
	return nil
}

func (s *Server) initObserver() {
X
Xiaofan 已提交
343
	log.Info("init observers")
B
Bingyi Sun 已提交
344 345 346 347
	s.leaderObserver = observers.NewLeaderObserver(
		s.dist,
		s.meta,
		s.targetMgr,
Y
yah01 已提交
348
		s.broker,
B
Bingyi Sun 已提交
349 350
		s.cluster,
	)
W
wei liu 已提交
351
	s.targetObserver = observers.NewTargetObserver(
B
Bingyi Sun 已提交
352 353
		s.meta,
		s.targetMgr,
W
wei liu 已提交
354
		s.dist,
355
		s.broker,
B
Bingyi Sun 已提交
356
	)
357 358 359 360 361
	s.collectionObserver = observers.NewCollectionObserver(
		s.dist,
		s.meta,
		s.targetMgr,
		s.targetObserver,
362
		s.checkerController,
363
	)
W
wei liu 已提交
364 365 366 367 368 369 370

	s.replicaObserver = observers.NewReplicaObserver(
		s.meta,
		s.dist,
	)

	s.resourceObserver = observers.NewResourceObserver(s.meta)
B
Bingyi Sun 已提交
371 372
}

J
Jiquan Long 已提交
373 374 375
// afterStart is a hook called by startQueryCoord after the server loops have
// been started and before the state becomes Healthy. It currently does nothing.
func (s *Server) afterStart() {}

B
Bingyi Sun 已提交
376
func (s *Server) Start() error {
377 378 379 380 381 382 383 384 385 386
	if !s.enableActiveStandBy {
		if err := s.startQueryCoord(); err != nil {
			return err
		}
		log.Info("QueryCoord started")
	}
	return nil
}

func (s *Server) startQueryCoord() error {
B
Bingyi Sun 已提交
387 388 389 390 391 392 393
	log.Info("start watcher...")
	sessions, revision, err := s.session.GetSessions(typeutil.QueryNodeRole)
	if err != nil {
		return err
	}
	for _, node := range sessions {
		s.nodeMgr.Add(session.NewNodeInfo(node.ServerID, node.Address))
394
		s.taskScheduler.AddExecutor(node.ServerID)
B
Bingyi Sun 已提交
395 396 397 398 399
	}
	s.checkReplicas()
	for _, node := range sessions {
		s.handleNodeUp(node.ServerID)
	}
W
wei liu 已提交
400 401 402

	s.wg.Add(2)
	go s.handleNodeUpLoop()
B
Bingyi Sun 已提交
403 404 405 406 407 408 409
	go s.watchNodes(revision)

	log.Info("start recovering dist and target")
	err = s.recover()
	if err != nil {
		return err
	}
410
	s.startServerLoop()
411
	s.afterStart()
412
	s.UpdateStateCode(commonpb.StateCode_Healthy)
413 414 415 416
	return nil
}

func (s *Server) startServerLoop() {
B
Bingyi Sun 已提交
417 418 419 420 421 422
	log.Info("start cluster...")
	s.cluster.Start(s.ctx)

	log.Info("start job scheduler...")
	s.jobScheduler.Start(s.ctx)

423 424 425
	log.Info("start task scheduler...")
	s.taskScheduler.Start(s.ctx)

B
Bingyi Sun 已提交
426 427 428 429 430 431
	log.Info("start checker controller...")
	s.checkerController.Start(s.ctx)

	log.Info("start observers...")
	s.collectionObserver.Start(s.ctx)
	s.leaderObserver.Start(s.ctx)
W
wei liu 已提交
432
	s.targetObserver.Start(s.ctx)
W
wei liu 已提交
433 434
	s.replicaObserver.Start(s.ctx)
	s.resourceObserver.Start(s.ctx)
B
Bingyi Sun 已提交
435 436 437 438
}

func (s *Server) Stop() error {
	s.cancel()
B
Bingyi Sun 已提交
439
	if s.session != nil {
440
		s.session.Stop()
B
Bingyi Sun 已提交
441
	}
B
Bingyi Sun 已提交
442

B
Bingyi Sun 已提交
443 444 445 446
	if s.session != nil {
		log.Info("stop cluster...")
		s.cluster.Stop()
	}
B
Bingyi Sun 已提交
447

B
Bingyi Sun 已提交
448 449 450 451
	if s.distController != nil {
		log.Info("stop dist controller...")
		s.distController.Stop()
	}
B
Bingyi Sun 已提交
452

B
Bingyi Sun 已提交
453 454 455 456
	if s.checkerController != nil {
		log.Info("stop checker controller...")
		s.checkerController.Stop()
	}
B
Bingyi Sun 已提交
457

B
Bingyi Sun 已提交
458 459 460 461
	if s.taskScheduler != nil {
		log.Info("stop task scheduler...")
		s.taskScheduler.Stop()
	}
462

B
Bingyi Sun 已提交
463 464 465 466
	if s.jobScheduler != nil {
		log.Info("stop job scheduler...")
		s.jobScheduler.Stop()
	}
B
Bingyi Sun 已提交
467 468

	log.Info("stop observers...")
B
Bingyi Sun 已提交
469 470 471 472 473 474
	if s.collectionObserver != nil {
		s.collectionObserver.Stop()
	}
	if s.leaderObserver != nil {
		s.leaderObserver.Stop()
	}
W
wei liu 已提交
475 476
	if s.targetObserver != nil {
		s.targetObserver.Stop()
B
Bingyi Sun 已提交
477
	}
W
wei liu 已提交
478 479 480 481 482 483
	if s.replicaObserver != nil {
		s.replicaObserver.Stop()
	}
	if s.resourceObserver != nil {
		s.resourceObserver.Stop()
	}
B
Bingyi Sun 已提交
484 485

	s.wg.Wait()
B
Bingyi Sun 已提交
486
	log.Info("QueryCoord stop successfully")
B
Bingyi Sun 已提交
487 488 489 490
	return nil
}

// UpdateStateCode updates the status of the coord, including healthy, unhealthy
491
func (s *Server) UpdateStateCode(code commonpb.StateCode) {
492 493 494 495 496
	s.status.Store(int32(code))
}

func (s *Server) State() commonpb.StateCode {
	return commonpb.StateCode(s.status.Load())
B
Bingyi Sun 已提交
497 498
}

499
func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
B
Bingyi Sun 已提交
500 501 502 503
	nodeID := common.NotRegisteredID
	if s.session != nil && s.session.Registered() {
		nodeID = s.session.ServerID
	}
504
	serviceComponentInfo := &milvuspb.ComponentInfo{
B
Bingyi Sun 已提交
505 506
		// NodeID:    Params.QueryCoordID, // will race with QueryCoord.Register()
		NodeID:    nodeID,
507
		StateCode: s.State(),
B
Bingyi Sun 已提交
508 509
	}

510
	return &milvuspb.ComponentStates{
B
Bingyi Sun 已提交
511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
		},
		State: serviceComponentInfo,
		//SubcomponentStates: subComponentInfos,
	}, nil
}

// GetStatisticsChannel always reports success with an empty channel name.
func (s *Server) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	resp := &milvuspb.StringResponse{
		Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
	}
	return resp, nil
}

func (s *Server) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_Success,
			Reason:    "",
		},
534
		Value: Params.CommonCfg.QueryCoordTimeTick.GetValue(),
B
Bingyi Sun 已提交
535 536 537
	}, nil
}

E
Enwei Jiao 已提交
538 539 540 541
// SetAddress sets the address that initSession registers for this server,
// so it must be set before Init.
func (s *Server) SetAddress(address string) {
	s.address = address
}

B
Bingyi Sun 已提交
542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566
// SetEtcdClient injects the etcd client. The session, the meta KV and the ID
// allocator are all built from it during Init, so set it beforehand.
func (s *Server) SetEtcdClient(etcdClient *clientv3.Client) {
	s.etcdCli = etcdClient
}

// SetRootCoord injects the RootCoord client used by the meta broker.
// A nil client is rejected with an error.
func (s *Server) SetRootCoord(rootCoord types.RootCoord) error {
	if rootCoord != nil {
		s.rootCoord = rootCoord
		return nil
	}
	return errors.New("null RootCoord interface")
}

// SetDataCoord injects the DataCoord client used by the meta broker.
// A nil client is rejected with an error.
func (s *Server) SetDataCoord(dataCoord types.DataCoord) error {
	if dataCoord != nil {
		s.dataCoord = dataCoord
		return nil
	}
	return errors.New("null DataCoord interface")
}

W
wayblink 已提交
567 568 569 570
// SetQueryNodeCreator overrides the factory used to create query node clients
// (session.DefaultQueryNodeCreator by default). The cluster captures the
// factory when initQueryCoord builds it, so call this before then.
func (s *Server) SetQueryNodeCreator(f func(ctx context.Context, addr string) (types.QueryNode, error)) {
	s.queryNodeCreator = f
}

B
Bingyi Sun 已提交
571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591
// recover rebuilds in-memory state at start-up. It refreshes the next target
// of every collection in meta concurrently, then asks the dist controller to
// SyncAll. Only an error surfaced by the errgroup is returned.
func (s *Server) recover() error {
	g, gctx := errgroup.WithContext(s.ctx)
	for _, id := range s.meta.GetAll() {
		id := id // per-iteration copy for the closure (pre-Go 1.22 loop semantics)
		g.Go(func() error {
			return s.recoverCollectionTargets(gctx, id)
		})
	}
	if err := g.Wait(); err != nil {
		return err
	}

	s.distController.SyncAll(s.ctx)
	return nil
}

func (s *Server) recoverCollectionTargets(ctx context.Context, collection int64) error {
W
wei liu 已提交
592
	err := s.targetMgr.UpdateCollectionNextTarget(collection)
B
Bingyi Sun 已提交
593
	if err != nil {
594 595 596 597 598 599
		s.meta.CollectionManager.RemoveCollection(collection)
		s.meta.ReplicaManager.RemoveCollection(collection)
		log.Error("failed to recover collection due to update next target failed",
			zap.Int64("collectionID", collection),
			zap.Error(err),
		)
B
Bingyi Sun 已提交
600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616
	}
	return nil
}

func (s *Server) watchNodes(revision int64) {
	defer s.wg.Done()

	eventChan := s.session.WatchServices(typeutil.QueryNodeRole, revision+1, nil)
	for {
		select {
		case <-s.ctx.Done():
			log.Info("stop watching nodes, QueryCoord stopped")
			return

		case event, ok := <-eventChan:
			if !ok {
				// ErrCompacted is handled inside SessionWatcher
617
				log.Error("Session Watcher channel closed", zap.Int64("serverID", paramtable.GetNodeID()))
B
Bingyi Sun 已提交
618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635
				go s.Stop()
				if s.session.TriggerKill {
					if p, err := os.FindProcess(os.Getpid()); err == nil {
						p.Signal(syscall.SIGINT)
					}
				}
				return
			}

			switch event.EventType {
			case sessionutil.SessionAddEvent:
				nodeID := event.Session.ServerID
				addr := event.Session.Address
				log.Info("add node to NodeManager",
					zap.Int64("nodeID", nodeID),
					zap.String("nodeAddr", addr),
				)
				s.nodeMgr.Add(session.NewNodeInfo(nodeID, addr))
W
wei liu 已提交
636
				s.nodeUpEventChan <- nodeID
X
Xiaofan 已提交
637 638 639 640
				select {
				case s.notifyNodeUp <- struct{}{}:
				default:
				}
B
Bingyi Sun 已提交
641

642 643 644 645 646 647 648 649 650 651
			case sessionutil.SessionUpdateEvent:
				nodeID := event.Session.ServerID
				addr := event.Session.Address
				log.Info("stopping the node",
					zap.Int64("nodeID", nodeID),
					zap.String("nodeAddr", addr),
				)
				s.nodeMgr.Stopping(nodeID)
				s.checkerController.Check()

B
Bingyi Sun 已提交
652 653 654 655 656 657 658 659 660 661 662
			case sessionutil.SessionDelEvent:
				nodeID := event.Session.ServerID
				log.Info("a node down, remove it", zap.Int64("nodeID", nodeID))
				s.nodeMgr.Remove(nodeID)
				s.handleNodeDown(nodeID)
				s.metricsCacheManager.InvalidateSystemInfoMetrics()
			}
		}
	}
}

W
wei liu 已提交
663 664 665 666 667 668 669
func (s *Server) handleNodeUpLoop() {
	defer s.wg.Done()
	ticker := time.NewTicker(Params.QueryCoordCfg.CheckHealthInterval.GetAsDuration(time.Millisecond))
	defer ticker.Stop()
	for {
		select {
		case <-s.ctx.Done():
W
wei liu 已提交
670
			log.Info("handle node up loop exit due to context done")
W
wei liu 已提交
671
			return
W
wei liu 已提交
672 673
		case <-s.notifyNodeUp:
			s.tryHandleNodeUp()
W
wei liu 已提交
674
		case <-ticker.C:
W
wei liu 已提交
675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701
			s.tryHandleNodeUp()
		}
	}
}

func (s *Server) tryHandleNodeUp() {
	log := log.Ctx(s.ctx).WithRateGroup("qcv2.Server", 1, 60)
	ctx, cancel := context.WithTimeout(s.ctx, Params.QueryCoordCfg.CheckHealthRPCTimeout.GetAsDuration(time.Millisecond))
	defer cancel()
	reasons, err := s.checkNodeHealth(ctx)
	if err != nil {
		log.RatedWarn(10, "unhealthy node exist, node up will be delayed",
			zap.Int("delayedNodeUpEvents", len(s.nodeUpEventChan)),
			zap.Int("unhealthyNodeNum", len(reasons)),
			zap.Strings("unhealthyReason", reasons))
		return
	}
	for len(s.nodeUpEventChan) > 0 {
		nodeID := <-s.nodeUpEventChan
		if s.nodeMgr.Get(nodeID) != nil {
			// only if all nodes are healthy, node up event will be handled
			s.handleNodeUp(nodeID)
			s.metricsCacheManager.InvalidateSystemInfoMetrics()
			s.checkerController.Check()
		} else {
			log.Warn("node already down",
				zap.Int64("nodeID", nodeID))
W
wei liu 已提交
702 703 704 705
		}
	}
}

B
Bingyi Sun 已提交
706 707
func (s *Server) handleNodeUp(node int64) {
	log := log.With(zap.Int64("nodeID", node))
708
	s.taskScheduler.AddExecutor(node)
B
Bingyi Sun 已提交
709 710
	s.distController.StartDistInstance(s.ctx, node)

W
wei liu 已提交
711 712 713 714 715 716 717 718 719 720 721 722 723
	// need assign to new rg and replica
	rgName, err := s.meta.ResourceManager.HandleNodeUp(node)
	if err != nil {
		log.Warn("HandleNodeUp: failed to assign node to resource group",
			zap.Error(err),
		)
		return
	}

	log.Info("HandleNodeUp: assign node to resource group",
		zap.String("resourceGroup", rgName),
	)

W
wei liu 已提交
724
	utils.AddNodesToCollectionsInRG(s.meta, meta.DefaultResourceGroupName, node)
B
Bingyi Sun 已提交
725 726 727 728
}

func (s *Server) handleNodeDown(node int64) {
	log := log.With(zap.Int64("nodeID", node))
729
	s.taskScheduler.RemoveExecutor(node)
B
Bingyi Sun 已提交
730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756
	s.distController.Remove(node)

	// Clear dist
	s.dist.LeaderViewManager.Update(node)
	s.dist.ChannelDistManager.Update(node)
	s.dist.SegmentDistManager.Update(node)

	// Clear meta
	for _, collection := range s.meta.CollectionManager.GetAll() {
		log := log.With(zap.Int64("collectionID", collection))
		replica := s.meta.ReplicaManager.GetByCollectionAndNode(collection, node)
		if replica == nil {
			continue
		}
		err := s.meta.ReplicaManager.RemoveNode(replica.GetID(), node)
		if err != nil {
			log.Warn("failed to remove node from collection's replicas",
				zap.Int64("replicaID", replica.GetID()),
				zap.Error(err),
			)
		}
		log.Info("remove node from replica",
			zap.Int64("replicaID", replica.GetID()))
	}

	// Clear tasks
	s.taskScheduler.RemoveByNode(node)
W
wei liu 已提交
757 758 759 760 761 762 763 764 765 766 767 768 769

	rgName, err := s.meta.ResourceManager.HandleNodeDown(node)
	if err != nil {
		log.Warn("HandleNodeDown: failed to remove node from resource group",
			zap.String("resourceGroup", rgName),
			zap.Error(err),
		)
		return
	}

	log.Info("HandleNodeDown: remove node from resource group",
		zap.String("resourceGroup", rgName),
	)
B
Bingyi Sun 已提交
770 771 772 773 774 775 776 777 778 779
}

// checkReplicas checks whether replica contains offline node, and remove those nodes
func (s *Server) checkReplicas() {
	for _, collection := range s.meta.CollectionManager.GetAll() {
		log := log.With(zap.Int64("collectionID", collection))
		replicas := s.meta.ReplicaManager.GetByCollection(collection)
		for _, replica := range replicas {
			replica := replica.Clone()
			toRemove := make([]int64, 0)
W
wei liu 已提交
780
			for _, node := range replica.GetNodes() {
B
Bingyi Sun 已提交
781 782 783 784 785 786
				if s.nodeMgr.Get(node) == nil {
					toRemove = append(toRemove, node)
				}
			}

			if len(toRemove) > 0 {
Y
yah01 已提交
787 788 789 790
				log := log.With(
					zap.Int64("replicaID", replica.GetID()),
					zap.Int64s("offlineNodes", toRemove),
				)
X
Xiaofan 已提交
791
				log.Info("some nodes are offline, remove them from replica", zap.Any("toRemove", toRemove))
B
Bingyi Sun 已提交
792 793 794 795 796 797 798 799 800
				replica.RemoveNode(toRemove...)
				err := s.meta.ReplicaManager.Put(replica)
				if err != nil {
					log.Warn("failed to remove offline nodes from replica")
				}
			}
		}
	}
}