// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"syscall"
	"time"

	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/mq/msgstream"
	"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/logutil"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/paramtable"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/timerecord"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

const (
	connEtcdMaxRetryTime = 100000
	allPartitionID       = 0 // partitionID 0 stands for no partition filtering
)

var (
	// TODO: sunby put to config
	enableTtChecker           = true
	ttCheckerName             = "dataTtChecker"
	ttMaxInterval             = 2 * time.Minute
	ttCheckerWarnMsg          = fmt.Sprintf("DataCoord hasn't received tt for %f minutes", ttMaxInterval.Minutes())
	segmentTimedFlushDuration = 10.0
)

type (
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcut for typeutil.Timestamp
	Timestamp = typeutil.Timestamp
)

// ServerState type alias, represents datacoord Server State
type ServerState = int64

const (
	// ServerStateStopped state stands for a just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands for an initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for a healthy `Server` instance
	ServerStateHealthy ServerState = 2
)
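
// The Server's isServing field holds one of the states above and is only
// touched through sync/atomic, as Init/Start/Stop below do. A minimal sketch
// of a caller-side health check (illustrative, not part of this file):
//
//	if atomic.LoadInt64(&s.isServing) != ServerStateHealthy {
//		return errors.New("datacoord is not healthy")
//	}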

type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdClient *clientv3.Client) (types.RootCoord, error)

// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

var Params paramtable.ComponentParam

// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	quitCh           chan struct{}
	isServing        ServerState
	helper           ServerHelper

	etcdCli          *clientv3.Client
	kvClient         *etcdkv.EtcdKV
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption
	handler          Handler

	compactionTrigger trigger
	compactionHandler compactionPlanContext

	metricsCacheManager *metricsinfo.MetricsCacheManager

	flushCh   chan UniqueID
	msFactory msgstream.Factory

	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent

	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}

// ServerHelper datacoord server injection helper
type ServerHelper struct {
	eventAfterHandleDataNodeTt func()
}

func defaultServerHelper() ServerHelper {
	return ServerHelper{
		eventAfterHandleDataNodeTt: func() {},
	}
}

// Option utility function signature to set DataCoord server attributes
type Option func(svr *Server)
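
// A minimal usage sketch (illustrative only): options are applied in order by
// CreateServer, which lets tests inject hooks such as a ServerHelper that
// signals after every handled DataNode timetick.
//
//	notify := make(chan struct{}, 1)
//	svr := CreateServer(ctx, factory,
//		SetServerHelper(ServerHelper{
//			eventAfterHandleDataNodeTt: func() { notify <- struct{}{} },
//		}),
//	)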

// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

// SetServerHelper returns an `Option` setting ServerHelper with provided parameter
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

// SetCluster returns an `Option` setting Cluster with provided parameter
func SetCluster(cluster *Cluster) Option {
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

// SetSegmentManager returns an Option to set SegmentManager
func SetSegmentManager(manager Manager) Option {
	return func(svr *Server) {
		svr.segmentManager = manager
	}
}

// CreateServer creates a `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) *Server {
	rand.Seed(time.Now().UnixNano())
	s := &Server{
		ctx:                    ctx,
		quitCh:                 make(chan struct{}),
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataNodeCreator:        defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
		helper:                 defaultServerHelper(),

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
	}

	for _, opt := range opts {
		opt(s)
	}
	return s
}

func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, client *clientv3.Client) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, client)
}

// QuitSignal returns signal when server quits
func (s *Server) QuitSignal() <-chan struct{} {
	return s.quitCh
}
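
// Caller-side sketch (an assumption about intended use; the closing of quitCh
// happens outside this excerpt): wait for either server quit or context done.
//
//	select {
//	case <-svr.QuitSignal():
//		// server is quitting; run external cleanup
//	case <-ctx.Done():
//	}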

// Register registers data service at etcd
func (s *Server) Register() error {
	s.session.Register()
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
		logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
		if err := s.Stop(); err != nil {
			logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
		}
		// manually send signal to starter goroutine
		if s.session.TriggerKill {
			syscall.Kill(syscall.Getpid(), syscall.SIGINT)
		}
	})
	return nil
}

func (s *Server) initSession() error {
	s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli)
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
	s.session.Init(typeutil.DataCoordRole, Params.DataCoordCfg.Address, true, true)
	Params.DataCoordCfg.NodeID = s.session.ServerID
	Params.SetLogger(Params.DataCoordCfg.NodeID)
	return nil
}

// Init changes server state to Initializing
func (s *Server) Init() error {
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return s.initSession()
}

// Start initializes `Server` members and starts loops; the following steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which include message stream handler (segment statistics, datanode tt),
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarCfg.Address,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	if err = s.initRootCoordClient(); err != nil {
		return err
	}

	if err = s.initMeta(); err != nil {
		return err
	}

	s.handler = newServerHandler(s)

	if err = s.initCluster(); err != nil {
		return err
	}

	s.allocator = newRootCoordAllocator(s.rootCoordClient)
	if Params.DataCoordCfg.EnableCompaction {
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}

	s.startSegmentManager()
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

	if err = s.initGarbageCollection(); err != nil {
		return err
	}

	s.startServerLoop()
	Params.DataCoordCfg.CreatedTime = time.Now()
	Params.DataCoordCfg.UpdatedTime = time.Now()
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
	logutil.Logger(s.ctx).Debug("startup success")

	return nil
}
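
// A typical driver-side lifecycle sketch (an assumption based on how such a
// component is usually embedded; not part of this file): wire etcd first,
// then walk Init -> Start -> Register.
//
//	svr := CreateServer(ctx, factory)
//	svr.SetEtcdClient(etcdCli)
//	if err := svr.Init(); err != nil { /* handle */ }     // -> ServerStateInitializing
//	if err := svr.Start(); err != nil { /* handle */ }    // -> ServerStateHealthy
//	if err := svr.Register(); err != nil { /* handle */ } // session registered at etcd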

func (s *Server) initCluster() error {
	if s.cluster != nil {
		return nil
	}

	var err error
	s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.msFactory))
	if err != nil {
		return err
	}
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
	return nil
}

// SetEtcdClient sets etcd client for datacoord.
func (s *Server) SetEtcdClient(client *clientv3.Client) {
	s.etcdCli = client
}

func (s *Server) createCompactionHandler() {
	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler.start()
}

func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

func (s *Server) createCompactionTrigger() {
	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger.start()
}

func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

func (s *Server) initGarbageCollection() error {
	var cli *minio.Client
	var err error
	if Params.DataCoordCfg.EnableGarbageCollection {
		cli, err = minio.New(Params.MinioCfg.Address, &minio.Options{
			Creds:  credentials.NewStaticV4(Params.MinioCfg.AccessKeyID, Params.MinioCfg.SecretAccessKey, ""),
			Secure: Params.MinioCfg.UseSSL,
		})
		if err != nil {
			return err
		}

		checkBucketFn := func() error {
			has, err := cli.BucketExists(context.TODO(), Params.MinioCfg.BucketName)
			if err != nil {
				return err
			}
			if !has {
				err = cli.MakeBucket(context.TODO(), Params.MinioCfg.BucketName, minio.MakeBucketOptions{})
				if err != nil {
					return err
				}
			}
			return nil
		}
		// Retry twice to cover the race where
		// 1. the bucket does not exist yet,
		// 2. another component creates it concurrently,
		// 3. and datacoord's create fails with a "bucket already exists" error;
		// the second attempt then sees the bucket and succeeds.
		err = retry.Do(s.ctx, checkBucketFn, retry.Attempts(2))
		if err != nil {
			return err
		}
	}

	s.garbageCollector = newGarbageCollector(s.meta, GcOption{
		cli:        cli,
		enabled:    Params.DataCoordCfg.EnableGarbageCollection,
		bucketName: Params.MinioCfg.BucketName,
		rootPath:   Params.MinioCfg.RootPath,

		checkInterval:    Params.DataCoordCfg.GCInterval,
		missingTolerance: Params.DataCoordCfg.GCMissingTolerance,
		dropTolerance:    Params.DataCoordCfg.GCDropTolerance,
	})
	return nil
}
390 391 392
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
X
Xiaofan 已提交
393
		log.Warn("DataCoord failed to init service discovery", zap.Error(err))
G
godchen 已提交
394 395
		return err
	}
X
Xiaofan 已提交
396
	log.Info("DataCoord success to get DataNode sessions", zap.Any("sessions", sessions))
397

S
sunby 已提交
398
	datanodes := make([]*NodeInfo, 0, len(sessions))
399
	for _, session := range sessions {
400 401 402
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
S
sunby 已提交
403
		}
404
		datanodes = append(datanodes, info)
405
	}
G
godchen 已提交
406

S
sunby 已提交
407
	s.cluster.Startup(datanodes)
408

409
	// TODO implement rewatch logic
410
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1, nil)
S
sunby 已提交
411 412 413
	return nil
}

414
func (s *Server) startSegmentManager() {
B
Bingyi Sun 已提交
415 416 417
	if s.segmentManager == nil {
		s.segmentManager = newSegmentManager(s.meta, s.allocator)
	}
418 419
}

S
sunby 已提交
420
func (s *Server) initMeta() error {
421
	etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath)
X
Xiaofan 已提交
422 423 424
	s.kvClient = etcdKV
	reloadEtcdFn := func() error {
		var err error
G
godchen 已提交
425
		s.meta, err = newMeta(s.kvClient)
426 427 428 429
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
430
	}
X
Xiaofan 已提交
431
	return retry.Do(s.ctx, reloadEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
432 433
}

434 435
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
436
	s.serverLoopWg.Add(3)
437 438 439
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
440
	s.garbageCollector.start()
441 442
}

443 444
// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream
// tt msg stands for the currently consumed timestamp for each channel
S
sunby 已提交
445 446 447
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
B
Bingyi Sun 已提交
448
		log.Error("DataCoord failed to create timetick channel", zap.Error(err))
S
sunby 已提交
449 450
		return
	}
451 452
	ttMsgStream.AsConsumerWithPosition([]string{Params.CommonCfg.DataCoordTimeTick},
		Params.CommonCfg.DataCoordSubName, mqwrapper.SubscriptionPositionLatest)
X
Xiaofan 已提交
453
	log.Info("DataCoord creates the timetick channel consumer",
454 455
		zap.String("timeTickChannel", Params.CommonCfg.DataCoordTimeTick),
		zap.String("subscription", Params.CommonCfg.DataCoordSubName))
S
sunby 已提交
456
	ttMsgStream.Start()
457

458 459 460 461 462 463 464 465 466 467
	go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream)
}

func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStream msgstream.MsgStream) {
	var checker *timerecord.LongTermChecker
	if enableTtChecker {
		checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
		checker.Start()
		defer checker.Stop()
	}

	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	defer func() {
		// https://github.com/milvus-io/milvus/issues/15659
		// msgstream service closed before datacoord quits
		defer func() {
			if x := recover(); x != nil {
				log.Error("Failed to close ttMessage", zap.Any("recovered", x))
			}
		}()
		ttMsgStream.Close()
	}()
	for {
		select {
		case <-ctx.Done():
			log.Info("DataNode timetick loop shutdown")
			return
		default:
		}
		msgPack := ttMsgStream.Consume()
		if msgPack == nil {
			log.Info("receive nil timetick msg and shutdown timetick channel")
			return
		}
		for _, msg := range msgPack.Msgs {
			ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
			if !ok {
				log.Warn("receive unexpected msg type from tt channel")
				continue
			}
			if enableTtChecker {
				checker.Check()
			}

			if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
				log.Error("failed to handle timetick message", zap.Error(err))
				continue
			}
		}
		s.helper.eventAfterHandleDataNodeTt()
	}
}

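// handleTimetickMessage processes a single DataNode tt msg: it records the
// per-segment row counts carried by the message, expires stale allocations on
// the channel, then asks the cluster to flush both the flushable segments and
// the segments that have not been flushed for longer than
// segmentTimedFlushDuration.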
func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
	ch := ttMsg.GetChannelName()
	ts := ttMsg.GetTimestamp()
	physical, _ := tsoutil.ParseTS(ts)
	if time.Since(physical).Minutes() > 1 {
		// if the time tick lags behind, log at most about once a minute
		log.RatedWarn(60.0, "time tick lag behind for more than 1 minute", zap.String("channel", ch), zap.Time("timetick", physical))
	}

	s.updateSegmentStatistics(ttMsg.GetSegmentsStats())

	if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
		return fmt.Errorf("expire allocations: %w", err)
	}

	flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
	if err != nil {
		return fmt.Errorf("get flushable segments: %w", err)
	}
	flushableSegments := s.getFlushableSegmentsInfo(flushableIDs)

	staleSegments := s.getStaleSegmentsInfo(ch)
	staleSegments = s.filterWithFlushableSegments(staleSegments, flushableIDs)

	if len(flushableSegments)+len(staleSegments) == 0 {
		return nil
	}

	log.Info("flush segments", zap.Int64s("segmentIDs", flushableIDs), zap.Int("markSegments count", len(staleSegments)))

	s.setLastFlushTime(flushableSegments)
	s.setLastFlushTime(staleSegments)

	finfo, minfo := make([]*datapb.SegmentInfo, 0, len(flushableSegments)), make([]*datapb.SegmentInfo, 0, len(staleSegments))
	for _, info := range flushableSegments {
		finfo = append(finfo, info.SegmentInfo)
	}
	for _, info := range staleSegments {
		minfo = append(minfo, info.SegmentInfo)
	}
	s.cluster.Flush(s.ctx, finfo, minfo)
	return nil
}

func (s *Server) updateSegmentStatistics(stats []*datapb.SegmentStats) {
	for _, stat := range stats {
		s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
	}
}

func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
	res := make([]*SegmentInfo, 0, len(flushableIDs))
	for _, id := range flushableIDs {
		sinfo := s.meta.GetSegment(id)
		if sinfo == nil {
			log.Error("get segment from meta error", zap.Int64("id", id))
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

func (s *Server) getStaleSegmentsInfo(ch string) []*SegmentInfo {
	return s.meta.SelectSegments(func(info *SegmentInfo) bool {
		return isSegmentHealthy(info) &&
			info.GetInsertChannel() == ch &&
			!info.lastFlushTime.IsZero() &&
			time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
	})
}

func (s *Server) filterWithFlushableSegments(staleSegments []*SegmentInfo, flushableIDs []int64) []*SegmentInfo {
	filter := map[int64]struct{}{}
	for _, sid := range flushableIDs {
		filter[sid] = struct{}{}
	}

	res := make([]*SegmentInfo, 0, len(staleSegments))
	for _, sinfo := range staleSegments {
		if _, ok := filter[sinfo.GetID()]; ok {
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

func (s *Server) setLastFlushTime(segments []*SegmentInfo) {
	for _, sinfo := range segments {
		s.meta.SetLastFlushTime(sinfo.GetID(), time.Now())
	}
}

// startWatchService starts a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
	go s.watchService(ctx)
}

// watchService watches services.
func (s *Server) watchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Info("watch service shutdown")
			return
		case event, ok := <-s.eventCh:
			if !ok {
				// ErrCompacted is handled inside SessionWatcher,
				// so any error here is something else; close the DataCoord server
				logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", s.session.ServerID))
				go s.Stop()
				if s.session.TriggerKill {
					syscall.Kill(syscall.Getpid(), syscall.SIGINT)
				}
				return
			}
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
		}
	}
}

// handleSessionEvent handles session events - DataNodes Add/Del
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
	if event == nil {
		return nil
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to register node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregister node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
	return nil
}

// startFlushLoop starts a goroutine to handle the post-flush process,
// which is to notify `RootCoord` that a segment is flushed
func (s *Server) startFlushLoop(ctx context.Context) {
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
				logutil.Logger(s.ctx).Debug("flush loop shutdown")
				return
			case segmentID := <-s.flushCh:
				// ignore the returned error
				_ = s.postFlush(ctx, segmentID)
			}
		}
	}()
}

// post function after flush is done
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flushed segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Info("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic: fetch all segments in `Flushing` state and do the Flush notification logic
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

func (s *Server) initRootCoordClient() error {
	var err error
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.EtcdCfg.MetaRootPath, s.etcdCli); err != nil {
		return err
	}
	if err = s.rootCoordClient.Init(); err != nil {
		return err
	}
	return s.rootCoordClient.Start()
}

// Stop does the Server finalization:
// it checks whether the server status is healthy and, if not, just quits;
// if the Server is healthy, it sets the server state to stopped, releases the etcd session,
//	stops the message stream client and stops the server loops
func (s *Server) Stop() error {
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	logutil.Logger(s.ctx).Debug("server shutdown")
	s.cluster.Close()
	s.garbageCollector.close()
	s.stopServerLoop()
	s.session.Revoke(time.Second)

	if Params.DataCoordCfg.EnableCompaction {
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
	return nil
}

// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	return s.kvClient.RemoveWithPrefix("")
}

func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}

// loadCollectionFromRootCoord communicates with RootCoord and asks for collection information.
// collection information will be added to server meta info.
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_DescribeCollection,
			SourceID: Params.DataCoordCfg.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.DataCoordCfg.NodeID,
		},
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
		return err
	}
	collInfo := &datapb.CollectionInfo{
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
	}
	s.meta.AddCollection(collInfo)
	return nil
}