// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/timerecord"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

const (
	connEtcdMaxRetryTime = 100000
	allPartitionID       = 0 // partitionID 0 means no partition filtering
)

var (
	// TODO: sunby put to config
	enableTtChecker           = true
	ttCheckerName             = "dataTtChecker"
	ttMaxInterval             = 2 * time.Minute
	ttCheckerWarnMsg          = fmt.Sprintf("DataCoord hasn't received tt for %f minutes", ttMaxInterval.Minutes())
	segmentTimedFlushDuration = 10.0
)

type (
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcut for typeutil.Timestamp
	Timestamp = typeutil.Timestamp
)

// ServerState type alias, represents datacoord Server state
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands for initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)

type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdClient *clientv3.Client) (types.RootCoord, error)

// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

// Params is the component parameter table of datacoord
var Params paramtable.ComponentParam

// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	quitCh           chan struct{}
	isServing        ServerState
	helper           ServerHelper

	etcdCli          *clientv3.Client
	kvClient         *etcdkv.EtcdKV
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption
	handler          Handler

	compactionTrigger trigger
	compactionHandler compactionPlanContext

	metricsCacheManager *metricsinfo.MetricsCacheManager

	flushCh   chan UniqueID
	msFactory msgstream.Factory

	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent

	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}

// ServerHelper datacoord server injection helper
type ServerHelper struct {
	eventAfterHandleDataNodeTt func()
}

func defaultServerHelper() ServerHelper {
	return ServerHelper{
		eventAfterHandleDataNodeTt: func() {},
	}
}

// Option utility function signature to set DataCoord server attributes
type Option func(svr *Server)

// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}
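
// A hedged example (caller-side sketch; mockRootCoord is hypothetical and
// not defined in this file): tests may inject a stub RootCoord client
// through this option.
//
//	opt := SetRootCoordCreator(func(ctx context.Context, metaRootPath string, etcdCli *clientv3.Client) (types.RootCoord, error) {
//		return &mockRootCoord{}, nil
//	})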

// SetServerHelper returns an `Option` setting ServerHelper with provided parameter
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}
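
// A hedged example (test-side sketch; the channel wiring is illustrative):
// a test can use ServerHelper to receive a signal after each DataNode
// timetick message is handled.
//
//	signal := make(chan struct{}, 1)
//	helper := ServerHelper{eventAfterHandleDataNodeTt: func() {
//		select {
//		case signal <- struct{}{}:
//		default:
//		}
//	}}
//	svr := CreateServer(ctx, factory, SetServerHelper(helper))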

// SetCluster returns an `Option` setting Cluster with provided parameter
func SetCluster(cluster *Cluster) Option {
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

// SetSegmentManager returns an Option to set SegmentManager
func SetSegmentManager(manager Manager) Option {
	return func(svr *Server) {
		svr.segmentManager = manager
	}
}

// CreateServer creates a `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) *Server {
	rand.Seed(time.Now().UnixNano())
	s := &Server{
		ctx:                    ctx,
		quitCh:                 make(chan struct{}),
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataNodeCreator:        defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
		helper:                 defaultServerHelper(),

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
	}

	for _, opt := range opts {
		opt(s)
	}
	return s
}
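
// A minimal boot sketch (caller-side code; ctx, msFactory and etcdCli are
// assumed to be constructed by the caller):
//
//	svr := CreateServer(ctx, msFactory)
//	svr.SetEtcdClient(etcdCli)
//	if err := svr.Init(); err != nil {
//		panic(err)
//	}
//	if err := svr.Start(); err != nil {
//		panic(err)
//	}
//	if err := svr.Register(); err != nil {
//		panic(err)
//	}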

func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, client *clientv3.Client) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, client)
}

// QuitSignal returns the quit signal channel of the server
func (s *Server) QuitSignal() <-chan struct{} {
	return s.quitCh
}
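
// A hedged usage sketch: a starter goroutine may block on QuitSignal to
// learn that the server has quit.
//
//	<-svr.QuitSignal()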

// Register registers the data service at etcd
func (s *Server) Register() error {
	s.session.Register()
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
		logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
		if err := s.Stop(); err != nil {
			logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		if s.session.TriggerKill {
			syscall.Kill(syscall.Getpid(), syscall.SIGINT)
		}
	})
	return nil
}

func (s *Server) initSession() error {
	s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli)
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
	s.session.Init(typeutil.DataCoordRole, Params.DataCoordCfg.Address, true, true)
	Params.DataCoordCfg.NodeID = s.session.ServerID
	Params.SetLogger(Params.DataCoordCfg.NodeID)
	return nil
}

// Init changes server state to Initializing
func (s *Server) Init() error {
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return s.initSession()
}

// Start initializes `Server` members and starts loops. The following steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which include the message stream handler (segment statistics, datanode tt),
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarCfg.Address,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	if err = s.initRootCoordClient(); err != nil {
		return err
	}

	if err = s.initMeta(); err != nil {
		return err
	}

	s.handler = newServerHandler(s)

	if err = s.initCluster(); err != nil {
		return err
	}

	s.allocator = newRootCoordAllocator(s.rootCoordClient)
	if Params.DataCoordCfg.EnableCompaction {
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}

	s.startSegmentManager()
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

	if err = s.initGarbageCollection(); err != nil {
		return err
	}

	s.startServerLoop()
	Params.DataCoordCfg.CreatedTime = time.Now()
	Params.DataCoordCfg.UpdatedTime = time.Now()
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
	logutil.Logger(s.ctx).Debug("startup success")

	return nil
}

func (s *Server) initCluster() error {
	if s.cluster != nil {
		return nil
	}

	var err error
	s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory))
	if err != nil {
		return err
	}
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
	return nil
}

// SetEtcdClient sets etcd client for datacoord.
func (s *Server) SetEtcdClient(client *clientv3.Client) {
	s.etcdCli = client
}

func (s *Server) createCompactionHandler() {
	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler.start()
}

func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

func (s *Server) createCompactionTrigger() {
	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger.start()
}

func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

func (s *Server) initGarbageCollection() error {
	var cli *minio.Client
	var err error
	if Params.DataCoordCfg.EnableGarbageCollection {
		cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{
			Creds:  credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, ""),
			Secure: Params.MinioCfg.UseSSL,
		})
		if err != nil {
			return err
		}

		checkBucketFn := func() error {
			has, err := cli.BucketExists(context.TODO(), Params.MinioCfg.BucketName)
			if err != nil {
				return err
			}
			if !has {
				err = cli.MakeBucket(context.TODO(), Params.MinioCfg.BucketName, minio.MakeBucketOptions{})
				if err != nil {
					return err
				}
			}
			return nil
		}
		// retry twice, to tolerate the race where
		// 1. the bucket does not exist yet,
		// 2. the bucket is created by another component in the meantime, and
		// 3. datacoord tries to create it but fails with a "bucket already exists" error
		err = retry.Do(s.ctx, checkBucketFn, retry.Attempts(2))
		if err != nil {
			return err
		}
	}

	s.garbageCollector = newGarbageCollector(s.meta, GcOption{
		cli:        cli,
		enabled:    Params.DataCoordCfg.EnableGarbageCollection,
		bucketName: Params.MinioCfg.BucketName,
		rootPath:   Params.MinioCfg.RootPath,

		checkInterval:    Params.DataCoordCfg.GCInterval,
		missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
		dropTolerance:    Params.DataCoordCfg.GCDropTolerance,
	})
	return nil
}

func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Debug("DataCoord failed to init service discovery", zap.Error(err))
		return err
	}
	log.Debug("DataCoord successfully got DataNode sessions", zap.Any("sessions", sessions))

	datanodes := make([]*NodeInfo, 0, len(sessions))
	for _, session := range sessions {
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
		}
		datanodes = append(datanodes, info)
	}

	s.cluster.Startup(datanodes)

	// TODO implement rewatch logic
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
	return nil
}

func (s *Server) startSegmentManager() {
	if s.segmentManager == nil {
		s.segmentManager = newSegmentManager(s.meta, s.allocator)
	}
}

func (s *Server) initMeta() error {
	etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath)
	s.kvClient = etcdKV
	reloadEtcdFn := func() error {
		var err error
		s.meta, err = newMeta(s.kvClient)
		if err != nil {
			return err
		}
		return nil
	}
	return retry.Do(s.ctx, reloadEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
}

func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
	s.serverLoopWg.Add(3)
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
	s.garbageCollector.start()
}

// startDataNodeTtLoop starts a goroutine to receive datanode tt (timetick) msg from the msgstream
// tt msg stands for the currently consumed timestamp for each channel
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("DataCoord failed to create timetick channel", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumerWithPosition([]string{Params.MsgChannelCfg.DataCoordTimeTick},
		Params.MsgChannelCfg.DataCoordSubName, mqclient.SubscriptionPositionLatest)
	log.Debug("DataCoord creates the timetick channel consumer",
		zap.String("timeTickChannel", Params.MsgChannelCfg.DataCoordTimeTick),
		zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName))
	ttMsgStream.Start()

	go func() {
		var checker *timerecord.LongTermChecker
		if enableTtChecker {
			checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
			checker.Start()
			defer checker.Stop()
		}

		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer ttMsgStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("DataNode timetick loop shutdown")
				return
			default:
			}
			msgPack := ttMsgStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil timetick msg and shutdown timetick channel")
				return
			}
			for _, msg := range msgPack.Msgs {
				ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
				if !ok {
					log.Warn("receive unexpected msg type from tt channel")
					continue
				}
				if enableTtChecker {
					checker.Check()
				}

				if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
					log.Error("failed to handle timetick message", zap.Error(err))
					continue
				}
			}
			s.helper.eventAfterHandleDataNodeTt()
		}
	}()
}

func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
	ch := ttMsg.GetChannelName()
	ts := ttMsg.GetTimestamp()
	physical, _ := tsoutil.ParseTS(ts)
	if time.Since(physical).Minutes() > 1 {
		// if it lags behind, log about once per minute
		log.RatedWarn(60.0, "time tick lags behind for more than 1 minute", zap.String("channel", ch), zap.Time("timetick", physical))
	}

	s.updateSegmentStatistics(ttMsg.GetSegmentsStats())

	if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
		return fmt.Errorf("expire allocations: %w", err)
	}

	flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
	if err != nil {
		return fmt.Errorf("get flushable segments: %w", err)
	}
	flushableSegments := s.getFlushableSegmentsInfo(flushableIDs)

	staleSegments := s.getStaleSegmentsInfo(ch)
	staleSegments = s.filterWithFlushableSegments(staleSegments, flushableIDs)

	if len(flushableSegments)+len(staleSegments) == 0 {
		return nil
	}

	log.Debug("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))

	s.setLastFlushTime(flushableSegments)
	s.setLastFlushTime(staleSegments)

	finfo, minfo := make([]*datapb.SegmentInfo, 0, len(flushableSegments)), make([]*datapb.SegmentInfo, 0, len(staleSegments))
	for _, info := range flushableSegments {
		finfo = append(finfo, info.SegmentInfo)
	}
	for _, info := range staleSegments {
		minfo = append(minfo, info.SegmentInfo)
	}
	s.cluster.Flush(s.ctx, finfo, minfo)
	return nil
}

func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
	for _, stat := range stats {
		s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
	}
}

func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
	res := make([]*SegmentInfo, 0, len(flushableIDs))
	for _, id := range flushableIDs {
		sinfo := s.meta.GetSegment(id)
		if sinfo == nil {
			log.Error("get segment from meta error", zap.Int64("id", id))
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

func (s *Server) getStaleSegmentsInfo(ch string) []*SegmentInfo {
	return s.meta.SelectSegments(func(info *SegmentInfo) bool {
		return isSegmentHealthy(info) &&
			info.GetInsertChannel() == ch &&
			!info.lastFlushTime.IsZero() &&
			time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
	})
}

func (s *Server) filterWithFlushableSegments(staleSegments []*SegmentInfo, flushableIDs []int64) []*SegmentInfo {
	filter := map[int64]struct{}{}
	for _, sid := range flushableIDs {
		filter[sid] = struct{}{}
	}

	res := make([]*SegmentInfo, 0, len(staleSegments))
	for _, sinfo := range staleSegments {
		if _, ok := filter[sinfo.GetID()]; ok {
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

func (s *Server) setLastFlushTime(segments []*SegmentInfo) {
	for _, sinfo := range segments {
		s.meta.SetLastFlushTime(sinfo.GetID(), time.Now())
	}
}
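
// A hedged note on the timestamp handling above (tsoutil semantics assumed
// from its use in handleTimetickMessage): ParseTS splits a hybrid timestamp
// into a wall-clock (physical) part and a logical counter part; only the
// physical part is compared against time.Now() for lag detection.
//
//	physical, logical := tsoutil.ParseTS(ts)
//	_ = logical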

// startWatchService starts a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
	go s.watchService(ctx)
}

// watchService watches services
func (s *Server) watchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
		case event, ok := <-s.eventCh:
			if !ok {
				// ErrCompacted is handled inside SessionWatcher,
				// so this must be some other error; close the DataCoord server
				logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", s.session.ServerID))
				go s.Stop()
				if s.session.TriggerKill {
					syscall.Kill(syscall.Getpid(), syscall.SIGINT)
				}
				return
			}
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
		}
	}
}

// handleSessionEvent handles session events - DataNode add/delete
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
	if event == nil {
		return nil
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to register node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregister node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
	return nil
}

// startFlushLoop starts a goroutine to handle the post-flush process,
// which is to notify `RootCoord` that a segment is flushed
func (s *Server) startFlushLoop(ctx context.Context) {
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
				logutil.Logger(s.ctx).Debug("flush loop shutdown")
				return
			case segmentID := <-s.flushCh:
				// ignore the return error
				_ = s.postFlush(ctx, segmentID)
			}
		}
	}()
}

// post function after flush is done
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic: fetch all segments in `Flushing` state and redo the flush notification logic
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

func (s *Server) initRootCoordClient() error {
	var err error
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli); err != nil {
		return err
	}
	if err = s.rootCoordClient.Init(); err != nil {
		return err
	}
	return s.rootCoordClient.Start()
}

// Stop does the Server finalize processes
// it checks whether the server status is healthy; if not, it just quits
// if the Server is healthy, it sets the server state to stopped, releases the etcd session,
//	stops the message stream client and stops the server loops
func (s *Server) Stop() error {
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	logutil.Logger(s.ctx).Debug("server shutdown")
	s.cluster.Close()
	s.garbageCollector.close()
	s.stopServerLoop()
	s.session.Revoke(time.Second)

	if Params.DataCoordCfg.EnableCompaction {
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
	return nil
}

// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	return s.kvClient.RemoveWithPrefix("")
}

func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}

func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_DescribeCollection,
			SourceID: Params.DataCoordCfg.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.DataCoordCfg.NodeID,
		},
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
		return err
	}
	collInfo := &datapb.CollectionInfo{
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
	}
	s.meta.AddCollection(collInfo)
	return nil
}