// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
	"github.com/milvus-io/milvus/internal/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"go.uber.org/zap"

	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
)

const (
	connEtcdMaxRetryTime = 100000
	allPartitionID       = 0 // partitionID value that means no partition filtering
)

var (
	// TODO: sunby put to config
	enableTtChecker           = true
	ttCheckerName             = "dataTtChecker"
	ttMaxInterval             = 3 * time.Minute
	ttCheckerWarnMsg          = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
	segmentTimedFlushDuration = 10.0
)

type (
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcut for typeutil.Timestamp
	Timestamp = typeutil.Timestamp
)

// ServerState type alias, represents datacoord Server state
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands for initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)
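
// Server state is manipulated with sync/atomic on this int64 alias: Start
// stores ServerStateHealthy via atomic.StoreInt64, and Stop transitions
// Healthy to Stopped via atomic.CompareAndSwapInt64 so finalization runs at most once.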

type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)

// makes sure Server implements `types.DataCoord`
var _ types.DataCoord = (*Server)(nil)

// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	quitCh           chan struct{}
	isServing        ServerState
	helper           ServerHelper

	kvClient         *etcdkv.EtcdKV
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption
	handler          Handler

	compactionTrigger trigger
	compactionHandler compactionPlanContext

	metricsCacheManager *metricsinfo.MetricsCacheManager

	flushCh   chan UniqueID
	msFactory msgstream.Factory

	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent

	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}

// ServerHelper datacoord server injection helper
type ServerHelper struct {
	eventAfterHandleDataNodeTt func()
}

func defaultServerHelper() ServerHelper {
	return ServerHelper{
		eventAfterHandleDataNodeTt: func() {},
	}
}
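
// A test-oriented sketch (hypothetical names): SetServerHelper can inject a
// hook that fires after every handled DataNode timetick, letting a test
// synchronize with the tt loop:
//
//	done := make(chan struct{}, 1)
//	helper := ServerHelper{eventAfterHandleDataNodeTt: func() { done <- struct{}{} }}
//	svr, _ := CreateServer(ctx, factory, SetServerHelper(helper))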

// Option utility function signature to set DataCoord server attributes
type Option func(svr *Server)
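
// Options are applied in order inside CreateServer after the defaults are
// set, so a later Option overrides an earlier one for the same field.
// A minimal composition sketch, assuming cluster and manager are prebuilt:
//
//	svr, err := CreateServer(ctx, factory, SetCluster(cluster), SetSegmentManager(manager))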

// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

// SetServerHelper returns an `Option` setting ServerHelper with provided parameter
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

// SetCluster returns an `Option` setting Cluster with provided parameter
func SetCluster(cluster *Cluster) Option {
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

// SetSegmentManager returns an Option to set SegmentManager
func SetSegmentManager(manager Manager) Option {
	return func(svr *Server) {
		svr.segmentManager = manager
	}
}

// CreateServer creates a `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
	rand.Seed(time.Now().UnixNano())
	s := &Server{
		ctx:                    ctx,
		quitCh:                 make(chan struct{}),
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataNodeCreator:        defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
		helper:                 defaultServerHelper(),

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
	}

	for _, opt := range opts {
		opt(s)
	}
	return s, nil
}

func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
}

// QuitSignal returns a signal channel notifying when the server quits
func (s *Server) QuitSignal() <-chan struct{} {
	return s.quitCh
}
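
// A usage sketch (hypothetical caller): the starter can block on the quit
// signal to release its own resources once the server exits:
//
//	select {
//	case <-svr.QuitSignal():
//		// server quit, release caller-side resources
//	case <-ctx.Done():
//	}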

// Register registers data service at etcd
func (s *Server) Register() error {
	s.session.Register()
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
		log.Error("DataCoord disconnected from etcd, process will exit", zap.Int64("ServerID", s.session.ServerID))
		if err := s.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		syscall.Kill(syscall.Getpid(), syscall.SIGINT)
	})
	return nil
}

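// initSession creates the DataCoord session in etcd and adopts the allocated
// ServerID as this node's ID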
func (s *Server) initSession() error {
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
	s.session.Init(typeutil.DataCoordRole, Params.Address, true)
	Params.NodeID = s.session.ServerID
	Params.SetLogger(Params.NodeID)
	return nil
}

// Init changes server state to Initializing
func (s *Server) Init() error {
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return s.initSession()
}

// Start initializes `Server` members and starts loops, the following steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//    allocator and segment manager
// 3. start service discovery and server loops, which include the message stream handler
//    (segment statistics, datanode tt), datanode etcd watch, etcd liveness check and
//    flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	if err = s.initRootCoordClient(); err != nil {
		return err
	}

	if err = s.initMeta(); err != nil {
		return err
	}

	s.handler = newServerHandler(s)

	if err = s.initCluster(); err != nil {
		return err
	}

	s.allocator = newRootCoordAllocator(s.rootCoordClient)
	if Params.EnableCompaction {
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}

	s.startSegmentManager()
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

	if err = s.initGarbageCollection(); err != nil {
		return err
	}

	s.startServerLoop()
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
	log.Debug("dataCoordinator startup success")

	return nil
}

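// initCluster builds the channel manager, session manager and DataNode
// cluster; it is a no-op when a Cluster was already injected via SetCluster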
func (s *Server) initCluster() error {
	if s.cluster != nil {
		return nil
	}

	var err error
	s.channelManager, err = NewChannelManager(s.kvClient, s.handler)
	if err != nil {
		return err
	}
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
	return nil
}

func (s *Server) createCompactionHandler() {
	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler.start()
}

func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

func (s *Server) createCompactionTrigger() {
	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger.start()
}

func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

func (s *Server) initGarbageCollection() error {
	var cli *minio.Client
	var err error
	if Params.EnableGarbageCollection {
		cli, err = minio.New(Params.MinioAddress, &minio.Options{
			Creds:  credentials.NewStaticV4(Params.MinioAccessKeyID, Params.MinioSecretAccessKey, ""),
			Secure: Params.MinioUseSSL,
		})
		if err != nil {
			return err
		}
		has, err := cli.BucketExists(context.TODO(), Params.MinioBucketName)
		if err != nil {
			return err
		}
		if !has {
			err = cli.MakeBucket(context.TODO(), Params.MinioBucketName, minio.MakeBucketOptions{})
			if err != nil {
				return err
			}
		}
	}

	s.garbageCollector = newGarbageCollector(s.meta, GcOption{
		cli:        cli,
		enabled:    Params.EnableGarbageCollection,
		bucketName: Params.MinioBucketName,
		rootPath:   Params.MinioRootPath,

		checkInterval:    Params.GCInterval,
		missingTolerance: Params.GCMissingTolerance,
		dropTolerance:    Params.GCDropTolerance,
	})
	return nil
}

func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Debug("dataCoord initServiceDiscovery failed", zap.Error(err))
		return err
	}
	log.Debug("registered sessions", zap.Any("sessions", sessions))

	datanodes := make([]*NodeInfo, 0, len(sessions))
	for _, session := range sessions {
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
		}
		datanodes = append(datanodes, info)
	}

	s.cluster.Startup(datanodes)

	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
	return nil
}

func (s *Server) startSegmentManager() {
	if s.segmentManager == nil {
		s.segmentManager = newSegmentManager(s.meta, s.allocator)
	}
}

func (s *Server) initMeta() error {
	connectEtcdFn := func() error {
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
		if err != nil {
			return err
		}

		s.kvClient = etcdKV
		s.meta, err = newMeta(s.kvClient)
		if err != nil {
			return err
		}
		return nil
	}
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
}

func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
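	// one WaitGroup counter per background loop below: tt loop, watch service and flush loop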
	s.serverLoopWg.Add(3)
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
	s.garbageCollector.start()
}

// startDataNodeTtLoop starts a goroutine to receive data node tt msgs from msgstream
// a tt msg stands for the currently consumed timestamp for each channel
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName},
		Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
	ttMsgStream.Start()

	go func() {
		var checker *LongTermChecker
		if enableTtChecker {
			checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
			checker.Start()
			defer checker.Stop()
		}

		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer ttMsgStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("data node tt loop shutdown")
				return
			default:
			}
			msgPack := ttMsgStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil tt msg, shutdown tt channel")
				return
			}
			for _, msg := range msgPack.Msgs {
				ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
				if !ok {
					log.Warn("receive unexpected msg type from tt channel")
					continue
				}
				if enableTtChecker {
					checker.Check()
				}

				if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
					log.Error("failed to handle timetick message", zap.Error(err))
					continue
				}
			}
			s.helper.eventAfterHandleDataNodeTt()
		}
	}()
}

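// handleTimetickMessage handles a single DataNode tt msg: it warns when the
// timetick lags behind, updates segment row statistics, expires stale
// allocations on the channel, and triggers Flush for both flushable segments
// and segments unflushed for longer than segmentTimedFlushDuration ("stale" segments)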
func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
	ch := ttMsg.GetChannelName()
	ts := ttMsg.GetTimestamp()
	physical, _ := tsoutil.ParseTS(ts)
	if time.Since(physical).Minutes() > 1 {
		// if the timetick lags behind, log a rate-limited warning (about once a minute)
		log.RatedWarn(60.0, "time tick lag behind for more than 1 minute", zap.String("channel", ch), zap.Time("timetick", physical))
	}

	s.updateSegmentStatistics(ttMsg.GetSegmentsStats())

	if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
		return fmt.Errorf("expire allocations: %w", err)
	}

	flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
	if err != nil {
		return fmt.Errorf("get flushable segments: %w", err)
	}
	flushableSegments := s.getFlushableSegmentsInfo(flushableIDs)

	staleSegments := s.getStaleSegmentsInfo(ch)
	staleSegments = s.filterWithFlushableSegments(staleSegments, flushableIDs)

	if len(flushableSegments)+len(staleSegments) == 0 {
		return nil
	}

	log.Debug("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))

	s.setLastFlushTime(flushableSegments)
	s.setLastFlushTime(staleSegments)

	finfo, minfo := make([]*datapb.SegmentInfo, 0, len(flushableSegments)), make([]*datapb.SegmentInfo, 0, len(staleSegments))
	for _, info := range flushableSegments {
		finfo = append(finfo, info.SegmentInfo)
	}
	for _, info := range staleSegments {
		minfo = append(minfo, info.SegmentInfo)
	}
	s.cluster.Flush(s.ctx, finfo, minfo)
	return nil
}

func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
	for _, stat := range stats {
		s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
	}
}

func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
	res := make([]*SegmentInfo, 0, len(flushableIDs))
	for _, id := range flushableIDs {
		sinfo := s.meta.GetSegment(id)
		if sinfo == nil {
			log.Error("get segment from meta error", zap.Int64("id", id))
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

func (s *Server) getStaleSegmentsInfo(ch string) []*SegmentInfo {
	return s.meta.SelectSegments(func(info *SegmentInfo) bool {
		return isSegmentHealthy(info) &&
			info.GetInsertChannel() == ch &&
			!info.lastFlushTime.IsZero() &&
			time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
	})
}

func (s *Server) filterWithFlushableSegments(staleSegments []*SegmentInfo, flushableIDs []int64) []*SegmentInfo {
	filter := map[int64]struct{}{}
	for _, sid := range flushableIDs {
		filter[sid] = struct{}{}
	}

	res := make([]*SegmentInfo, 0, len(staleSegments))
	for _, sinfo := range staleSegments {
		if _, ok := filter[sinfo.GetID()]; ok {
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

func (s *Server) setLastFlushTime(segments []*SegmentInfo) {
	for _, sinfo := range segments {
		s.meta.SetLastFlushTime(sinfo.GetID(), time.Now())
	}
}

// startWatchService starts a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
	go s.watchService(ctx)
}

// watchService watches services
func (s *Server) watchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
		}
	}

}

// handles session events - DataNodes Add/Del
607
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
	if event == nil {
		return nil
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to register node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregister node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
	return nil
}

// startFlushLoop starts a goroutine to handle the post-flush process,
// which is to notify `RootCoord` that a segment is flushed
func (s *Server) startFlushLoop(ctx context.Context) {
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
				log.Debug("flush loop shutdown")
				return
			case segmentID := <-s.flushCh:
				// ignore the returned error
				_ = s.postFlush(ctx, segmentID)
			}
		}
	}()
}

// postFlush runs the post process after a flush is done:
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flushed segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// handleFlushingSegments is the recovery logic: fetch all segments in `Flushing` state and redo the flush notification
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

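// initRootCoordClient creates, initializes and starts the RootCoord client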
func (s *Server) initRootCoordClient() error {
	var err error
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
		return err
	}
	if err = s.rootCoordClient.Init(); err != nil {
		return err
	}
	return s.rootCoordClient.Start()
}

// Stop does the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//    stop message stream client and stop server loops
func (s *Server) Stop() error {
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	log.Debug("dataCoord server shutdown")
	s.cluster.Close()
	s.garbageCollector.close()
	s.stopServerLoop()
	s.session.Revoke(time.Second)

	if Params.EnableCompaction {
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
	return nil
}

// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	return s.kvClient.RemoveWithPrefix("")
}

func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}

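// loadCollectionFromRootCoord fetches the collection schema and partition IDs
// from RootCoord and caches them into local meta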
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_DescribeCollection,
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.NodeID,
		},
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
		return err
	}
	collInfo := &datapb.CollectionInfo{
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
	}
	s.meta.AddCollection(collInfo)
	return nil
}