server.go 21.6 KB
Newer Older
S
sunby 已提交
1 2 3
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
11

12
package datacoord
S
sunby 已提交
13

S
sunby 已提交
14 15
import (
	"context"
S
sunby 已提交
16
	"errors"
17
	"fmt"
S
sunby 已提交
18
	"math/rand"
S
sunby 已提交
19
	"sync"
S
sunby 已提交
20 21 22
	"sync/atomic"
	"time"

S
sunby 已提交
23
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
24
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
25
	"github.com/milvus-io/milvus/internal/logutil"
26 27 28 29
	"github.com/milvus-io/milvus/internal/rootcoord"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
30
	"go.uber.org/zap"
S
sunby 已提交
31

X
Xiangyu Wang 已提交
32 33 34 35 36
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
37
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
38
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
39

X
Xiangyu Wang 已提交
40 41
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
S
sunby 已提交
42
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
Xiangyu Wang 已提交
43
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
44 45
)

S
sunby 已提交
46
// connEtcdMaxRetryTime bounds the number of attempts initMeta makes while
// connecting to etcd and loading meta.
const connEtcdMaxRetryTime = 100000

// Time-tick checker and timed-flush tuning knobs.
// TODO: sunby put to config
var (
	// enableTtChecker turns on the LongTermChecker created in startDataNodeTtLoop.
	enableTtChecker = true
	// ttCheckerName is the name handed to the LongTermChecker.
	ttCheckerName = "dataTtChecker"
	// ttMaxInterval is the interval handed to the LongTermChecker.
	ttMaxInterval = time.Minute * 3
	// ttCheckerWarnMsg is the message handed to the LongTermChecker.
	ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
	// segmentTimedFlushDuration is compared (in minutes) against the time since a
	// segment's last flush to decide whether it is stale.
	segmentTimedFlushDuration = 10.0
)

S
sunby 已提交
57
type (
58 59 60
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcurt for typeutil.Timestamp
S
sunby 已提交
61
	Timestamp = typeutil.Timestamp
S
sunby 已提交
62
)
S
sunby 已提交
63

64
// ServerState is an alias of int64 describing the lifecycle state of a
// datacoord Server; it is stored and read with sync/atomic.
type ServerState = int64

// Lifecycle states of a Server, in the order a healthy server passes through them.
const (
	// ServerStateStopped marks a freshly created or stopped Server.
	ServerStateStopped ServerState = iota
	// ServerStateInitializing marks a Server whose Init has been called.
	ServerStateInitializing
	// ServerStateHealthy marks a Server that has completed Start.
	ServerStateHealthy
)

76 77
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
78

79 80 81
// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

82 83 84
// makes sure Server implements `positionProvider`
var _ positionProvider = (*Server)(nil)

85 86
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
87
type Server struct {
S
sunby 已提交
88 89 90 91
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
92
	isServing        ServerState
S
sunby 已提交
93
	helper           ServerHelper
94

95 96 97 98
	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
S
sunby 已提交
99
	cluster         *Cluster
100
	channelManager  *ChannelManager
101
	rootCoordClient types.RootCoord
102

103 104
	metricsCacheManager *metricsinfo.MetricsCacheManager

105 106
	flushCh   chan UniqueID
	msFactory msgstream.Factory
107

108 109
	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent
110

111 112
	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
113
}
S
sunby 已提交
114

115
// ServerHelper bundles hooks injected into a datacoord Server via SetServerHelper.
type ServerHelper struct {
	// eventAfterHandleDataNodeTt runs after every message pack consumed from the
	// DataNode time-tick channel has been processed.
	eventAfterHandleDataNodeTt func()
}

// defaultServerHelper returns a ServerHelper whose hooks do nothing.
func defaultServerHelper() ServerHelper {
	noop := func() {}
	return ServerHelper{eventAfterHandleDataNodeTt: noop}
}

126
// Option utility function signature to set DataCoord server attributes
S
sunby 已提交
127 128
type Option func(svr *Server)

129
// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
130
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
S
sunby 已提交
131 132 133 134 135
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

136
// SetServerHelper returns an `Option` setting ServerHelp with provided parameter
S
sunby 已提交
137 138 139 140 141 142
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

143 144 145 146 147 148 149
// SetCluster returns an Option that injects a prebuilt Cluster; initCluster then
// skips building its own.
func SetCluster(cluster *Cluster) Option {
	return func(target *Server) { target.cluster = cluster }
}

150
// SetDataNodeCreator returns an `Option` setting DataNode create function
151
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
152 153 154 155 156
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

157
// CreateServer create `Server` instance
S
sunby 已提交
158
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
S
sunby 已提交
159
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
160
	s := &Server{
S
sunby 已提交
161 162 163
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
164
		dataNodeCreator:        defaultDataNodeCreatorFunc,
S
sunby 已提交
165
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
166
		helper:                 defaultServerHelper(),
167 168

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
S
sunby 已提交
169
	}
S
sunby 已提交
170 171 172 173

	for _, opt := range opts {
		opt(s)
	}
S
sunby 已提交
174 175 176
	return s, nil
}

G
godchen 已提交
177 178
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
179 180
}

G
godchen 已提交
181 182
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
183 184
}

185 186
// Register register data service at etcd
func (s *Server) Register() error {
187
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
188 189 190
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
191
	s.session.Init(typeutil.DataCoordRole, Params.IP, true)
192
	Params.NodeID = s.session.ServerID
X
Xiaofan 已提交
193
	Params.SetLogger(typeutil.UniqueID(-1))
194 195 196
	return nil
}

197
// Init change server state to Initializing
198
func (s *Server) Init() error {
199
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
200 201 202
	return nil
}

203 204 205 206 207 208 209
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
210
func (s *Server) Start() error {
211
	var err error
S
sunby 已提交
212 213 214 215 216 217 218 219
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
220
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
221 222
		return err
	}
223

S
sunby 已提交
224 225 226
	if err = s.initMeta(); err != nil {
		return err
	}
227

S
sunby 已提交
228 229 230
	if err = s.initCluster(); err != nil {
		return err
	}
231

C
congqixia 已提交
232
	s.allocator = newRootCoordAllocator(s.rootCoordClient)
233

234
	s.startSegmentManager()
S
sunby 已提交
235 236 237
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
238

S
sunby 已提交
239
	s.startServerLoop()
240 241
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
242
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
243
	log.Debug("dataCoordinator startup success")
244

S
sunby 已提交
245
	return nil
246 247 248
}

func (s *Server) initCluster() error {
249 250 251 252
	if s.cluster != nil {
		return nil
	}

S
sunby 已提交
253
	var err error
254 255 256
	s.channelManager, err = NewChannelManager(s.kvClient, s)
	if err != nil {
		return err
257
	}
258 259 260
	sessionManager := NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(sessionManager, s.channelManager)
	return nil
261
}
G
groot 已提交
262

263 264 265
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
G
groot 已提交
266
		log.Debug("dataCoord initServiceDiscovery failed", zap.Error(err))
G
godchen 已提交
267 268
		return err
	}
269 270
	log.Debug("registered sessions", zap.Any("sessions", sessions))

S
sunby 已提交
271
	datanodes := make([]*NodeInfo, 0, len(sessions))
272
	for _, session := range sessions {
273 274 275
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
S
sunby 已提交
276
		}
277
		datanodes = append(datanodes, info)
278
	}
G
godchen 已提交
279

S
sunby 已提交
280
	s.cluster.Startup(datanodes)
281

282
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
283 284 285
	return nil
}

286
func (s *Server) startSegmentManager() {
287
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
288 289
}

S
sunby 已提交
290
func (s *Server) initMeta() error {
291
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
292
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
293 294 295
		if err != nil {
			return err
		}
X
XuanYang-cn 已提交
296 297

		s.kvClient = etcdKV
G
godchen 已提交
298
		s.meta, err = newMeta(s.kvClient)
299 300 301 302
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
303
	}
G
godchen 已提交
304
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
305 306
}

307 308
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
309
	s.serverLoopWg.Add(4)
310
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
311
	go s.startDataNodeTtLoop(s.serverLoopCtx)
312
	go s.startWatchService(s.serverLoopCtx)
S
sunby 已提交
313
	go s.startFlushLoop(s.serverLoopCtx)
314
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
315 316
		if err := s.Stop(); err != nil {
			log.Error("failed to stop server", zap.Error(err))
G
godchen 已提交
317
		}
318
	})
319 320 321
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
322
	defer logutil.LogPanic()
323
	defer s.serverLoopWg.Done()
G
groot 已提交
324
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
325
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
326
	log.Debug("dataCoord create stats channel consumer",
327
		zap.String("channelName", Params.StatisticsChannelName),
328
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
329 330 331 332 333
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
334
			log.Debug("stats channel shutdown")
335 336 337
			return
		default:
		}
338
		msgPack := statsStream.Consume()
S
sunby 已提交
339
		if msgPack == nil {
S
sunby 已提交
340
			log.Debug("receive nil stats msg, shutdown stats channel")
S
sunby 已提交
341
			return
S
sunby 已提交
342
		}
343
		for _, msg := range msgPack.Msgs {
344
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
345 346
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
347
				continue
S
sunby 已提交
348
			}
349 350
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
S
sunby 已提交
351
				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
352
			}
353 354 355 356
		}
	}
}

S
sunby 已提交
357
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
358
	defer logutil.LogPanic()
359
	defer s.serverLoopWg.Done()
S
sunby 已提交
360 361 362 363 364
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
365 366
	ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName},
		Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
367 368 369
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
S
sunby 已提交
370 371
	ttMsgStream.Start()
	defer ttMsgStream.Close()
372 373 374 375 376

	var checker *LongTermChecker
	if enableTtChecker {
		checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
		checker.Start()
377
		defer checker.Stop()
378
	}
379 380 381
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
382
			log.Debug("data node tt loop shutdown")
383 384 385
			return
		default:
		}
S
sunby 已提交
386
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
387
		if msgPack == nil {
S
sunby 已提交
388
			log.Debug("receive nil tt msg, shutdown tt channel")
S
sunby 已提交
389
			return
S
sunby 已提交
390
		}
391
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
392
			if msg.Type() != commonpb.MsgType_DataNodeTt {
393
				log.Warn("receive unexpected msg type from tt channel",
S
sunby 已提交
394
					zap.Stringer("msgType", msg.Type()))
395 396
				continue
			}
S
sunby 已提交
397
			ttMsg := msg.(*msgstream.DataNodeTtMsg)
398 399 400
			if enableTtChecker {
				checker.Check()
			}
S
sunby 已提交
401 402 403

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
404 405
			if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
				log.Warn("failed to expire allocations", zap.Error(err))
G
godchen 已提交
406 407
				continue
			}
408 409 410 411 412
			physical, _ := tsoutil.ParseTS(ts)
			if time.Since(physical).Minutes() > 1 {
				// if lag behind, log every 1 mins about
				log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
			}
413
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
414
			if err != nil {
S
sunby 已提交
415
				log.Warn("get flushable segments failed", zap.Error(err))
416 417
				continue
			}
418

419 420 421 422 423
			staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
				return !info.lastFlushTime.IsZero() && time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
			})

			if len(segments)+len(staleSegments) == 0 {
S
sunby 已提交
424 425
				continue
			}
426
			log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
427
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
428
			for _, id := range segments {
S
sunby 已提交
429 430
				sInfo := s.meta.GetSegment(id)
				if sInfo == nil {
S
sunby 已提交
431
					log.Error("get segment from meta error", zap.Int64("id", id),
432
						zap.Error(err))
S
sunby 已提交
433
					continue
434
				}
S
sunby 已提交
435
				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
S
sunby 已提交
436
				s.meta.SetLastFlushTime(id, time.Now())
437
			}
438 439 440 441 442 443 444 445 446 447 448 449 450
			markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
			for _, segment := range staleSegments {
				for _, fSeg := range segmentInfos {
					// check segment needs flush first
					if segment.GetID() == fSeg.GetID() {
						continue
					}
				}
				markSegments = append(markSegments, segment.SegmentInfo)
				s.meta.SetLastFlushTime(segment.GetID(), time.Now())
			}
			if len(segmentInfos)+len(markSegments) > 0 {
				s.cluster.Flush(s.ctx, segmentInfos, markSegments)
S
sunby 已提交
451
			}
452
		}
S
sunby 已提交
453
		s.helper.eventAfterHandleDataNodeTt()
454 455 456
	}
}

457 458 459 460
//go:norace
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
461
func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
462
	defer logutil.LogPanic()
463 464 465 466 467 468
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
469 470 471 472 473
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
474 475 476 477 478 479 480 481
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
482 483 484
		}
	}
}
S
sunby 已提交
485

486
// handles session events - DataNodes Add/Del
487
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
488
	if event == nil {
489
		return nil
490 491 492 493 494 495
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
496 497 498 499
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
500 501 502 503 504
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
505 506 507 508
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
509 510 511 512 513
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
514 515 516 517
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
518 519 520 521 522
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
523
	return nil
524 525
}

S
sunby 已提交
526 527 528 529 530 531 532 533 534 535 536 537 538
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
539 540
			//Ignore return error
			_ = s.postFlush(ctx, segmentID)
S
sunby 已提交
541 542 543 544
		}
	}
}

545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576
// postFlush finalizes a segment whose flush has completed:
//  1. looks the segment up in meta and fails if it is unknown;
//  2. notifies RootCoord through SegmentFlushCompleted;
//  3. marks the segment SegmentState_Flushed in meta.
//
// RootCoord is notified before the local state change, so a failed notification
// returns early and leaves the segment's meta state untouched.
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flushed segment", zap.Int64("id", segmentID))
		return fmt.Errorf("segment %d not found", segmentID)
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
S
sunby 已提交
577 578 579 580 581 582 583 584 585 586 587
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

588
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
589
	var err error
G
godchen 已提交
590
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
591 592
		return err
	}
593
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
594 595
		return err
	}
596
	return s.rootCoordClient.Start()
S
sunby 已提交
597
}
598

599 600 601 602
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
603
func (s *Server) Stop() error {
604
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
605 606
		return nil
	}
607
	log.Debug("dataCoord server shutdown")
S
sunby 已提交
608
	s.cluster.Close()
S
sunby 已提交
609
	s.stopServerLoop()
S
sunby 已提交
610 611 612
	return nil
}

S
sunby 已提交
613 614
// CleanMeta only for test
func (s *Server) CleanMeta() error {
615
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
616
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
617 618
}

S
sunby 已提交
619 620 621 622 623
// stopServerLoop cancels serverLoopCtx and blocks until every loop tracked by
// serverLoopWg has returned.
func (s *Server) stopServerLoop() {
	cancel := s.serverLoopCancel
	cancel()
	s.serverLoopWg.Wait()
}

624 625 626 627 628 629 630 631 632 633 634 635 636 637
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
638

639 640
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
641
		Base: &commonpb.MsgBase{
642
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
643 644 645 646 647 648 649 650
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
651
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
652 653
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
654 655
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
656
			SourceID:  Params.NodeID,
657
		},
S
sunby 已提交
658 659 660 661 662
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
663 664
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
665 666
		return err
	}
S
sunby 已提交
667
	collInfo := &datapb.CollectionInfo{
668 669 670 671
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
S
sunby 已提交
672
	}
S
sunby 已提交
673 674
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
675 676
}

S
sunby 已提交
677
// GetVChanPositions get vchannel latest postitions with provided dml channel names
678 679 680 681 682 683 684 685 686 687 688
func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
	segments := s.meta.GetSegmentsByChannel(channel)
	flushed := make([]*datapb.SegmentInfo, 0)
	unflushed := make([]*datapb.SegmentInfo, 0)
	var seekPosition *internalpb.MsgPosition
	var useUnflushedPosition bool
	for _, s := range segments {
		if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
			flushed = append(flushed, s.SegmentInfo)
			if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) {
				seekPosition = s.DmlPosition
S
sunby 已提交
689
			}
690 691
			continue
		}
692

693 694 695
		if s.DmlPosition == nil {
			continue
		}
S
sunby 已提交
696

697
		unflushed = append(unflushed, s.SegmentInfo)
S
sunby 已提交
698

699 700 701 702 703 704
		if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp {
			useUnflushedPosition = true
			if !seekFromStartPosition {
				seekPosition = s.DmlPosition
			} else {
				seekPosition = s.StartPosition
S
sunby 已提交
705
			}
706
		}
707 708 709 710 711 712 713 714 715 716
	}
	// use collection start position when segment position is not found
	if seekPosition == nil {
		coll := s.meta.GetCollection(collectionID)
		if coll != nil {
			for _, sp := range coll.GetStartPositions() {
				if sp.GetKey() == rootcoord.ToPhysicalChannel(channel) {
					seekPosition = &internalpb.MsgPosition{
						ChannelName: channel,
						MsgID:       sp.GetData(),
717 718 719 720
					}
				}
			}
		}
721
	}
722

723 724 725 726 727 728
	return &datapb.VchannelInfo{
		CollectionID:      collectionID,
		ChannelName:       channel,
		SeekPosition:      seekPosition,
		FlushedSegments:   flushed,
		UnflushedSegments: unflushed,
S
sunby 已提交
729
	}
730
}