// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
	"github.com/milvus-io/milvus/internal/logutil"
	"github.com/milvus-io/milvus/internal/rootcoord"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"go.uber.org/zap"

	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
)

// connEtcdMaxRetryTime is the maximum number of attempts when connecting to etcd
const connEtcdMaxRetryTime = 100000

var (
	// TODO: sunby put to config
	enableTtChecker           = true
	ttCheckerName             = "dataTtChecker"
	ttMaxInterval             = 3 * time.Minute
	ttCheckerWarnMsg          = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
	segmentTimedFlushDuration = 10.0
)

type (
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcut for typeutil.Timestamp
	Timestamp = typeutil.Timestamp
)

// ServerState type alias, represents datacoord Server state
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands for initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)

type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

// makes sure Server implements `positionProvider`
var _ positionProvider = (*Server)(nil)

// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	isServing        ServerState
	helper           ServerHelper

	kvClient         *etcdkv.EtcdKV
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption

	compactionTrigger trigger
	compactionHandler compactionPlanContext

	metricsCacheManager *metricsinfo.MetricsCacheManager

	flushCh   chan UniqueID
	msFactory msgstream.Factory

	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent

	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}

// ServerHelper datacoord server injection helper
type ServerHelper struct {
	eventAfterHandleDataNodeTt func()
}

func defaultServerHelper() ServerHelper {
	return ServerHelper{
		eventAfterHandleDataNodeTt: func() {},
	}
}

// Option utility function signature to set DataCoord server attributes
type Option func(svr *Server)

// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

// SetServerHelper returns an `Option` setting ServerHelper with provided parameter
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

// SetCluster returns an `Option` setting Cluster with provided parameter
func SetCluster(cluster *Cluster) Option {
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

// SetSegmentManager returns an Option to set SegmentManager
func SetSegmentManager(manager Manager) Option {
	return func(svr *Server) {
		svr.segmentManager = manager
	}
}

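// Example wiring with options (an illustrative sketch only: ctx and msFactory
// are assumed to come from the caller and are not defined in this file):
//
//	svr, err := CreateServer(ctx, msFactory,
//		SetDataNodeCreator(defaultDataNodeCreatorFunc),
//		SetRootCoordCreator(defaultRootCoordCreatorFunc),
//	)
//	if err != nil {
//		log.Fatal("failed to create datacoord server", zap.Error(err))
//	}
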
// CreateServer creates a `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
	rand.Seed(time.Now().UnixNano())
	s := &Server{
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataNodeCreator:        defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
		helper:                 defaultServerHelper(),

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
	}

	for _, opt := range opts {
		opt(s)
	}
	return s, nil
}

func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
}

// Register registers data service at etcd
func (s *Server) Register() error {
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
	s.session.Init(typeutil.DataCoordRole, Params.Address, true)
	Params.NodeID = s.session.ServerID
	Params.SetLogger(typeutil.UniqueID(-1))
	return nil
}

// Init changes server state to Initializing
func (s *Server) Init() error {
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return nil
}

// Start initializes `Server` members and starts the server loops. The following steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//    allocator and segment manager
// 3. start service discovery and server loops, which include the message stream handlers
//    (segment statistics, datanode tt), the datanode etcd watch, the etcd alive check
//    and the flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	if err = s.initRootCoordClient(); err != nil {
		return err
	}

	if err = s.initMeta(); err != nil {
		return err
	}

	if err = s.initCluster(); err != nil {
		return err
	}

	s.allocator = newRootCoordAllocator(s.rootCoordClient)
	if Params.EnableCompaction {
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}

	s.startSegmentManager()
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

	if err = s.initGarbageCollection(); err != nil {
		return err
	}

	s.startServerLoop()
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
	log.Debug("dataCoordinator startup success")

	return nil
}

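// initCluster keeps a cluster injected via SetCluster if present; otherwise it
// builds the channel manager, the session manager and the datanode cluster.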
func (s *Server) initCluster() error {
	if s.cluster != nil {
		return nil
	}

	var err error
	s.channelManager, err = NewChannelManager(s.kvClient, s)
	if err != nil {
		return err
	}
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
	return nil
}
func (s *Server) createCompactionHandler() {
	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler.start()
}

func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

func (s *Server) createCompactionTrigger() {
	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger.start()
}

func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

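// initGarbageCollection builds the minio client when garbage collection is
// enabled (creating the bucket if it does not exist yet) and constructs the
// garbage collector with the configured intervals and tolerances.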
func (s *Server) initGarbageCollection() error {
	var cli *minio.Client
	var err error
	if Params.EnableGarbageCollection {
		cli, err = minio.New(Params.MinioAddress, &minio.Options{
			Creds:  credentials.NewStaticV4(Params.MinioAccessKeyID, Params.MinioSecretAccessKey, ""),
			Secure: Params.MinioUseSSL,
		})
		if err != nil {
			return err
		}
		has, err := cli.BucketExists(context.TODO(), Params.MinioBucketName)
		if err != nil {
			return err
		}
		if !has {
			err = cli.MakeBucket(context.TODO(), Params.MinioBucketName, minio.MakeBucketOptions{})
			if err != nil {
				return err
			}
		}
	}

	s.garbageCollector = newGarbageCollector(s.meta, GcOption{
		cli:        cli,
		enabled:    Params.EnableGarbageCollection,
		bucketName: Params.MinioBucketName,
		rootPath:   Params.MinioRootPath,

		checkInterval:    defaultGcInterval,
		missingTolerance: defaultMissingTolerance,
		dropTolerance:    defaultMissingTolerance, // NOTE: reuses the missing tolerance as the drop tolerance
	})
	return nil
}

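// initServiceDiscovery fetches the datanode sessions registered in etcd,
// starts the cluster with them and subscribes to subsequent session events.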
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Debug("dataCoord initServiceDiscovery failed", zap.Error(err))
		return err
	}
	log.Debug("registered sessions", zap.Any("sessions", sessions))

	datanodes := make([]*NodeInfo, 0, len(sessions))
	for _, session := range sessions {
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
		}
		datanodes = append(datanodes, info)
	}

	s.cluster.Startup(datanodes)

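	// subscribe to datanode session changes starting right after the fetched revision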
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
	return nil
}

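// startSegmentManager creates the default segment manager unless one was
// injected via SetSegmentManager.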
func (s *Server) startSegmentManager() {
	if s.segmentManager == nil {
		s.segmentManager = newSegmentManager(s.meta, s.allocator)
	}
}

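// initMeta connects to etcd and loads the datacoord meta, retrying up to
// connEtcdMaxRetryTime times.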
func (s *Server) initMeta() error {
	connectEtcdFn := func() error {
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
		if err != nil {
			return err
		}

		s.kvClient = etcdKV
		s.meta, err = newMeta(s.kvClient)
		if err != nil {
			return err
		}
		return nil
	}
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
}

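// startServerLoop starts the four background loops (stats channel, datanode tt,
// watch service and flush), the garbage collector and the etcd liveness check.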
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
	s.serverLoopWg.Add(4)
	s.startStatsChannel(s.serverLoopCtx)
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
	s.garbageCollector.start()
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
		log.Error("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID))
		if err := s.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
	})
}

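// startStatsChannel consumes segment statistics messages and stores the
// reported row counts into meta.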
func (s *Server) startStatsChannel(ctx context.Context) {
	statsStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new stats msg stream failed", zap.Error(err))
		// release our count on the server loop wait group since the loop never starts
		s.serverLoopWg.Done()
		return
	}
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
	log.Debug("dataCoord create stats channel consumer",
		zap.String("channelName", Params.StatisticsChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
	statsStream.Start()
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer statsStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("stats channel shutdown")
				return
			default:
			}
			msgPack := statsStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil stats msg, shutdown stats channel")
				return
			}
			for _, msg := range msgPack.Msgs {
				if msg.Type() != commonpb.MsgType_SegmentStatistics {
					log.Warn("receive unknown msg from segment statistics channel",
						zap.Stringer("msgType", msg.Type()))
					continue
				}
				ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
				for _, stat := range ssMsg.SegStats {
					s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
				}
			}
		}
	}()
}

// startDataNodeTtLoop starts a goroutine to receive datanode tt msg from msgstream
// tt msg stands for the currently consumed timestamp for each channel
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		// release our count on the server loop wait group since the loop never starts
		s.serverLoopWg.Done()
		return
	}
	ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName},
		Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
	ttMsgStream.Start()

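	// For every time tick the loop below expires stale allocations, collects
	// the segments the flush policy reports as flushable, additionally marks
	// segments that have not been flushed for more than
	// segmentTimedFlushDuration minutes, and then triggers a cluster flush.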
	go func() {
		var checker *LongTermChecker
		if enableTtChecker {
			checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
			checker.Start()
			defer checker.Stop()
		}
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer ttMsgStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("data node tt loop shutdown")
				return
			default:
			}
			msgPack := ttMsgStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil tt msg, shutdown tt channel")
				return
			}
			for _, msg := range msgPack.Msgs {
				if msg.Type() != commonpb.MsgType_DataNodeTt {
					log.Warn("receive unexpected msg type from tt channel",
						zap.Stringer("msgType", msg.Type()))
					continue
				}
				ttMsg := msg.(*msgstream.DataNodeTtMsg)
				if enableTtChecker {
					checker.Check()
				}

				ch := ttMsg.ChannelName
				ts := ttMsg.Timestamp
				if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
					log.Warn("failed to expire allocations", zap.Error(err))
					continue
				}
				physical, _ := tsoutil.ParseTS(ts)
				if time.Since(physical).Minutes() > 1 {
					// if lagging behind, log roughly once a minute
					log.RatedWarn(60.0, "time tick lags behind for more than 1 minute", zap.String("channel", ch), zap.Time("tt", physical))
				}
				segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
				if err != nil {
					log.Warn("get flushable segments failed", zap.Error(err))
					continue
				}

				staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
					return isSegmentHealthy(info) &&
						info.GetInsertChannel() == ch &&
						!info.lastFlushTime.IsZero() &&
						time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
				})

				if len(segments)+len(staleSegments) == 0 {
					continue
				}
				log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
				segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
				for _, id := range segments {
					sInfo := s.meta.GetSegment(id)
					if sInfo == nil {
						log.Error("get segment from meta error", zap.Int64("id", id),
							zap.Error(err))
						continue
					}
					segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
					s.meta.SetLastFlushTime(id, time.Now())
				}
				markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
				for _, segment := range staleSegments {
					// skip stale segments that were already picked for flush above
					// (the original inner `continue` only affected the inner loop
					// and never skipped anything)
					alreadyPicked := false
					for _, fSeg := range segmentInfos {
						if segment.GetID() == fSeg.GetID() {
							alreadyPicked = true
							break
						}
					}
					if alreadyPicked {
						continue
					}
					markSegments = append(markSegments, segment.SegmentInfo)
					s.meta.SetLastFlushTime(segment.GetID(), time.Now())
				}
				if len(segmentInfos)+len(markSegments) > 0 {
					s.cluster.Flush(s.ctx, segmentInfos, markSegments)
				}
			}
			s.helper.eventAfterHandleDataNodeTt()
		}
	}()
}

// startWatchService starts a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
	go s.watchService(ctx)
}

// watchService watches services
func (s *Server) watchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
		}
	}
}

// handleSessionEvent handles session events - DataNodes Add/Del
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
	if event == nil {
		return nil
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to register node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregister node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
	return nil
}

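// startFlushLoop consumes segment ids from flushCh and runs postFlush for each
// of them; segments already in `Flushing` state are re-enqueued by
// handleFlushingSegments on startup.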
func (s *Server) startFlushLoop(ctx context.Context) {
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
				log.Debug("flush loop shutdown")
				return
			case segmentID := <-s.flushCh:
				//Ignore return error
				_ = s.postFlush(ctx, segmentID)
			}
		}
	}()
}

// postFlush is the post function after a flush is done:
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flushed segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic: fetch all segments in `Flushing` state and re-run the flush notification logic
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

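// initRootCoordClient creates, initializes and starts the RootCoord client.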
func (s *Server) initRootCoordClient() error {
	var err error
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
		return err
	}
	if err = s.rootCoordClient.Init(); err != nil {
		return err
	}
	return s.rootCoordClient.Start()
}

// Stop does the Server finalize processes
// it checks whether the server status is healthy; if not, it just quits
// if the Server is healthy, it sets the server state to stopped, releases the etcd session,
// stops the message stream client and stops the server loops
func (s *Server) Stop() error {
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	log.Debug("dataCoord server shutdown")
	s.cluster.Close()
	s.garbageCollector.close()
	s.stopServerLoop()

	if Params.EnableCompaction {
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
	return nil
}

// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	return s.kvClient.RemoveWithPrefix("")
}

func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}

func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_DescribeCollection,
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.NodeID,
		},
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
		return err
	}
	collInfo := &datapb.CollectionInfo{
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
	}
	s.meta.AddCollection(collInfo)
	return nil
}

// GetVChanPositions gets the vchannel latest positions with provided dml channel names
func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
	segments := s.meta.GetSegmentsByChannel(channel)
	log.Debug("GetSegmentsByChannel",
		zap.Any("collectionID", collectionID),
		zap.Any("channel", channel),
		zap.Any("seekFromStartPosition", seekFromStartPosition),
		zap.Any("numOfSegments", len(segments)),
	)
	flushed := make([]*datapb.SegmentInfo, 0)
	unflushed := make([]*datapb.SegmentInfo, 0)
	var seekPosition *internalpb.MsgPosition
	for _, s := range segments {
		if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
			flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
			if seekPosition == nil || (s.DmlPosition.Timestamp < seekPosition.Timestamp) {
				seekPosition = s.DmlPosition
			}
			continue
		}

		if s.DmlPosition == nil { // segment position all nil
			continue
		}

		unflushed = append(unflushed, s.SegmentInfo)

		segmentPosition := s.DmlPosition
		if seekFromStartPosition {
			// need to use start position when load collection/partition, querynode does not support seek from checkpoint yet
			// TODO silverxia remove seek from start logic after checkpoint supported in querynode
			segmentPosition = s.StartPosition
		}

		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
			seekPosition = segmentPosition
		}
	}
	// use collection start position when segment position is not found
	if seekPosition == nil {
		collection := s.GetCollection(s.ctx, collectionID)
		if collection != nil {
			seekPosition = getCollectionStartPosition(channel, collection)
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:      collectionID,
		ChannelName:       channel,
		SeekPosition:      seekPosition,
		FlushedSegments:   flushed,
		UnflushedSegments: unflushed,
	}
}
func getCollectionStartPosition(channel string, collectionInfo *datapb.CollectionInfo) *internalpb.MsgPosition {
	for _, sp := range collectionInfo.GetStartPositions() {
		if sp.GetKey() != rootcoord.ToPhysicalChannel(channel) {
			continue
		}
		return &internalpb.MsgPosition{
			ChannelName: channel,
			MsgID:       sp.GetData(),
		}
	}
	return nil
}

// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	return &datapb.SegmentInfo{
		ID:             info.ID,
		CollectionID:   info.CollectionID,
		PartitionID:    info.PartitionID,
		InsertChannel:  info.InsertChannel,
		NumOfRows:      info.NumOfRows,
		State:          info.State,
		MaxRowNum:      info.MaxRowNum,
		LastExpireTime: info.LastExpireTime,
		StartPosition:  info.StartPosition,
		DmlPosition:    info.DmlPosition,
	}
}

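// GetCollection returns the collection info with the given collection id;
// when it is not cached in meta, it is loaded from RootCoord first.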
func (s *Server) GetCollection(ctx context.Context, collectionID UniqueID) *datapb.CollectionInfo {
	coll := s.meta.GetCollection(collectionID)
	if coll != nil {
		return coll
	}
	err := s.loadCollectionFromRootCoord(ctx, collectionID)
	if err != nil {
		log.Warn("failed to load collection from RootCoord", zap.Int64("collectionID", collectionID), zap.Error(err))
	}

	return s.meta.GetCollection(collectionID)
}