server.go 22.8 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

17
package datacoord
S
sunby 已提交
18

S
sunby 已提交
19 20
import (
	"context"
S
sunby 已提交
21
	"errors"
22
	"fmt"
S
sunby 已提交
23
	"math/rand"
S
sunby 已提交
24
	"sync"
S
sunby 已提交
25 26 27
	"sync/atomic"
	"time"

S
sunby 已提交
28
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
29
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
30
	"github.com/milvus-io/milvus/internal/logutil"
31 32 33
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
34 35
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
36
	"go.uber.org/zap"
S
sunby 已提交
37

X
Xiangyu Wang 已提交
38 39 40 41 42
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
43
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
44
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
45

X
Xiangyu Wang 已提交
46 47 48
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
49 50
)

51 52 53 54
// connEtcdMaxRetryTime is the number of attempts initMeta makes (through
// retry.Attempts) when connecting to etcd and loading meta.
const connEtcdMaxRetryTime = 100000

// allPartitionID is the partition ID value meaning "no partition filtering".
const allPartitionID = 0
S
sunby 已提交
55

56 57
// Data node time tick checker and timed flush settings.
// TODO: sunby put to config
var (
	// enableTtChecker turns on the LongTermChecker used by startDataNodeTtLoop.
	enableTtChecker = true
	// ttCheckerName is the name given to that checker.
	ttCheckerName = "dataTtChecker"
	// ttMaxInterval is the interval passed to the checker.
	ttMaxInterval = 3 * time.Minute
	// ttCheckerWarnMsg is the warning message handed to the checker.
	ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
	// segmentTimedFlushDuration is the age, in minutes, of a segment's last
	// flush after which the tt loop selects it again as a stale segment.
	segmentTimedFlushDuration = 10.0
)

S
sunby 已提交
65
type (
66 67 68
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcurt for typeutil.Timestamp
S
sunby 已提交
69
	Timestamp = typeutil.Timestamp
S
sunby 已提交
70
)
S
sunby 已提交
71

72
// ServerState is an alias of int64 describing the lifecycle state of a
// datacoord Server; it is stored in Server.isServing and updated with sync/atomic.
type ServerState = int64

// Lifecycle states of a datacoord `Server`.
const (
	ServerStateStopped      ServerState = iota // freshly created, or stopped by Stop
	ServerStateInitializing                    // Init has been called
	ServerStateHealthy                         // Start completed successfully
)

84 85
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
86

87 88 89
// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

90 91
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
92
type Server struct {
S
sunby 已提交
93 94 95 96
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
97
	isServing        ServerState
S
sunby 已提交
98
	helper           ServerHelper
99

100 101 102 103 104 105 106 107 108 109
	kvClient         *etcdkv.EtcdKV
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption
110
	handler          Handler
111

S
sunby 已提交
112 113 114
	compactionTrigger trigger
	compactionHandler compactionPlanContext

115 116
	metricsCacheManager *metricsinfo.MetricsCacheManager

117 118
	flushCh   chan UniqueID
	msFactory msgstream.Factory
119

120 121
	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent
122

123 124
	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
125
}
S
sunby 已提交
126

127
// ServerHelper bundles hooks that can be injected into a datacoord Server.
type ServerHelper struct {
	// eventAfterHandleDataNodeTt runs after every message pack consumed from the
	// data node time tick channel has been processed.
	eventAfterHandleDataNodeTt func()
}

// defaultServerHelper returns a ServerHelper whose hooks do nothing.
func defaultServerHelper() ServerHelper {
	noop := func() {}
	var helper ServerHelper
	helper.eventAfterHandleDataNodeTt = noop
	return helper
}

138
// Option utility function signature to set DataCoord server attributes
S
sunby 已提交
139 140
type Option func(svr *Server)

141
// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
142
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
S
sunby 已提交
143 144 145 146 147
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

148
// SetServerHelper returns an `Option` setting ServerHelp with provided parameter
S
sunby 已提交
149 150 151 152 153 154
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

155 156 157 158 159 160 161
// SetCluster returns an Option injecting a pre-built Cluster; when one is
// present, initCluster leaves it untouched instead of building its own.
func SetCluster(cluster *Cluster) Option {
	apply := func(svr *Server) {
		svr.cluster = cluster
	}
	return apply
}

162
// SetDataNodeCreator returns an `Option` setting DataNode create function
163
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
164 165 166 167 168
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

B
Bingyi Sun 已提交
169 170 171 172 173 174 175
// SetSegmentManager returns an Option injecting a segment Manager; when one is
// present, startSegmentManager keeps it instead of creating the default one.
func SetSegmentManager(manager Manager) Option {
	apply := func(svr *Server) {
		svr.segmentManager = manager
	}
	return apply
}

176
// CreateServer create `Server` instance
S
sunby 已提交
177
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
S
sunby 已提交
178
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
179
	s := &Server{
S
sunby 已提交
180 181 182
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
183
		dataNodeCreator:        defaultDataNodeCreatorFunc,
S
sunby 已提交
184
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
185
		helper:                 defaultServerHelper(),
186 187

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
S
sunby 已提交
188
	}
S
sunby 已提交
189 190 191 192

	for _, opt := range opts {
		opt(s)
	}
S
sunby 已提交
193 194 195
	return s, nil
}

G
godchen 已提交
196 197
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
198 199
}

G
godchen 已提交
200 201
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
202 203
}

204 205
// Register register data service at etcd
func (s *Server) Register() error {
206
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
207 208 209
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
210
	s.session.Init(typeutil.DataCoordRole, Params.Address, true)
211
	Params.NodeID = s.session.ServerID
X
Xiaofan 已提交
212
	Params.SetLogger(typeutil.UniqueID(-1))
213 214 215
	return nil
}

216
// Init change server state to Initializing
217
func (s *Server) Init() error {
218
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
219 220 221
	return nil
}

222 223 224 225 226 227 228
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
229
func (s *Server) Start() error {
230
	var err error
S
sunby 已提交
231 232 233 234 235 236 237 238
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
239
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
240 241
		return err
	}
242

S
sunby 已提交
243 244 245
	if err = s.initMeta(); err != nil {
		return err
	}
246

247 248
	s.handler = newServerHandler(s)

S
sunby 已提交
249 250 251
	if err = s.initCluster(); err != nil {
		return err
	}
252

C
congqixia 已提交
253
	s.allocator = newRootCoordAllocator(s.rootCoordClient)
S
sunby 已提交
254 255 256 257
	if Params.EnableCompaction {
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}
258

259
	s.startSegmentManager()
S
sunby 已提交
260 261 262
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
263

264 265 266 267
	if err = s.initGarbageCollection(); err != nil {
		return err
	}

S
sunby 已提交
268
	s.startServerLoop()
269 270
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
271
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
272
	log.Debug("dataCoordinator startup success")
273

S
sunby 已提交
274
	return nil
275 276 277
}

func (s *Server) initCluster() error {
278 279 280 281
	if s.cluster != nil {
		return nil
	}

S
sunby 已提交
282
	var err error
283
	s.channelManager, err = NewChannelManager(s.kvClient, s.handler)
284 285
	if err != nil {
		return err
286
	}
S
sunby 已提交
287 288
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
289
	return nil
290
}
G
groot 已提交
291

S
sunby 已提交
292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
// createCompactionHandler builds the compaction plan handler from the session
// manager, channel manager, meta, allocator and flush channel, then starts it.
func (s *Server) createCompactionHandler() {
	handler := newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler = handler
	s.compactionHandler.start()
}

// stopCompactionHandler stops the handler started by createCompactionHandler.
func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

// createCompactionTrigger builds the compaction trigger on top of meta, the
// compaction handler and the allocator, then starts it.
func (s *Server) createCompactionTrigger() {
	trig := newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger = trig
	s.compactionTrigger.start()
}

// stopCompactionTrigger stops the trigger started by createCompactionTrigger.
func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338
func (s *Server) initGarbageCollection() error {
	var cli *minio.Client
	var err error
	if Params.EnableGarbageCollection {
		cli, err = minio.New(Params.MinioAddress, &minio.Options{
			Creds:  credentials.NewStaticV4(Params.MinioAccessKeyID, Params.MinioSecretAccessKey, ""),
			Secure: Params.MinioUseSSL,
		})
		if err != nil {
			return err
		}
		has, err := cli.BucketExists(context.TODO(), Params.MinioBucketName)
		if err != nil {
			return err
		}
		if !has {
			err = cli.MakeBucket(context.TODO(), Params.MinioBucketName, minio.MakeBucketOptions{})
			if err != nil {
				return err
			}
		}
	}

	s.garbageCollector = newGarbageCollector(s.meta, GcOption{
		cli:        cli,
		enabled:    Params.EnableGarbageCollection,
		bucketName: Params.MinioBucketName,
		rootPath:   Params.MinioRootPath,

339 340 341
		checkInterval:    Params.GCInterval,
		missingTolerance: Params.GCMissingTolerance,
		dropTolerance:    Params.GCDropTolerance,
342 343 344 345
	})
	return nil
}

346 347 348
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
G
groot 已提交
349
		log.Debug("dataCoord initServiceDiscovery failed", zap.Error(err))
G
godchen 已提交
350 351
		return err
	}
352 353
	log.Debug("registered sessions", zap.Any("sessions", sessions))

S
sunby 已提交
354
	datanodes := make([]*NodeInfo, 0, len(sessions))
355
	for _, session := range sessions {
356 357 358
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
S
sunby 已提交
359
		}
360
		datanodes = append(datanodes, info)
361
	}
G
godchen 已提交
362

S
sunby 已提交
363
	s.cluster.Startup(datanodes)
364

365
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
366 367 368
	return nil
}

369
func (s *Server) startSegmentManager() {
B
Bingyi Sun 已提交
370 371 372
	if s.segmentManager == nil {
		s.segmentManager = newSegmentManager(s.meta, s.allocator)
	}
373 374
}

S
sunby 已提交
375
func (s *Server) initMeta() error {
376
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
377
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
378 379 380
		if err != nil {
			return err
		}
X
XuanYang-cn 已提交
381 382

		s.kvClient = etcdKV
G
godchen 已提交
383
		s.meta, err = newMeta(s.kvClient)
384 385 386 387
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
388
	}
G
godchen 已提交
389
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
390 391
}

392 393
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
394
	s.serverLoopWg.Add(4)
395 396 397 398
	s.startStatsChannel(s.serverLoopCtx)
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
399
	s.garbageCollector.start()
400
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
X
Xiaofan 已提交
401 402 403 404
		log.Error("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID))
		if err := s.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
405
	})
406 407 408
}

func (s *Server) startStatsChannel(ctx context.Context) {
G
groot 已提交
409
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
410
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
411
	log.Debug("dataCoord create stats channel consumer",
412
		zap.String("channelName", Params.StatisticsChannelName),
413
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
414
	statsStream.Start()
415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer statsStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("stats channel shutdown")
				return
			default:
			}
			msgPack := statsStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil stats msg, shutdown stats channel")
				return
S
sunby 已提交
430
			}
431 432 433 434 435 436 437 438 439 440
			for _, msg := range msgPack.Msgs {
				if msg.Type() != commonpb.MsgType_SegmentStatistics {
					log.Warn("receive unknown msg from segment statistics channel",
						zap.Stringer("msgType", msg.Type()))
					continue
				}
				ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
				for _, stat := range ssMsg.SegStats {
					s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
				}
441
			}
442
		}
443
	}()
444 445
}

446 447
// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream
// tt msg stands for the currently consumed timestamp for each channel
S
sunby 已提交
448 449 450 451 452 453
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
454 455
	ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName},
		Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
456 457 458
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
S
sunby 已提交
459
	ttMsgStream.Start()
460

461 462 463 464 465 466
	go func() {
		var checker *LongTermChecker
		if enableTtChecker {
			checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
			checker.Start()
			defer checker.Stop()
S
sunby 已提交
467
		}
468 469 470 471 472 473 474 475 476
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer ttMsgStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("data node tt loop shutdown")
				return
			default:
477
			}
478 479 480 481
			msgPack := ttMsgStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil tt msg, shutdown tt channel")
				return
482
			}
483 484 485 486 487 488 489 490 491 492
			for _, msg := range msgPack.Msgs {
				if msg.Type() != commonpb.MsgType_DataNodeTt {
					log.Warn("receive unexpected msg type from tt channel",
						zap.Stringer("msgType", msg.Type()))
					continue
				}
				ttMsg := msg.(*msgstream.DataNodeTtMsg)
				if enableTtChecker {
					checker.Check()
				}
S
sunby 已提交
493

494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509
				ch := ttMsg.ChannelName
				ts := ttMsg.Timestamp
				if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
					log.Warn("failed to expire allocations", zap.Error(err))
					continue
				}
				physical, _ := tsoutil.ParseTS(ts)
				if time.Since(physical).Minutes() > 1 {
					// if lag behind, log every 1 mins about
					log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
				}
				segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
				if err != nil {
					log.Warn("get flushable segments failed", zap.Error(err))
					continue
				}
510

511
				staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
B
Bingyi Sun 已提交
512 513
					return isSegmentHealthy(info) &&
						info.GetInsertChannel() == ch &&
S
sunby 已提交
514 515
						!info.lastFlushTime.IsZero() &&
						time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
516
				})
517

518
				if len(segments)+len(staleSegments) == 0 {
S
sunby 已提交
519
					continue
520
				}
521 522 523 524 525 526 527
				log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
				segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
				for _, id := range segments {
					sInfo := s.meta.GetSegment(id)
					if sInfo == nil {
						log.Error("get segment from meta error", zap.Int64("id", id),
							zap.Error(err))
528 529
						continue
					}
530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545
					segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
					s.meta.SetLastFlushTime(id, time.Now())
				}
				markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
				for _, segment := range staleSegments {
					for _, fSeg := range segmentInfos {
						// check segment needs flush first
						if segment.GetID() == fSeg.GetID() {
							continue
						}
					}
					markSegments = append(markSegments, segment.SegmentInfo)
					s.meta.SetLastFlushTime(segment.GetID(), time.Now())
				}
				if len(segmentInfos)+len(markSegments) > 0 {
					s.cluster.Flush(s.ctx, segmentInfos, markSegments)
546
				}
S
sunby 已提交
547
			}
548
			s.helper.eventAfterHandleDataNodeTt()
549
		}
550
	}()
551 552
}

553
// start a goroutine wto watch services
554
func (s *Server) startWatchService(ctx context.Context) {
555 556 557 558 559
	go s.watchService(ctx)
}

// watchService watchs services
func (s *Server) watchService(ctx context.Context) {
S
sunby 已提交
560
	defer logutil.LogPanic()
561 562 563 564 565 566
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
567 568 569 570 571
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
572 573 574 575 576 577 578 579
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
580 581
		}
	}
582

583
}
S
sunby 已提交
584

585
// handles session events - DataNodes Add/Del
586
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
587
	if event == nil {
588
		return nil
589 590 591 592 593 594
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
595 596 597 598
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
599 600 601 602 603
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
604 605 606 607
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
608 609 610 611 612
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
613 614 615 616
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
617 618 619 620 621
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
622
	return nil
623 624
}

S
sunby 已提交
625
func (s *Server) startFlushLoop(ctx context.Context) {
626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
				log.Debug("flush loop shutdown")
				return
			case segmentID := <-s.flushCh:
				//Ignore return error
				_ = s.postFlush(ctx, segmentID)
			}
S
sunby 已提交
642
		}
643
	}()
S
sunby 已提交
644 645
}

646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677
// postFlush finishes the flush of segmentID:
//  1. looks the segment up in meta and fails if it is unknown;
//  2. notifies RootCoord through SegmentFlushCompleted;
//  3. sets the segment state to `Flushed` in meta.
//
// Every failure is logged here before being returned.
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flushed segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
S
sunby 已提交
678 679 680 681 682 683 684 685 686 687 688
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

689
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
690
	var err error
G
godchen 已提交
691
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
692 693
		return err
	}
694
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
695 696
		return err
	}
697
	return s.rootCoordClient.Start()
S
sunby 已提交
698
}
699

700 701 702 703
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
704
func (s *Server) Stop() error {
705
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
706 707
		return nil
	}
708
	log.Debug("dataCoord server shutdown")
S
sunby 已提交
709
	s.cluster.Close()
710
	s.garbageCollector.close()
S
sunby 已提交
711
	s.stopServerLoop()
C
congqixia 已提交
712
	s.session.Revoke(time.Second)
S
sunby 已提交
713 714 715 716 717

	if Params.EnableCompaction {
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
S
sunby 已提交
718 719 720
	return nil
}

S
sunby 已提交
721 722
// CleanMeta only for test
func (s *Server) CleanMeta() error {
723
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
724
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
725 726
}

S
sunby 已提交
727 728 729 730 731
// stopServerLoop cancels the server loop context, then blocks until every loop
// counted in serverLoopWg has returned.
func (s *Server) stopServerLoop() {
	cancel := s.serverLoopCancel
	cancel()
	s.serverLoopWg.Wait()
}

732 733 734 735 736 737 738 739 740 741 742 743 744 745
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
746

747 748
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
749
		Base: &commonpb.MsgBase{
750
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
751 752 753 754 755 756 757 758
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
759
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
760 761
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
762 763
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
764
			SourceID:  Params.NodeID,
765
		},
S
sunby 已提交
766 767 768 769 770
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
771 772
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
773 774
		return err
	}
S
sunby 已提交
775
	collInfo := &datapb.CollectionInfo{
776 777 778 779
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
S
sunby 已提交
780
	}
S
sunby 已提交
781 782
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
783
}