server.go 22.4 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
S
sunby 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
S
sunby 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

17
package datacoord
S
sunby 已提交
18

S
sunby 已提交
19 20
import (
	"context"
S
sunby 已提交
21
	"errors"
22
	"fmt"
S
sunby 已提交
23
	"math/rand"
S
sunby 已提交
24
	"sync"
S
sunby 已提交
25 26 27
	"sync/atomic"
	"time"

S
sunby 已提交
28
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
29
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
30
	"github.com/milvus-io/milvus/internal/logutil"
31 32 33 34
	"github.com/milvus-io/milvus/internal/rootcoord"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
35
	"go.uber.org/zap"
S
sunby 已提交
36

X
Xiangyu Wang 已提交
37 38 39 40 41
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
42
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
43
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
44

X
Xiangyu Wang 已提交
45 46
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
S
sunby 已提交
47
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
Xiangyu Wang 已提交
48
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
49 50
)

S
sunby 已提交
51
// connEtcdMaxRetryTime bounds the retry attempts when connecting to etcd in initMeta.
const connEtcdMaxRetryTime = 100000

var (
	// TODO: sunby put to config
	enableTtChecker           = true
	ttCheckerName             = "dataTtChecker"
	ttMaxInterval             = 3 * time.Minute
	ttCheckerWarnMsg          = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
	// segmentTimedFlushDuration is the staleness threshold, in minutes, after
	// which a segment is flushed by time policy (see startDataNodeTtLoop).
	segmentTimedFlushDuration = 10.0
)

type (
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcurt for typeutil.Timestamp
	Timestamp = typeutil.Timestamp
)

// ServerState type alias, presents datacoord Server State
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)

// dataNodeCreatorFunc builds a DataNode client for the given address.
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)

// rootCoordCreatorFunc builds a RootCoord client from etcd meta configuration.
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)

// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

// makes sure Server implements `positionProvider`
var _ positionProvider = (*Server)(nil)

90 91
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	// isServing holds one of the ServerState values; accessed atomically.
	isServing ServerState
	// helper carries injection hooks (e.g. callback after datanode tt handling).
	helper ServerHelper

	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
	cluster         *Cluster
	channelManager  *ChannelManager
	rootCoordClient types.RootCoord

	metricsCacheManager *metricsinfo.MetricsCacheManager

	// flushCh receives IDs of segments whose flush has completed.
	flushCh   chan UniqueID
	msFactory msgstream.Factory

	// session/eventCh back the etcd-based service discovery of DataNodes.
	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent

	// creator funcs allow tests to inject mock DataNode/RootCoord clients.
	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}
S
sunby 已提交
119

120
// ServerHelper datacoord server injection helper
type ServerHelper struct {
	// eventAfterHandleDataNodeTt fires after each datanode tt message pack is
	// processed; used by tests for synchronization.
	eventAfterHandleDataNodeTt func()
}

// defaultServerHelper returns a ServerHelper whose hooks are no-ops.
func defaultServerHelper() ServerHelper {
	return ServerHelper{
		eventAfterHandleDataNodeTt: func() {},
	}
}

131
// Option utility function signature to set DataCoord server attributes
type Option func(svr *Server)

// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

// SetServerHelper returns an `Option` setting ServerHelp with provided parameter
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

// SetCluster returns an `Option` setting Cluster with provided parameter
func SetCluster(cluster *Cluster) Option {
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

162
// CreateServer create `Server` instance
S
sunby 已提交
163
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
S
sunby 已提交
164
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
165
	s := &Server{
S
sunby 已提交
166 167 168
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
169
		dataNodeCreator:        defaultDataNodeCreatorFunc,
S
sunby 已提交
170
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
171
		helper:                 defaultServerHelper(),
172 173

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
S
sunby 已提交
174
	}
S
sunby 已提交
175 176 177 178

	for _, opt := range opts {
		opt(s)
	}
S
sunby 已提交
179 180 181
	return s, nil
}

G
godchen 已提交
182 183
// defaultDataNodeCreatorFunc is the production dataNodeCreatorFunc: it dials a
// DataNode gRPC client at the given address.
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}

// defaultRootCoordCreatorFunc is the production rootCoordCreatorFunc: it
// builds a RootCoord client from the etcd meta configuration.
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
}

190 191
// Register register data service at etcd
func (s *Server) Register() error {
192
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
193 194 195
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
196
	s.session.Init(typeutil.DataCoordRole, Params.IP, true)
197
	Params.NodeID = s.session.ServerID
X
Xiaofan 已提交
198
	Params.SetLogger(typeutil.UniqueID(-1))
199 200 201
	return nil
}

202
// Init change server state to Initializing
func (s *Server) Init() error {
	// Actual member initialization is deferred to Start; Init only flips state.
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return nil
}

208 209 210 211 212 213 214
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
215
func (s *Server) Start() error {
216
	var err error
S
sunby 已提交
217 218 219 220 221 222 223 224
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
225
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
226 227
		return err
	}
228

S
sunby 已提交
229 230 231
	if err = s.initMeta(); err != nil {
		return err
	}
232

S
sunby 已提交
233 234 235
	if err = s.initCluster(); err != nil {
		return err
	}
236

C
congqixia 已提交
237
	s.allocator = newRootCoordAllocator(s.rootCoordClient)
238

239
	s.startSegmentManager()
S
sunby 已提交
240 241 242
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
243

S
sunby 已提交
244
	s.startServerLoop()
245 246
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
247
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
248
	log.Debug("dataCoordinator startup success")
249

S
sunby 已提交
250
	return nil
251 252 253
}

func (s *Server) initCluster() error {
254 255 256 257
	if s.cluster != nil {
		return nil
	}

S
sunby 已提交
258
	var err error
259 260 261
	s.channelManager, err = NewChannelManager(s.kvClient, s)
	if err != nil {
		return err
262
	}
263 264 265
	sessionManager := NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(sessionManager, s.channelManager)
	return nil
266
}
G
groot 已提交
267

268 269 270
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
G
groot 已提交
271
		log.Debug("dataCoord initServiceDiscovery failed", zap.Error(err))
G
godchen 已提交
272 273
		return err
	}
274 275
	log.Debug("registered sessions", zap.Any("sessions", sessions))

S
sunby 已提交
276
	datanodes := make([]*NodeInfo, 0, len(sessions))
277
	for _, session := range sessions {
278 279 280
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
S
sunby 已提交
281
		}
282
		datanodes = append(datanodes, info)
283
	}
G
godchen 已提交
284

S
sunby 已提交
285
	s.cluster.Startup(datanodes)
286

287
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
288 289 290
	return nil
}

291
// startSegmentManager builds the segment manager on top of meta and the
// RootCoord-backed allocator; both must be initialized before this is called.
func (s *Server) startSegmentManager() {
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
}

S
sunby 已提交
295
func (s *Server) initMeta() error {
296
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
297
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
298 299 300
		if err != nil {
			return err
		}
X
XuanYang-cn 已提交
301 302

		s.kvClient = etcdKV
G
godchen 已提交
303
		s.meta, err = newMeta(s.kvClient)
304 305 306 307
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
308
	}
G
godchen 已提交
309
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
310 311
}

312 313
// startServerLoop spawns the four background loops (stats channel, datanode
// tt, service watch, flush) plus the etcd liveness check.
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
	// Add(4) matches the four loops below; LivenessCheck is not waited on.
	s.serverLoopWg.Add(4)
	go s.startStatsChannel(s.serverLoopCtx)
	go s.startDataNodeTtLoop(s.serverLoopCtx)
	go s.startWatchService(s.serverLoopCtx)
	go s.startFlushLoop(s.serverLoopCtx)
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
		log.Fatal("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID))
	})
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
325
	defer logutil.LogPanic()
326
	defer s.serverLoopWg.Done()
G
groot 已提交
327
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
328
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
329
	log.Debug("dataCoord create stats channel consumer",
330
		zap.String("channelName", Params.StatisticsChannelName),
331
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
332 333 334 335 336
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
337
			log.Debug("stats channel shutdown")
338 339 340
			return
		default:
		}
341
		msgPack := statsStream.Consume()
S
sunby 已提交
342
		if msgPack == nil {
S
sunby 已提交
343
			log.Debug("receive nil stats msg, shutdown stats channel")
S
sunby 已提交
344
			return
S
sunby 已提交
345
		}
346
		for _, msg := range msgPack.Msgs {
347
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
348 349
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
350
				continue
S
sunby 已提交
351
			}
352 353
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
S
sunby 已提交
354
				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
355
			}
356 357 358 359
		}
	}
}

S
sunby 已提交
360
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
361
	defer logutil.LogPanic()
362
	defer s.serverLoopWg.Done()
S
sunby 已提交
363 364 365 366 367
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
368 369
	ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName},
		Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
370 371 372
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
S
sunby 已提交
373 374
	ttMsgStream.Start()
	defer ttMsgStream.Close()
375 376 377 378 379

	var checker *LongTermChecker
	if enableTtChecker {
		checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
		checker.Start()
380
		defer checker.Stop()
381
	}
382 383 384
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
385
			log.Debug("data node tt loop shutdown")
386 387 388
			return
		default:
		}
S
sunby 已提交
389
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
390
		if msgPack == nil {
S
sunby 已提交
391
			log.Debug("receive nil tt msg, shutdown tt channel")
S
sunby 已提交
392
			return
S
sunby 已提交
393
		}
394
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
395
			if msg.Type() != commonpb.MsgType_DataNodeTt {
396
				log.Warn("receive unexpected msg type from tt channel",
S
sunby 已提交
397
					zap.Stringer("msgType", msg.Type()))
398 399
				continue
			}
S
sunby 已提交
400
			ttMsg := msg.(*msgstream.DataNodeTtMsg)
401 402 403
			if enableTtChecker {
				checker.Check()
			}
S
sunby 已提交
404 405 406

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
407 408
			if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
				log.Warn("failed to expire allocations", zap.Error(err))
G
godchen 已提交
409 410
				continue
			}
411 412 413 414 415
			physical, _ := tsoutil.ParseTS(ts)
			if time.Since(physical).Minutes() > 1 {
				// if lag behind, log every 1 mins about
				log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
			}
416
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
417
			if err != nil {
S
sunby 已提交
418
				log.Warn("get flushable segments failed", zap.Error(err))
419 420
				continue
			}
421

422 423 424 425 426
			staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
				return !info.lastFlushTime.IsZero() && time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
			})

			if len(segments)+len(staleSegments) == 0 {
S
sunby 已提交
427 428
				continue
			}
429
			log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
430
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
431
			for _, id := range segments {
S
sunby 已提交
432 433
				sInfo := s.meta.GetSegment(id)
				if sInfo == nil {
S
sunby 已提交
434
					log.Error("get segment from meta error", zap.Int64("id", id),
435
						zap.Error(err))
S
sunby 已提交
436
					continue
437
				}
S
sunby 已提交
438
				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
S
sunby 已提交
439
				s.meta.SetLastFlushTime(id, time.Now())
440
			}
441 442 443 444 445 446 447 448 449 450 451 452 453
			markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
			for _, segment := range staleSegments {
				for _, fSeg := range segmentInfos {
					// check segment needs flush first
					if segment.GetID() == fSeg.GetID() {
						continue
					}
				}
				markSegments = append(markSegments, segment.SegmentInfo)
				s.meta.SetLastFlushTime(segment.GetID(), time.Now())
			}
			if len(segmentInfos)+len(markSegments) > 0 {
				s.cluster.Flush(s.ctx, segmentInfos, markSegments)
S
sunby 已提交
454
			}
455
		}
S
sunby 已提交
456
		s.helper.eventAfterHandleDataNodeTt()
457 458 459
	}
}

460 461 462 463
//go:norace
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
func (s *Server) startWatchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
			if err := s.handleSessionEvent(ctx, event); err != nil {
				// Stop the server asynchronously: Stop waits for this loop to
				// exit, so calling it inline would deadlock.
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
		}
	}
}
S
sunby 已提交
488

489
// handleSessionEvent handles session events - DataNodes Add/Del.
// Register/UnRegister failures are returned to the caller, which treats them
// as fatal (see startWatchService).
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
	if event == nil {
		return nil
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		// Topology changed; cached system-info metrics are stale.
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
	return nil
}

S
sunby 已提交
529 530 531 532 533 534 535 536 537 538 539 540 541
// startFlushLoop re-enqueues already-`Flushing` segments (recovery) and then
// runs post-flush processing for every segment ID arriving on flushCh.
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
			//Ignore return error
			_ = s.postFlush(ctx, segmentID)
		}
	}
}

548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579
// post function after flush is done
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		// flushCh is drained by startFlushLoop's postFlush handling.
		case s.flushCh <- segment.ID:
		}
	}
}

591
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
592
	var err error
G
godchen 已提交
593
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
594 595
		return err
	}
596
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
597 598
		return err
	}
599
	return s.rootCoordClient.Start()
S
sunby 已提交
600
}
601

602 603 604 605
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
func (s *Server) Stop() error {
	// CAS guarantees shutdown runs at most once and only from Healthy state.
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	log.Debug("dataCoord server shutdown")
	s.cluster.Close()
	s.stopServerLoop()
	return nil
}

S
sunby 已提交
616 617
// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	// Removes every key under the meta root path.
	return s.kvClient.RemoveWithPrefix("")
}

S
sunby 已提交
622 623 624 625 626
// stopServerLoop cancels the loop context and blocks until all four
// server loops have exited.
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

627 628 629 630 631 632 633 634 635 636 637 638 639 640
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
641

642 643
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
644
		Base: &commonpb.MsgBase{
645
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
646 647 648 649 650 651 652 653
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
654
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
655 656
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
657 658
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
659
			SourceID:  Params.NodeID,
660
		},
S
sunby 已提交
661 662 663 664 665
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
666 667
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
668 669
		return err
	}
S
sunby 已提交
670
	collInfo := &datapb.CollectionInfo{
671 672 673 674
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
S
sunby 已提交
675
	}
S
sunby 已提交
676 677
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
678 679
}

S
sunby 已提交
680
// GetVChanPositions get vchannel latest postitions with provided dml channel names
//
// Flushed/Flushing segments go into FlushedSegments, others with a known dml
// position into UnflushedSegments. The seek position preference appears to be:
// any unflushed segment's position (earliest when several exist, or its start
// position when seekFromStartPosition is set) wins over flushed segments'
// positions (latest when only flushed ones exist); with no segment position at
// all, the collection start position for the physical channel is used.
// NOTE(review): selection order is subtle — verify against consumers before
// changing.
func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
	segments := s.meta.GetSegmentsByChannel(channel)
	flushed := make([]*datapb.SegmentInfo, 0)
	unflushed := make([]*datapb.SegmentInfo, 0)
	var seekPosition *internalpb.MsgPosition
	var useUnflushedPosition bool
	// NB: loop variable shadows the receiver `s` inside this loop.
	for _, s := range segments {
		if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
			flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
			// Track the latest flushed position, but never override a position
			// taken from an unflushed segment.
			if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) {
				seekPosition = s.DmlPosition
			}
			continue
		}

		if s.DmlPosition == nil {
			continue
		}

		unflushed = append(unflushed, trimSegmentInfo(s.SegmentInfo))

		// Unflushed segments take precedence; among them keep the earliest.
		if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp {
			useUnflushedPosition = true
			if !seekFromStartPosition {
				seekPosition = s.DmlPosition
			} else {
				seekPosition = s.StartPosition
			}
		}
	}
	// use collection start position when segment position is not found
	if seekPosition == nil {
		coll := s.meta.GetCollection(collectionID)
		if coll != nil {
			for _, sp := range coll.GetStartPositions() {
				if sp.GetKey() == rootcoord.ToPhysicalChannel(channel) {
					seekPosition = &internalpb.MsgPosition{
						ChannelName: channel,
						MsgID:       sp.GetData(),
					}
				}
			}
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:      collectionID,
		ChannelName:       channel,
		SeekPosition:      seekPosition,
		FlushedSegments:   flushed,
		UnflushedSegments: unflushed,
	}
}
734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749

// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	// Binlog/statslog/deltalog fields are intentionally omitted: callers
	// (e.g. GetVChanPositions) only need positions and bookkeeping fields.
	return &datapb.SegmentInfo{
		ID:             info.ID,
		CollectionID:   info.CollectionID,
		PartitionID:    info.PartitionID,
		InsertChannel:  info.InsertChannel,
		NumOfRows:      info.NumOfRows,
		State:          info.State,
		MaxRowNum:      info.MaxRowNum,
		LastExpireTime: info.LastExpireTime,
		StartPosition:  info.StartPosition,
		DmlPosition:    info.DmlPosition,
	}
}