// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17
package datacoord
S
sunby 已提交
18

S
sunby 已提交
19 20
import (
	"context"
S
sunby 已提交
21
	"errors"
22
	"fmt"
S
sunby 已提交
23
	"math/rand"
S
sunby 已提交
24
	"sync"
S
sunby 已提交
25
	"sync/atomic"
26
	"syscall"
S
sunby 已提交
27 28
	"time"

S
sunby 已提交
29
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
30
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
X
Xiangyu Wang 已提交
31 32
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
X
Xiangyu Wang 已提交
33
	"github.com/milvus-io/milvus/internal/logutil"
X
Xiangyu Wang 已提交
34
	"github.com/milvus-io/milvus/internal/msgstream"
X
Xiangyu Wang 已提交
35 36 37
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
X
Xiangyu Wang 已提交
38
	"github.com/milvus-io/milvus/internal/types"
X
Xiangyu Wang 已提交
39 40
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
41
	"github.com/milvus-io/milvus/internal/util/paramtable"
X
Xiangyu Wang 已提交
42
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
43
	"github.com/milvus-io/milvus/internal/util/sessionutil"
G
godchen 已提交
44
	"github.com/milvus-io/milvus/internal/util/timerecord"
X
Xiangyu Wang 已提交
45
	"github.com/milvus-io/milvus/internal/util/tsoutil"
X
Xiangyu Wang 已提交
46
	"github.com/milvus-io/milvus/internal/util/typeutil"
X
Xiangyu Wang 已提交
47 48
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
X
Xiaofan 已提交
49
	clientv3 "go.etcd.io/etcd/client/v3"
X
Xiangyu Wang 已提交
50
	"go.uber.org/zap"
S
sunby 已提交
51 52
)

53 54 55 56
const (
	// connEtcdMaxRetryTime is the attempt count handed to retry.Do when meta is loaded from etcd
	connEtcdMaxRetryTime = 100000
	allPartitionID       = 0 // a partition ID of 0 means no filtering
)
S
sunby 已提交
57

58 59
var (
	// TODO: sunby put to config

	// enableTtChecker switches the time-tick long-term checker on or off
	enableTtChecker = true
	// ttCheckerName is the name handed to the long-term checker
	ttCheckerName = "dataTtChecker"
	// ttMaxInterval is the interval handed to the long-term checker
	ttMaxInterval = 2 * time.Minute
	// ttCheckerWarnMsg is the warning text handed to the long-term checker
	ttCheckerWarnMsg = fmt.Sprintf("Datacoord haven't received tt for %f minutes", ttMaxInterval.Minutes())
	// segmentTimedFlushDuration is the minimum age, in minutes, of a segment's
	// last flush time before the segment is selected as stale
	segmentTimedFlushDuration = 10.0
)

S
sunby 已提交
67
type (
68 69 70
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcurt for typeutil.Timestamp
S
sunby 已提交
71
	Timestamp = typeutil.Timestamp
S
sunby 已提交
72
)
S
sunby 已提交
73

74
// ServerState type alias, presents datacoord Server State
75 76 77 78 79 80 81 82 83 84 85
type ServerState = int64

// The states are consecutive integers starting at zero.
const (
	// ServerStateStopped marks a `Server` instance that was just created or has been stopped
	ServerStateStopped ServerState = iota
	// ServerStateInitializing marks a `Server` instance that is initializing
	ServerStateInitializing
	// ServerStateHealthy marks a healthy `Server` instance
	ServerStateHealthy
)

86
// dataNodeCreatorFunc is the signature of a factory that returns a DataNode client for the given address
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
X
Xiaofan 已提交
87
// rootCoordCreatorFunc is the signature of a factory that returns a RootCoord client
// from a meta root path and an etcd client
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdClient *clientv3.Client) (types.RootCoord, error)
S
sunby 已提交
88

89 90 91
// compile-time assertion that *Server implements `types.DataCoord`
var _ types.DataCoord = (*Server)(nil)

92 93
// Params is the package-level parameter table read by the datacoord server
var Params paramtable.GlobalParamTable

94 95
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
96
type Server struct {
S
sunby 已提交
97 98 99 100
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
101
	quitCh           chan struct{}
102
	isServing        ServerState
S
sunby 已提交
103
	helper           ServerHelper
104

X
Xiaofan 已提交
105
	etcdCli          *clientv3.Client
106 107 108 109 110 111 112 113 114 115
	kvClient         *etcdkv.EtcdKV
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption
116
	handler          Handler
117

S
sunby 已提交
118 119 120
	compactionTrigger trigger
	compactionHandler compactionPlanContext

121 122
	metricsCacheManager *metricsinfo.MetricsCacheManager

123 124
	flushCh   chan UniqueID
	msFactory msgstream.Factory
125

126 127
	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent
128

129 130
	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
131
}
S
sunby 已提交
132

133
// ServerHelper datacoord server injection helper
S
sunby 已提交
134 135 136 137 138 139 140 141 142 143
type ServerHelper struct {
	// eventAfterHandleDataNodeTt is invoked by the time-tick loop after each
	// consumed message pack has been processed
	eventAfterHandleDataNodeTt func()
}

// defaultServerHelper returns a ServerHelper whose hook does nothing
func defaultServerHelper() ServerHelper {
	noop := func() {}
	return ServerHelper{eventAfterHandleDataNodeTt: noop}
}

144
// Option utility function signature to set DataCoord server attributes
S
sunby 已提交
145 146
type Option func(svr *Server)

147
// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
148
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
S
sunby 已提交
149 150 151 152 153
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

154
// SetServerHelper returns an `Option` setting ServerHelp with provided parameter
S
sunby 已提交
155 156 157 158 159 160
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

161 162 163 164 165 166 167
// SetCluster builds an `Option` that installs the given Cluster on the Server
func SetCluster(cluster *Cluster) Option {
	return Option(func(svr *Server) { svr.cluster = cluster })
}

168
// SetDataNodeCreator returns an `Option` setting DataNode create function
169
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
170 171 172 173 174
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

B
Bingyi Sun 已提交
175 176 177 178 179 180 181
// SetSegmentManager builds an Option that installs the given segment Manager on the Server
func SetSegmentManager(manager Manager) Option {
	install := func(svr *Server) {
		svr.segmentManager = manager
	}
	return install
}

182
// CreateServer creates a `Server` instance
X
Xiaofan 已提交
183
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) *Server {
S
sunby 已提交
184
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
185
	s := &Server{
S
sunby 已提交
186
		ctx:                    ctx,
187
		quitCh:                 make(chan struct{}),
S
sunby 已提交
188 189
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
190
		dataNodeCreator:        defaultDataNodeCreatorFunc,
S
sunby 已提交
191
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
192
		helper:                 defaultServerHelper(),
193 194

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
S
sunby 已提交
195
	}
S
sunby 已提交
196 197 198 199

	for _, opt := range opts {
		opt(s)
	}
X
Xiaofan 已提交
200
	return s
S
sunby 已提交
201 202
}

G
godchen 已提交
203 204
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
205 206
}

X
Xiaofan 已提交
207 208
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, client *clientv3.Client) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, client)
S
sunby 已提交
209 210
}

211 212 213 214 215
// QuitSignal exposes the server's quit channel as a receive-only channel
func (s *Server) QuitSignal() <-chan struct{} {
	var quit <-chan struct{} = s.quitCh
	return quit
}

216
// Register registers data service at etcd
217
func (s *Server) Register() error {
218 219
	s.session.Register()
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
220
		logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
221
		if err := s.Stop(); err != nil {
222
			logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
223 224
		}
		// manually send signal to starter goroutine
X
Xiaofan 已提交
225 226 227
		if s.session.TriggerKill {
			syscall.Kill(syscall.Getpid(), syscall.SIGINT)
		}
228 229 230 231 232
	})
	return nil
}

func (s *Server) initSession() error {
233
	s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli)
234 235 236
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
X
Xiaofan 已提交
237
	s.session.Init(typeutil.DataCoordRole, Params.DataCoordCfg.Address, true, true)
238
	Params.DataCoordCfg.NodeID = s.session.ServerID
239
	Params.SetLogger(Params.DataCoordCfg.NodeID)
240 241 242
	return nil
}

243
// Init change server state to Initializing
244
func (s *Server) Init() error {
245
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
246
	return s.initSession()
S
sunby 已提交
247 248
}

249 250 251 252 253 254 255
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
256
func (s *Server) Start() error {
257
	var err error
S
sunby 已提交
258
	m := map[string]interface{}{
259
		"PulsarAddress":  Params.PulsarCfg.Address,
S
sunby 已提交
260 261 262 263 264 265
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
266
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
267 268
		return err
	}
269

S
sunby 已提交
270 271 272
	if err = s.initMeta(); err != nil {
		return err
	}
273

274 275
	s.handler = newServerHandler(s)

S
sunby 已提交
276 277 278
	if err = s.initCluster(); err != nil {
		return err
	}
279

C
congqixia 已提交
280
	s.allocator = newRootCoordAllocator(s.rootCoordClient)
281
	if Params.DataCoordCfg.EnableCompaction {
S
sunby 已提交
282 283 284
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}
285

286
	s.startSegmentManager()
S
sunby 已提交
287 288 289
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
290

291 292 293 294
	if err = s.initGarbageCollection(); err != nil {
		return err
	}

S
sunby 已提交
295
	s.startServerLoop()
296 297
	Params.DataCoordCfg.CreatedTime = time.Now()
	Params.DataCoordCfg.UpdatedTime = time.Now()
298
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
299
	logutil.Logger(s.ctx).Debug("startup success")
300

S
sunby 已提交
301
	return nil
302 303 304
}

func (s *Server) initCluster() error {
305 306 307 308
	if s.cluster != nil {
		return nil
	}

S
sunby 已提交
309
	var err error
310
	s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory))
311 312
	if err != nil {
		return err
313
	}
S
sunby 已提交
314 315
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
316
	return nil
317
}
G
groot 已提交
318

X
Xiaofan 已提交
319 320 321 322 323
// SetEtcdClient stores the etcd client the datacoord server will use.
func (s *Server) SetEtcdClient(client *clientv3.Client) { s.etcdCli = client }

S
sunby 已提交
324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341
// createCompactionHandler builds the compaction plan handler, stores it on the server and starts it
func (s *Server) createCompactionHandler() {
	planHandler := newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler = planHandler
	planHandler.start()
}

// stopCompactionHandler stops the compaction handler held by the server
func (s *Server) stopCompactionHandler() {
	planHandler := s.compactionHandler
	planHandler.stop()
}

// createCompactionTrigger builds the compaction trigger, stores it on the server and starts it
func (s *Server) createCompactionTrigger() {
	compTrigger := newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger = compTrigger
	compTrigger.start()
}

// stopCompactionTrigger stops the compaction trigger held by the server
func (s *Server) stopCompactionTrigger() {
	compTrigger := s.compactionTrigger
	compTrigger.stop()
}

342 343 344
func (s *Server) initGarbageCollection() error {
	var cli *minio.Client
	var err error
345
	if Params.DataCoordCfg.EnableGarbageCollection {
346 347 348
		cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{
			Creds:  credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, ""),
			Secure: Params.MinioCfg.UseSSL,
349 350 351 352
		})
		if err != nil {
			return err
		}
353 354

		checkBucketFn := func() error {
355
			has, err := cli.BucketExists(context.TODO(), Params.MinioCfg.BucketName)
356 357 358
			if err != nil {
				return err
			}
359
			if !has {
360
				err = cli.MakeBucket(context.TODO(), Params.MinioCfg.BucketName, minio.MakeBucketOptions{})
361 362 363 364 365 366 367 368 369 370 371 372 373
				if err != nil {
					return err
				}
			}
			return nil
		}
		// retry times shall be two, just to prevent
		// 1. bucket not exists
		// 2. bucket is created by other componnent
		// 3. datacoord try to create but failed with bucket already exists error
		err = retry.Do(s.ctx, checkBucketFn, retry.Attempts(2))
		if err != nil {
			return err
374 375 376 377 378
		}
	}

	s.garbageCollector = newGarbageCollector(s.meta, GcOption{
		cli:        cli,
379
		enabled:    Params.DataCoordCfg.EnableGarbageCollection,
380 381
		bucketName: Params.MinioCfg.BucketName,
		rootPath:   Params.MinioCfg.RootPath,
382

383 384 385
		checkInterval:    Params.DataCoordCfg.GCInterval,
		missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
		dropTolerance:    Params.DataCoordCfg.GCDropTolerance,
386 387 388 389
	})
	return nil
}

390 391 392
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
B
Bingyi Sun 已提交
393
		log.Debug("DataCoord failed to init service discovery", zap.Error(err))
G
godchen 已提交
394 395
		return err
	}
B
Bingyi Sun 已提交
396
	log.Debug("DataCoord success to get DataNode sessions", zap.Any("sessions", sessions))
397

S
sunby 已提交
398
	datanodes := make([]*NodeInfo, 0, len(sessions))
399
	for _, session := range sessions {
400 401 402
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
S
sunby 已提交
403
		}
404
		datanodes = append(datanodes, info)
405
	}
G
godchen 已提交
406

S
sunby 已提交
407
	s.cluster.Startup(datanodes)
408

409
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
S
sunby 已提交
410 411 412
	return nil
}

413
func (s *Server) startSegmentManager() {
B
Bingyi Sun 已提交
414 415 416
	if s.segmentManager == nil {
		s.segmentManager = newSegmentManager(s.meta, s.allocator)
	}
417 418
}

S
sunby 已提交
419
func (s *Server) initMeta() error {
420
	etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath)
X
Xiaofan 已提交
421 422 423
	s.kvClient = etcdKV
	reloadEtcdFn := func() error {
		var err error
G
godchen 已提交
424
		s.meta, err = newMeta(s.kvClient)
425 426 427 428
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
429
	}
X
Xiaofan 已提交
430
	return retry.Do(s.ctx, reloadEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
431 432
}

433 434
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
435
	s.serverLoopWg.Add(3)
436 437 438
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
439
	s.garbageCollector.start()
440 441
}

442 443
// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream
// tt msg stands for the currently consumed timestamp for each channel
S
sunby 已提交
444 445 446
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
B
Bingyi Sun 已提交
447
		log.Error("DataCoord failed to create timetick channel", zap.Error(err))
S
sunby 已提交
448 449
		return
	}
450 451
	ttMsgStream.AsConsumerWithPosition([]string{Params.MsgChannelCfg.DataCoordTimeTick},
		Params.MsgChannelCfg.DataCoordSubName, mqclient.SubscriptionPositionLatest)
B
Bingyi Sun 已提交
452
	log.Debug("DataCoord creates the timetick channel consumer",
453 454
		zap.String("timeTickChannel", Params.MsgChannelCfg.DataCoordTimeTick),
		zap.String("subscription", Params.MsgChannelCfg.DataCoordSubName))
S
sunby 已提交
455
	ttMsgStream.Start()
456

457
	go func() {
J
Jiquan Long 已提交
458
		var checker *timerecord.LongTermChecker
459
		if enableTtChecker {
J
Jiquan Long 已提交
460
			checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
461 462
			checker.Start()
			defer checker.Stop()
S
sunby 已提交
463
		}
464

465 466 467 468 469 470
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer ttMsgStream.Close()
		for {
			select {
			case <-ctx.Done():
B
Bingyi Sun 已提交
471
				log.Debug("DataNode timetick loop shutdown")
472 473
				return
			default:
474
			}
475 476
			msgPack := ttMsgStream.Consume()
			if msgPack == nil {
B
Bingyi Sun 已提交
477
				log.Debug("receive nil timetick msg and shutdown timetick channel")
478
				return
479
			}
480
			for _, msg := range msgPack.Msgs {
481 482 483
				ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
				if !ok {
					log.Warn("receive unexpected msg type from tt channel")
484 485 486 487 488
					continue
				}
				if enableTtChecker {
					checker.Check()
				}
S
sunby 已提交
489

490 491
				if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
					log.Error("failed to handle timetick message", zap.Error(err))
S
sunby 已提交
492
					continue
493
				}
S
sunby 已提交
494
			}
495
			s.helper.eventAfterHandleDataNodeTt()
496
		}
497
	}()
498 499
}

500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593
// handleTimetickMessage processes a single DataNode time-tick message: it warns
// when the tick lags behind, refreshes segment row counts, expires allocations,
// then asks the cluster to flush the flushable segments and mark the stale ones.
// Returns a wrapped error when expiring allocations or listing flushable
// segments fails.
func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
	channel, tick := ttMsg.GetChannelName(), ttMsg.GetTimestamp()
	tickTime, _ := tsoutil.ParseTS(tick)
	if lag := time.Since(tickTime); lag.Minutes() > 1 {
		// rate-limited so a lagging channel is reported about once per minute
		log.RatedWarn(60.0, "time tick lag behind for more than 1 minutes", zap.String("channel", channel), zap.Time("timetick", tickTime))
	}

	s.updateSegmentStatistics(ttMsg.GetSegmentsStats())

	if err := s.segmentManager.ExpireAllocations(channel, tick); err != nil {
		return fmt.Errorf("expire allocations: %w", err)
	}

	sealedIDs, err := s.segmentManager.GetFlushableSegments(ctx, channel, tick)
	if err != nil {
		return fmt.Errorf("get flushable segments: %w", err)
	}
	toFlush := s.getFlushableSegmentsInfo(sealedIDs)
	// stale segments that are already flushable are not reported twice
	toMark := s.filterWithFlushableSegments(s.getStaleSegmentsInfo(channel), sealedIDs)

	if len(toFlush) == 0 && len(toMark) == 0 {
		return nil
	}

	log.Debug("flush segments", zap.Int64s("segmentIDs", sealedIDs), zap.Int("markSegments count", len(toMark)))

	s.setLastFlushTime(toFlush)
	s.setLastFlushTime(toMark)

	flushInfos := make([]*datapb.SegmentInfo, 0, len(toFlush))
	for i := range toFlush {
		flushInfos = append(flushInfos, toFlush[i].SegmentInfo)
	}
	markInfos := make([]*datapb.SegmentInfo, 0, len(toMark))
	for i := range toMark {
		markInfos = append(markInfos, toMark[i].SegmentInfo)
	}
	s.cluster.Flush(s.ctx, flushInfos, markInfos)
	return nil
}

// updateSegmentStatistics records the reported row count of every segment in meta
func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
	for i := range stats {
		segStat := stats[i]
		s.meta.SetCurrentRows(segStat.GetSegmentID(), segStat.GetNumRows())
	}
}

// getFlushableSegmentsInfo resolves the given segment IDs against meta;
// IDs without a segment in meta are logged and left out of the result
func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
	infos := make([]*SegmentInfo, 0, len(flushableIDs))
	for _, segID := range flushableIDs {
		if seg := s.meta.GetSegment(segID); seg != nil {
			infos = append(infos, seg)
		} else {
			log.Error("get segment from meta error", zap.Int64("id", segID))
		}
	}
	return infos
}

// getStaleSegmentsInfo selects the healthy segments on channel ch whose last
// flush time is set and at least segmentTimedFlushDuration minutes old
func (s *Server) getStaleSegmentsInfo(ch string) []*SegmentInfo {
	isStale := func(info *SegmentInfo) bool {
		if !isSegmentHealthy(info) || info.GetInsertChannel() != ch {
			return false
		}
		if info.lastFlushTime.IsZero() {
			return false
		}
		return time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
	}
	return s.meta.SelectSegments(isStale)
}

// filterWithFlushableSegments returns the stale segments whose ID does not
// appear in flushableIDs, preserving their order
func (s *Server) filterWithFlushableSegments(staleSegments []*SegmentInfo, flushableIDs []int64) []*SegmentInfo {
	flushable := make(map[int64]struct{}, len(flushableIDs))
	for i := range flushableIDs {
		flushable[flushableIDs[i]] = struct{}{}
	}

	kept := make([]*SegmentInfo, 0, len(staleSegments))
	for _, seg := range staleSegments {
		if _, dup := flushable[seg.GetID()]; !dup {
			kept = append(kept, seg)
		}
	}
	return kept
}

// setLastFlushTime stamps every given segment in meta with the current time
func (s *Server) setLastFlushTime(segments []*SegmentInfo) {
	for i := range segments {
		s.meta.SetLastFlushTime(segments[i].GetID(), time.Now())
	}
}

594
// startWatchService starts a goroutine to watch services
595
func (s *Server) startWatchService(ctx context.Context) {
	// watchService releases its serverLoopWg slot itself when it returns
	go s.watchService(ctx)
}

// watchService watchs services
func (s *Server) watchService(ctx context.Context) {
S
sunby 已提交
601
	defer logutil.LogPanic()
602 603 604 605 606 607
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
608 609 610 611 612
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
613 614 615 616 617 618 619 620
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
621 622
		}
	}
623

624
}
S
sunby 已提交
625

626
// handles session events - DataNodes Add/Del
627
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
628
	if event == nil {
629
		return nil
630 631 632 633 634 635
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
636 637 638 639
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
640 641 642 643 644
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
645 646 647 648
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
649 650 651 652 653
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
654 655 656 657
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
658 659 660 661 662
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
663
	return nil
664 665
}

666 667
// startFlushLoop starts a goroutine to handle post func process
// which is to notify `RootCoord` that this segment is flushed
S
sunby 已提交
668
func (s *Server) startFlushLoop(ctx context.Context) {
669 670 671 672 673 674 675 676 677 678
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
B
Bingyi Sun 已提交
679
				logutil.Logger(s.ctx).Debug("flush loop shutdown")
680 681 682 683 684
				return
			case segmentID := <-s.flushCh:
				//Ignore return error
				_ = s.postFlush(ctx, segmentID)
			}
S
sunby 已提交
685
		}
686
	}()
S
sunby 已提交
687 688
}

689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720
// postFlush runs after a segment flush is done:
// 1. look the segment up in meta, failing when it is unknown
// 2. tell RootCoord that the segment flush completed
// 3. move the segment to the `Flushed` state in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	seg := s.meta.GetSegment(segmentID)
	if seg == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}

	// step 2: RootCoord notification
	flushDone := &datapb.SegmentFlushCompletedMsg{
		Base:    &commonpb.MsgBase{MsgType: commonpb.MsgType_SegmentFlushDone},
		Segment: seg.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, flushDone)
	if verifyErr := VerifyResponse(resp, err); verifyErr != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(verifyErr))
		return verifyErr
	}

	// step 3: state transition in meta
	if setErr := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); setErr != nil {
		log.Error("flush segment complete failed", zap.Error(setErr))
		return setErr
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
S
sunby 已提交
721 722 723 724 725 726 727 728 729 730 731
func (s *Server) handleFlushingSegments(ctx context.Context) {
	// push every segment meta reports as flushing onto flushCh, giving up as soon as ctx is done
	flushing := s.meta.GetFlushingSegments()
	for i := range flushing {
		select {
		case s.flushCh <- flushing[i].ID:
		case <-ctx.Done():
			return
		}
	}
}

732
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
733
	var err error
734
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli); err != nil {
S
sunby 已提交
735 736
		return err
	}
737
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
738 739
		return err
	}
740
	return s.rootCoordClient.Start()
S
sunby 已提交
741
}
742

743 744 745 746
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
747
func (s *Server) Stop() error {
748
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
749 750
		return nil
	}
751
	logutil.Logger(s.ctx).Debug("server shutdown")
S
sunby 已提交
752
	s.cluster.Close()
753
	s.garbageCollector.close()
S
sunby 已提交
754
	s.stopServerLoop()
C
congqixia 已提交
755
	s.session.Revoke(time.Second)
S
sunby 已提交
756

757
	if Params.DataCoordCfg.EnableCompaction {
S
sunby 已提交
758 759 760
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
S
sunby 已提交
761 762 763
	return nil
}

S
sunby 已提交
764 765
// CleanMeta only for test
func (s *Server) CleanMeta() error {
766
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
767
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
768 769
}

S
sunby 已提交
770 771 772 773 774
// stopServerLoop cancels the server loop context and blocks until every loop
// tracked by serverLoopWg has finished
func (s *Server) stopServerLoop() {
	cancelLoops := s.serverLoopCancel
	cancelLoops()
	s.serverLoopWg.Wait()
}

775 776 777 778 779 780 781 782 783 784 785 786 787 788
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
789

790 791
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
792
		Base: &commonpb.MsgBase{
793
			MsgType:  commonpb.MsgType_DescribeCollection,
794
			SourceID: Params.DataCoordCfg.NodeID,
S
sunby 已提交
795 796 797 798 799 800 801
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
802
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
803 804
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
805 806
			MsgID:     0,
			Timestamp: 0,
807
			SourceID:  Params.DataCoordCfg.NodeID,
808
		},
S
sunby 已提交
809 810 811 812 813
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
814 815
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
816 817
		return err
	}
S
sunby 已提交
818
	collInfo := &datapb.CollectionInfo{
819 820 821 822
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
S
sunby 已提交
823
	}
S
sunby 已提交
824 825
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
826
}