// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/logutil"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"go.uber.org/zap"
)

const (
	connEtcdMaxRetryTime = 100000 // max retry attempts for the etcd connection in initMeta
	allPartitionID       = 0      // a partitionID of 0 means no partition filtering
)

var (
	// TODO: sunby put to config
	enableTtChecker           = true
	ttCheckerName             = "dataTtChecker"
	ttMaxInterval             = 3 * time.Minute
	ttCheckerWarnMsg          = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
	segmentTimedFlushDuration = 10.0 // minutes without a flush before a segment is considered stale
)

type (
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcut for typeutil.Timestamp
	Timestamp = typeutil.Timestamp
)

// ServerState type alias, represents the datacoord Server state
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands for initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)

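// The serving state is read and written atomically: for instance, Stop below
// only proceeds with shutdown when
// atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped)
// succeeds, so only a healthy server runs the teardown path.
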
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)

// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

// Params is the global ParamTable of the datacoord package
var Params paramtable.GlobalParamTable

// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	quitCh           chan struct{}
	isServing        ServerState
	helper           ServerHelper

	kvClient         *etcdkv.EtcdKV
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption
	handler          Handler

	compactionTrigger trigger
	compactionHandler compactionPlanContext

	metricsCacheManager *metricsinfo.MetricsCacheManager

	flushCh   chan UniqueID
	msFactory msgstream.Factory

	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent

	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}

// ServerHelper datacoord server injection helper
type ServerHelper struct {
	eventAfterHandleDataNodeTt func()
}

func defaultServerHelper() ServerHelper {
	return ServerHelper{
		eventAfterHandleDataNodeTt: func() {},
	}
}

// Option is a utility function signature to set DataCoord server attributes
type Option func(svr *Server)

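// Options are applied in order by CreateServer. A minimal usage sketch,
// assuming ctx and msFactory are in scope and mockCluster is a hypothetical
// *Cluster test double:
//
//	svr, err := CreateServer(ctx, msFactory, SetCluster(mockCluster))
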
// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

// SetServerHelper returns an `Option` setting ServerHelper with provided parameter
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

// SetCluster returns an `Option` setting Cluster with provided parameter
func SetCluster(cluster *Cluster) Option {
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

// SetSegmentManager returns an Option to set SegmentManager
func SetSegmentManager(manager Manager) Option {
	return func(svr *Server) {
		svr.segmentManager = manager
	}
}

// CreateServer creates a `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
	rand.Seed(time.Now().UnixNano())
	s := &Server{
		ctx:                    ctx,
		quitCh:                 make(chan struct{}),
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataNodeCreator:        defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
		helper:                 defaultServerHelper(),

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
	}

	for _, opt := range opts {
		opt(s)
	}
	return s, nil
}

func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
}

// QuitSignal returns signal when server quits
func (s *Server) QuitSignal() <-chan struct{} {
	return s.quitCh
}

// Register registers the DataCoord service in etcd
func (s *Server) Register() error {
	s.session.Register()
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
		logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
		if err := s.Stop(); err != nil {
			logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		syscall.Kill(syscall.Getpid(), syscall.SIGINT)
	})
	return nil
}

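// initSession creates the etcd session and registers the DataCoord role in etcd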
func (s *Server) initSession() error {
	s.session = sessionutil.NewSession(s.ctx, Params.DataCoordCfg.MetaRootPath, Params.DataCoordCfg.EtcdEndpoints)
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
	s.session.Init(typeutil.DataCoordRole, Params.DataCoordCfg.Address, true)
	Params.DataCoordCfg.NodeID = s.session.ServerID
	Params.BaseParams.SetLogger(Params.DataCoordCfg.NodeID)
	return nil
}

// Init changes server state to Initializing
func (s *Server) Init() error {
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return s.initSession()
}

// Start initializes `Server` members and starts loops. The following steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//    allocator and segment manager
// 3. start service discovery and server loops, which include the message stream
//    handler (segment statistics, datanode tt), datanode etcd watch, etcd alive
//    check and flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.DataCoordCfg.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	if err = s.initRootCoordClient(); err != nil {
		return err
	}

	if err = s.initMeta(); err != nil {
		return err
	}

	s.handler = newServerHandler(s)

	if err = s.initCluster(); err != nil {
		return err
	}

	s.allocator = newRootCoordAllocator(s.rootCoordClient)
	if Params.DataCoordCfg.EnableCompaction {
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}

	s.startSegmentManager()
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

	if err = s.initGarbageCollection(); err != nil {
		return err
	}

	s.startServerLoop()
	Params.DataCoordCfg.CreatedTime = time.Now()
	Params.DataCoordCfg.UpdatedTime = time.Now()
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
	logutil.Logger(s.ctx).Debug("startup success")

	return nil
}

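// initCluster builds the channel manager, session manager and DataNode cluster,
// unless a Cluster was already injected via the SetCluster option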
func (s *Server) initCluster() error {
	if s.cluster != nil {
		return nil
	}

	var err error
	s.channelManager, err = NewChannelManager(s.kvClient, s.handler)
	if err != nil {
		return err
	}
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
	return nil
}

func (s *Server) createCompactionHandler() {
	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler.start()
}

func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

func (s *Server) createCompactionTrigger() {
	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger.start()
}

func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

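// initGarbageCollection constructs the garbage collector; when GC is enabled it
// also creates the MinIO client and makes sure the bucket exists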
func (s *Server) initGarbageCollection() error {
	var cli *minio.Client
	var err error
	if Params.DataCoordCfg.EnableGarbageCollection {
		cli, err = minio.New(Params.DataCoordCfg.MinioAddress, &minio.Options{
			Creds:  credentials.NewStaticV4(Params.DataCoordCfg.MinioAccessKeyID, Params.DataCoordCfg.MinioSecretAccessKey, ""),
			Secure: Params.DataCoordCfg.MinioUseSSL,
		})
		if err != nil {
			return err
		}

		checkBucketFn := func() error {
			has, err := cli.BucketExists(context.TODO(), Params.DataCoordCfg.MinioBucketName)
			if err != nil {
				return err
			}
			if !has {
				err = cli.MakeBucket(context.TODO(), Params.DataCoordCfg.MinioBucketName, minio.MakeBucketOptions{})
				if err != nil {
					return err
				}
			}
			return nil
		}
		// Retry twice to cover the race where
		// 1. the bucket does not exist,
		// 2. another component creates the bucket concurrently, and
		// 3. datacoord's MakeBucket call fails with a "bucket already exists" error;
		// the second attempt then sees the existing bucket and succeeds.
		err = retry.Do(s.ctx, checkBucketFn, retry.Attempts(2))
		if err != nil {
			return err
		}
	}

	s.garbageCollector = newGarbageCollector(s.meta, GcOption{
		cli:        cli,
		enabled:    Params.DataCoordCfg.EnableGarbageCollection,
		bucketName: Params.DataCoordCfg.MinioBucketName,
		rootPath:   Params.DataCoordCfg.MinioRootPath,

		checkInterval:    Params.DataCoordCfg.GCInterval,
		missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
		dropTolerance:    Params.DataCoordCfg.GCDropTolerance,
	})
	return nil
}

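// initServiceDiscovery loads the current DataNode sessions into the cluster and
// starts watching DataNode session events from etcd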
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Debug("DataCoord failed to init service discovery", zap.Error(err))
		return err
	}
	log.Debug("DataCoord success to get DataNode sessions", zap.Any("sessions", sessions))

	datanodes := make([]*NodeInfo, 0, len(sessions))
	for _, session := range sessions {
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
		}
		datanodes = append(datanodes, info)
	}

	s.cluster.Startup(datanodes)

	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
	return nil
}

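// startSegmentManager creates the segment manager unless one was injected via
// the SetSegmentManager option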
func (s *Server) startSegmentManager() {
	if s.segmentManager == nil {
		s.segmentManager = newSegmentManager(s.meta, s.allocator)
	}
}

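// initMeta connects to etcd with retry and initializes the kv client and the meta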
func (s *Server) initMeta() error {
	connectEtcdFn := func() error {
		etcdKV, err := etcdkv.NewEtcdKV(Params.DataCoordCfg.EtcdEndpoints, Params.DataCoordCfg.MetaRootPath)
		if err != nil {
			return err
		}

		s.kvClient = etcdKV
		s.meta, err = newMeta(s.kvClient)
		if err != nil {
			return err
		}
		return nil
	}
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
}

func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
	s.serverLoopWg.Add(3)
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
	s.garbageCollector.start()
}

// startDataNodeTtLoop starts a goroutine to receive DataNode timetick messages from the msgstream;
// a tt msg carries the currently consumed timestamp of each channel
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("DataCoord failed to create timetick channel", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumerWithPosition([]string{Params.DataCoordCfg.TimeTickChannelName},
		Params.DataCoordCfg.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
	log.Debug("DataCoord creates the timetick channel consumer",
		zap.String("timeTickChannel", Params.DataCoordCfg.TimeTickChannelName),
		zap.String("subscription", Params.DataCoordCfg.DataCoordSubscriptionName))
	ttMsgStream.Start()

	go func() {
		var checker *LongTermChecker
		if enableTtChecker {
			checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
			checker.Start()
			defer checker.Stop()
		}

		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer ttMsgStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("DataNode timetick loop shutdown")
				return
			default:
			}
			msgPack := ttMsgStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil timetick msg and shutdown timetick channel")
				return
			}
			for _, msg := range msgPack.Msgs {
				ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
				if !ok {
					log.Warn("receive unexpected msg type from tt channel")
					continue
				}
				if enableTtChecker {
					checker.Check()
				}

				if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
					log.Error("failed to handle timetick message", zap.Error(err))
					continue
				}
			}
			s.helper.eventAfterHandleDataNodeTt()
		}
	}()
}

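// handleTimetickMessage processes one DataNodeTtMsg: it updates segment
// statistics, expires outdated allocations, and asks the cluster to flush
// segments that are flushable at this timetick as well as stale segments that
// have not been flushed for segmentTimedFlushDuration minutes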
func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
	ch := ttMsg.GetChannelName()
	ts := ttMsg.GetTimestamp()
	physical, _ := tsoutil.ParseTS(ts)
	if time.Since(physical).Minutes() > 1 {
		// if the timetick lags behind, warn at most once per minute
		log.RatedWarn(60.0, "time tick lags behind for more than 1 minute", zap.String("channel", ch), zap.Time("timetick", physical))
	}

	s.updateSegmentStatistics(ttMsg.GetSegmentsStats())

	if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
		return fmt.Errorf("expire allocations: %w", err)
	}

	flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
	if err != nil {
		return fmt.Errorf("get flushable segments: %w", err)
	}
	flushableSegments := s.getFlushableSegmentsInfo(flushableIDs)

	staleSegments := s.getStaleSegmentsInfo(ch)
	staleSegments = s.filterWithFlushableSegments(staleSegments, flushableIDs)

	if len(flushableSegments)+len(staleSegments) == 0 {
		return nil
	}

	log.Debug("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))

	s.setLastFlushTime(flushableSegments)
	s.setLastFlushTime(staleSegments)

	finfo, minfo := make([]*datapb.SegmentInfo, 0, len(flushableSegments)), make([]*datapb.SegmentInfo, 0, len(staleSegments))
	for _, info := range flushableSegments {
		finfo = append(finfo, info.SegmentInfo)
	}
	for _, info := range staleSegments {
		minfo = append(minfo, info.SegmentInfo)
	}
	s.cluster.Flush(s.ctx, finfo, minfo)
	return nil
}

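// updateSegmentStatistics stores the row counts reported by DataNodes into meta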
func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
	for _, stat := range stats {
		s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
	}
}

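// getFlushableSegmentsInfo fetches the SegmentInfo of the given flushable segment ids from meta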
func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
	res := make([]*SegmentInfo, 0, len(flushableIDs))
	for _, id := range flushableIDs {
		sinfo := s.meta.GetSegment(id)
		if sinfo == nil {
			log.Error("get segment from meta error", zap.Int64("id", id))
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

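// getStaleSegmentsInfo returns the healthy segments of a channel whose last
// flush happened at least segmentTimedFlushDuration minutes ago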
func (s *Server) getStaleSegmentsInfo(ch string) []*SegmentInfo {
	return s.meta.SelectSegments(func(info *SegmentInfo) bool {
		return isSegmentHealthy(info) &&
			info.GetInsertChannel() == ch &&
			!info.lastFlushTime.IsZero() &&
			time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
	})
}

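// filterWithFlushableSegments drops stale segments that already appear in the
// flushable id list, so each segment is flushed only once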
func (s *Server) filterWithFlushableSegments(staleSegments []*SegmentInfo, flushableIDs []int64) []*SegmentInfo {
	filter := map[int64]struct{}{}
	for _, sid := range flushableIDs {
		filter[sid] = struct{}{}
	}

	res := make([]*SegmentInfo, 0, len(staleSegments))
	for _, sinfo := range staleSegments {
		if _, ok := filter[sinfo.GetID()]; ok {
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

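// setLastFlushTime records the current time as the last flush time of each given segment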
func (s *Server) setLastFlushTime(segments []*SegmentInfo) {
	for _, sinfo := range segments {
		s.meta.SetLastFlushTime(sinfo.GetID(), time.Now())
	}
}

// startWatchService starts a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
	go s.watchService(ctx)
}

// watchService watches services
func (s *Server) watchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
		}
	}
}

// handleSessionEvent handles session events - DataNode add/delete
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
	if event == nil {
		return nil
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
	return nil
}

// startFlushLoop starts a goroutine to handle the post-flush process,
// which notifies `RootCoord` that a segment is flushed
func (s *Server) startFlushLoop(ctx context.Context) {
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
				log.Debug("flush loop shutdown")
				return
			case segmentID := <-s.flushCh:
				// ignore the returned error
				_ = s.postFlush(ctx, segmentID)
			}
		}
	}()
}

// postFlush runs the post-flush steps after a segment flush is done:
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// handleFlushingSegments is the recovery logic: it fetches all segments in
// `Flushing` state and replays the flush notification logic
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

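// initRootCoordClient creates, initializes and starts the RootCoord client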
func (s *Server) initRootCoordClient() error {
	var err error
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.DataCoordCfg.MetaRootPath, Params.DataCoordCfg.EtcdEndpoints); err != nil {
		return err
	}
	if err = s.rootCoordClient.Init(); err != nil {
		return err
	}
	return s.rootCoordClient.Start()
}

// Stop does the Server finalize processes:
// it checks whether the server status is healthy; if not, it just quits;
// if the Server is healthy, it sets the server state to stopped, releases the etcd session,
// stops the message stream client and stops the server loops
func (s *Server) Stop() error {
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	log.Debug("dataCoord server shutdown")
	s.cluster.Close()
	s.garbageCollector.close()
	s.stopServerLoop()
	s.session.Revoke(time.Second)

	if Params.DataCoordCfg.EnableCompaction {
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
	return nil
}

// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	return s.kvClient.RemoveWithPrefix("")
}

func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}

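// loadCollectionFromRootCoord fetches the collection schema and partitions from
// RootCoord and registers the collection into meta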
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_DescribeCollection,
			SourceID: Params.DataCoordCfg.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.DataCoordCfg.NodeID,
		},
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
		return err
	}
	collInfo := &datapb.CollectionInfo{
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
	}
	s.meta.AddCollection(collInfo)
	return nil
}