// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/logutil"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

const (
	connEtcdMaxRetryTime = 100000
	allPartitionID       = 0 // a partitionID of 0 means no partition filtering
)

var (
	// TODO: sunby put to config
	enableTtChecker           = true
	ttCheckerName             = "dataTtChecker"
	ttMaxInterval             = 3 * time.Minute
	ttCheckerWarnMsg          = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
	segmentTimedFlushDuration = 10.0
)

type (
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcut for typeutil.Timestamp
	Timestamp = typeutil.Timestamp
)

// ServerState type alias, represents datacoord Server State
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands for initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)
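
// A minimal sketch (not part of the original file) of how these states are
// driven: `Server.isServing` is only touched through the sync/atomic package,
// mirroring what Init, Start and Stop below actually do.
//
//	atomic.StoreInt64(&s.isServing, ServerStateInitializing) // Init
//	atomic.StoreInt64(&s.isServing, ServerStateHealthy)      // Start, on success
//	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
//		return nil // Stop is a no-op unless the server was healthy
//	}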

type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdClient *clientv3.Client) (types.RootCoord, error)

// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

// Params is the global param table instance for datacoord
var Params paramtable.GlobalParamTable

// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	quitCh           chan struct{}
	isServing        ServerState
	helper           ServerHelper

	etcdCli          *clientv3.Client
	kvClient         *etcdkv.EtcdKV
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption
	handler          Handler

	compactionTrigger trigger
	compactionHandler compactionPlanContext

	metricsCacheManager *metricsinfo.MetricsCacheManager

	flushCh   chan UniqueID
	msFactory msgstream.Factory

	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent

	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}

// ServerHelper datacoord server injection helper
type ServerHelper struct {
	eventAfterHandleDataNodeTt func()
}

func defaultServerHelper() ServerHelper {
	return ServerHelper{
		eventAfterHandleDataNodeTt: func() {},
	}
}
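
// A usage sketch (assumption, not from the original file): in-package tests can
// inject a hook that fires after every handled DataNode timetick, e.g.
//
//	notify := make(chan struct{}, 1)
//	svr := CreateServer(ctx, factory, SetServerHelper(ServerHelper{
//		eventAfterHandleDataNodeTt: func() { notify <- struct{}{} },
//	}))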

// Option utility function signature to set DataCoord server attributes
type Option func(svr *Server)

// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

// SetServerHelper returns an `Option` setting ServerHelper with provided parameter
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

// SetCluster returns an `Option` setting Cluster with provided parameter
func SetCluster(cluster *Cluster) Option {
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

// SetSegmentManager returns an Option to set SegmentManager
func SetSegmentManager(manager Manager) Option {
	return func(svr *Server) {
		svr.segmentManager = manager
	}
}

// CreateServer creates a `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) *Server {
	rand.Seed(time.Now().UnixNano())
	s := &Server{
		ctx:                    ctx,
		quitCh:                 make(chan struct{}),
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataNodeCreator:        defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
		helper:                 defaultServerHelper(),

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
	}

	for _, opt := range opts {
		opt(s)
	}
	return s
}
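
// A minimal wiring sketch (assumption, not from the original file); `myCluster`
// and `myDataNodeCreator` are hypothetical caller-provided values:
//
//	svr := CreateServer(ctx, factory,
//		SetCluster(myCluster),
//		SetDataNodeCreator(myDataNodeCreator),
//	)
//	if err := svr.Init(); err != nil {
//		// handle error
//	}
//	if err := svr.Start(); err != nil {
//		// handle error
//	}
//	_ = svr.Register()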

func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, client *clientv3.Client) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, client)
}

// QuitSignal returns signal when server quits
func (s *Server) QuitSignal() <-chan struct{} {
	return s.quitCh
}
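
// Sketch (assumption): the starter goroutine mentioned in Register below can
// block on QuitSignal to learn that the server exited, e.g. after the etcd
// session is lost:
//
//	go func() {
//		<-svr.QuitSignal()
//		// initiate process shutdown
//	}()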

// Register registers data service at etcd
func (s *Server) Register() error {
	s.session.Register()
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
		logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
		if err := s.Stop(); err != nil {
			logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		if s.session.TriggerKill {
			syscall.Kill(syscall.Getpid(), syscall.SIGINT)
		}
	})
	return nil
}

func (s *Server) initSession() error {
	s.session = sessionutil.NewSession(s.ctx, Params.DataCoordCfg.MetaRootPath, s.etcdCli)
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
	s.session.Init(typeutil.DataCoordRole, Params.DataCoordCfg.Address, true, true)
	Params.DataCoordCfg.NodeID = s.session.ServerID
	Params.BaseParams.SetLogger(Params.DataCoordCfg.NodeID)
	return nil
}

// Init changes server state to Initializing
func (s *Server) Init() error {
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return s.initSession()
}

// Start initializes `Server` members and starts loops. The following steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which include the message stream handler (segment statistics, datanode tt),
//		datanode etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.DataCoordCfg.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	if err = s.initRootCoordClient(); err != nil {
		return err
	}

	if err = s.initMeta(); err != nil {
		return err
	}

	s.handler = newServerHandler(s)

	if err = s.initCluster(); err != nil {
		return err
	}

	s.allocator = newRootCoordAllocator(s.rootCoordClient)
	if Params.DataCoordCfg.EnableCompaction {
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}

	s.startSegmentManager()
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

	if err = s.initGarbageCollection(); err != nil {
		return err
	}

	s.startServerLoop()
	Params.DataCoordCfg.CreatedTime = time.Now()
	Params.DataCoordCfg.UpdatedTime = time.Now()
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
	logutil.Logger(s.ctx).Debug("startup success")

	return nil
}

func (s *Server) initCluster() error {
	if s.cluster != nil {
		return nil
	}

	var err error
	s.channelManager, err = NewChannelManager(s.kvClient, s.handler)
	if err != nil {
		return err
	}
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
	return nil
}

// SetEtcdClient sets etcd client for datacoord.
func (s *Server) SetEtcdClient(client *clientv3.Client) {
	s.etcdCli = client
}

func (s *Server) createCompactionHandler() {
	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler.start()
}

func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

func (s *Server) createCompactionTrigger() {
	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger.start()
}

func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

func (s *Server) initGarbageCollection() error {
	var cli *minio.Client
	var err error
	if Params.DataCoordCfg.EnableGarbageCollection {
		cli, err = minio.New(Params.DataCoordCfg.MinioAddress, &minio.Options{
			Creds:  credentials.NewStaticV4(Params.DataCoordCfg.MinioAccessKeyID, Params.DataCoordCfg.MinioSecretAccessKey, ""),
			Secure: Params.DataCoordCfg.MinioUseSSL,
		})
		if err != nil {
			return err
		}

		checkBucketFn := func() error {
			has, err := cli.BucketExists(context.TODO(), Params.DataCoordCfg.MinioBucketName)
			if err != nil {
				return err
			}
			if !has {
				err = cli.MakeBucket(context.TODO(), Params.DataCoordCfg.MinioBucketName, minio.MakeBucketOptions{})
				if err != nil {
					return err
				}
			}
			return nil
		}
		// retry twice to cover the race where
		// 1. the bucket does not exist when checked,
		// 2. the bucket is then created by another component, and
		// 3. datacoord tries to create it but fails with a "bucket already exists" error
		err = retry.Do(s.ctx, checkBucketFn, retry.Attempts(2))
		if err != nil {
			return err
		}
	}

	s.garbageCollector = newGarbageCollector(s.meta, GcOption{
		cli:        cli,
		enabled:    Params.DataCoordCfg.EnableGarbageCollection,
		bucketName: Params.DataCoordCfg.MinioBucketName,
		rootPath:   Params.DataCoordCfg.MinioRootPath,

		checkInterval:    Params.DataCoordCfg.GCInterval,
		missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
		dropTolerance:    Params.DataCoordCfg.GCDropTolerance,
	})
	return nil
}
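
// Note on the retry above, restating the original intent (assumed semantics):
// retry.Do runs checkBucketFn up to retry.Attempts(2) times and returns nil on
// the first success, so a bucket concurrently created by another component is
// absorbed by the second attempt's BucketExists check.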

func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Debug("DataCoord failed to init service discovery", zap.Error(err))
		return err
	}
	log.Debug("DataCoord successfully got DataNode sessions", zap.Any("sessions", sessions))

	datanodes := make([]*NodeInfo, 0, len(sessions))
	for _, session := range sessions {
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
		}
		datanodes = append(datanodes, info)
	}

	s.cluster.Startup(datanodes)

	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
	return nil
}

func (s *Server) startSegmentManager() {
	if s.segmentManager == nil {
		s.segmentManager = newSegmentManager(s.meta, s.allocator)
	}
}

func (s *Server) initMeta() error {
	etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.DataCoordCfg.MetaRootPath)
	s.kvClient = etcdKV
	reloadEtcdFn := func() error {
		var err error
		s.meta, err = newMeta(s.kvClient)
		if err != nil {
			return err
		}
		return nil
	}
	return retry.Do(s.ctx, reloadEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
}

func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
	s.serverLoopWg.Add(3)
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
	s.garbageCollector.start()
}

// startDataNodeTtLoop starts a goroutine to receive data node tt msgs from the msgstream
// a tt msg carries the currently consumed timestamp for each channel
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("DataCoord failed to create timetick channel", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumerWithPosition([]string{Params.DataCoordCfg.TimeTickChannelName},
		Params.DataCoordCfg.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
	log.Debug("DataCoord creates the timetick channel consumer",
		zap.String("timeTickChannel", Params.DataCoordCfg.TimeTickChannelName),
		zap.String("subscription", Params.DataCoordCfg.DataCoordSubscriptionName))
	ttMsgStream.Start()

	go func() {
		var checker *LongTermChecker
		if enableTtChecker {
			checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
			checker.Start()
			defer checker.Stop()
		}

		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer ttMsgStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("DataNode timetick loop shutdown")
				return
			default:
			}
			msgPack := ttMsgStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil timetick msg and shutdown timetick channel")
				return
			}
			for _, msg := range msgPack.Msgs {
				ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
				if !ok {
					log.Warn("receive unexpected msg type from tt channel")
					continue
				}
				if enableTtChecker {
					checker.Check()
				}

				if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
					log.Error("failed to handle timetick message", zap.Error(err))
					continue
				}
			}
			s.helper.eventAfterHandleDataNodeTt()
		}
	}()
}

func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
	ch := ttMsg.GetChannelName()
	ts := ttMsg.GetTimestamp()
	physical, _ := tsoutil.ParseTS(ts)
	if time.Since(physical).Minutes() > 1 {
		// if it lags behind, log at most about once a minute
		log.RatedWarn(60.0, "time tick lag behind for more than 1 minute", zap.String("channel", ch), zap.Time("timetick", physical))
	}

	s.updateSegmentStatistics(ttMsg.GetSegmentsStats())

	if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
		return fmt.Errorf("expire allocations: %w", err)
	}

	flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
	if err != nil {
		return fmt.Errorf("get flushable segments: %w", err)
	}
	flushableSegments := s.getFlushableSegmentsInfo(flushableIDs)

	staleSegments := s.getStaleSegmentsInfo(ch)
	staleSegments = s.filterWithFlushableSegments(staleSegments, flushableIDs)

	if len(flushableSegments)+len(staleSegments) == 0 {
		return nil
	}

	log.Debug("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))

	s.setLastFlushTime(flushableSegments)
	s.setLastFlushTime(staleSegments)

	finfo, minfo := make([]*datapb.SegmentInfo, 0, len(flushableSegments)), make([]*datapb.SegmentInfo, 0, len(staleSegments))
	for _, info := range flushableSegments {
		finfo = append(finfo, info.SegmentInfo)
	}
	for _, info := range staleSegments {
		minfo = append(minfo, info.SegmentInfo)
	}
	s.cluster.Flush(s.ctx, finfo, minfo)
	return nil
}

func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
	for _, stat := range stats {
		s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
	}
}

func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
	res := make([]*SegmentInfo, 0, len(flushableIDs))
	for _, id := range flushableIDs {
		sinfo := s.meta.GetSegment(id)
		if sinfo == nil {
			log.Error("get segment from meta error", zap.Int64("id", id))
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

func (s *Server) getStaleSegmentsInfo(ch string) []*SegmentInfo {
	return s.meta.SelectSegments(func(info *SegmentInfo) bool {
		return isSegmentHealthy(info) &&
			info.GetInsertChannel() == ch &&
			!info.lastFlushTime.IsZero() &&
			time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
	})
}

func (s *Server) filterWithFlushableSegments(staleSegments []*SegmentInfo, flushableIDs []int64) []*SegmentInfo {
	filter := map[int64]struct{}{}
	for _, sid := range flushableIDs {
		filter[sid] = struct{}{}
	}

	res := make([]*SegmentInfo, 0, len(staleSegments))
	for _, sinfo := range staleSegments {
		if _, ok := filter[sinfo.GetID()]; ok {
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

func (s *Server) setLastFlushTime(segments []*SegmentInfo) {
	for _, sinfo := range segments {
		s.meta.SetLastFlushTime(sinfo.GetID(), time.Now())
	}
}

// startWatchService starts a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
	go s.watchService(ctx)
}

// watchService watches services
func (s *Server) watchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
		}
	}
}

// handleSessionEvent handles session events - DataNodes Add/Del
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
	if event == nil {
		return nil
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to register node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregister node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
	return nil
}

// startFlushLoop starts a goroutine to handle post func process
// which is to notify `RootCoord` that this segment is flushed
func (s *Server) startFlushLoop(ctx context.Context) {
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
				log.Debug("flush loop shutdown")
				return
			case segmentID := <-s.flushCh:
				//Ignore return error
				_ = s.postFlush(ctx, segmentID)
			}
		}
	}()
}

// post function after flush is done
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

func (s *Server) initRootCoordClient() error {
	var err error
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.DataCoordCfg.MetaRootPath, s.etcdCli); err != nil {
		return err
	}
	if err = s.rootCoordClient.Init(); err != nil {
		return err
	}
	return s.rootCoordClient.Start()
}

// Stop does the Server finalize processes
// it checks whether the server status is healthy; if not, it just quits
// if the Server is healthy, it sets the server state to stopped, releases the etcd session,
//	stops the message stream client and stops the server loops
func (s *Server) Stop() error {
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	log.Debug("dataCoord server shutdown")
	s.cluster.Close()
	s.garbageCollector.close()
	s.stopServerLoop()
	s.session.Revoke(time.Second)

	if Params.DataCoordCfg.EnableCompaction {
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
	return nil
}

// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	return s.kvClient.RemoveWithPrefix("")
}

func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}

func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_DescribeCollection,
			SourceID: Params.DataCoordCfg.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.DataCoordCfg.NodeID,
		},
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
		return err
	}
	collInfo := &datapb.CollectionInfo{
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
	}
	s.meta.AddCollection(collInfo)
	return nil
}