// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.

package datacoord
S
sunby 已提交
13

S
sunby 已提交
14 15
import (
	"context"
S
sunby 已提交
16
	"errors"
17
	"fmt"
S
sunby 已提交
18
	"math/rand"
S
sunby 已提交
19
	"sync"
S
sunby 已提交
20 21 22
	"sync/atomic"
	"time"

S
sunby 已提交
23
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
24
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
25
	"github.com/milvus-io/milvus/internal/logutil"
26 27 28 29
	"github.com/milvus-io/milvus/internal/rootcoord"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
30
	"go.uber.org/zap"
S
sunby 已提交
31

X
Xiangyu Wang 已提交
32 33 34 35 36
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
37
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
38
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
39

X
Xiangyu Wang 已提交
40 41
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
S
sunby 已提交
42
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
Xiangyu Wang 已提交
43
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
44 45
)

S
sunby 已提交
46
// connEtcdMaxRetryTime is the attempt limit handed to retry.Do when initMeta
// connects to etcd.
const connEtcdMaxRetryTime = 100000
S
sunby 已提交
47

48 49 50 51 52 53 54 55
// Settings for the time-tick checker created in startDataNodeTtLoop.
var (
	// TODO: sunby put to config
	// enableTtChecker controls whether startDataNodeTtLoop creates a LongTermChecker.
	enableTtChecker  = true
	// ttCheckerName is the name passed to NewLongTermChecker.
	ttCheckerName    = "dataTtChecker"
	// ttMaxInterval is the interval passed to NewLongTermChecker
	// (presumably the longest tolerated gap between time ticks; confirm against LongTermChecker).
	ttMaxInterval    = 3 * time.Minute
	// ttCheckerWarnMsg is the warning text passed to NewLongTermChecker.
	ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
)

S
sunby 已提交
56
type (
57 58 59
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcurt for typeutil.Timestamp
S
sunby 已提交
60
	Timestamp = typeutil.Timestamp
S
sunby 已提交
61
)
S
sunby 已提交
62

63
// ServerState type alias, presents datacoord Server State
64 65 66 67 68 69 70 71 72 73 74
type ServerState = int64

// Values a ServerState can take.
const (
	// ServerStateStopped marks a `Server` instance that was just created or has been stopped.
	ServerStateStopped ServerState = iota
	// ServerStateInitializing marks a `Server` instance that is initializing.
	ServerStateInitializing
	// ServerStateHealthy marks a healthy `Server` instance.
	ServerStateHealthy
)

75 76
// dataNodeCreatorFunc creates a DataNode client for the node listening at addr.
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
// rootCoordCreatorFunc creates a RootCoord client from the meta root path and the etcd endpoints.
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
77

78 79 80
// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

81 82 83
// makes sure Server implements `positionProvider`
var _ positionProvider = (*Server)(nil)

84 85
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
86
type Server struct {
S
sunby 已提交
87 88 89 90
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
91
	isServing        ServerState
S
sunby 已提交
92
	helper           ServerHelper
93

94 95 96 97
	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
S
sunby 已提交
98
	cluster         *Cluster
99
	channelManager  *ChannelManager
100
	rootCoordClient types.RootCoord
101

102 103
	metricsCacheManager *metricsinfo.MetricsCacheManager

104 105
	flushCh   chan UniqueID
	msFactory msgstream.Factory
106

107 108
	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent
109

110 111
	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
112
}
S
sunby 已提交
113

114
// ServerHelper datacoord server injection helper
S
sunby 已提交
115 116 117 118 119 120 121 122 123 124
type ServerHelper struct {
	// eventAfterHandleDataNodeTt is called by startDataNodeTtLoop each time
	// a consumed message pack has been handled.
	eventAfterHandleDataNodeTt func()
}

// defaultServerHelper returns a ServerHelper whose hooks do nothing.
func defaultServerHelper() ServerHelper {
	var helper ServerHelper
	helper.eventAfterHandleDataNodeTt = func() {}
	return helper
}

125
// Option utility function signature to set DataCoord server attributes
S
sunby 已提交
126 127
type Option func(svr *Server)

128
// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
129
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
S
sunby 已提交
130 131 132 133 134
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

135
// SetServerHelper returns an `Option` setting ServerHelp with provided parameter
S
sunby 已提交
136 137 138 139 140 141
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

142 143 144 145 146 147 148
// SetCluster returns an `Option` that injects the provided Cluster into the server.
// initCluster leaves an injected cluster untouched.
func SetCluster(cluster *Cluster) Option {
	apply := func(server *Server) {
		server.cluster = cluster
	}
	return apply
}

149
// SetDataNodeCreator returns an `Option` setting DataNode create function
150
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
151 152 153 154 155
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

156
// CreateServer create `Server` instance
S
sunby 已提交
157
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
S
sunby 已提交
158
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
159
	s := &Server{
S
sunby 已提交
160 161 162
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
163
		dataNodeCreator:        defaultDataNodeCreatorFunc,
S
sunby 已提交
164
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
165
		helper:                 defaultServerHelper(),
166 167

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
S
sunby 已提交
168
	}
S
sunby 已提交
169 170 171 172

	for _, opt := range opts {
		opt(s)
	}
S
sunby 已提交
173 174 175
	return s, nil
}

G
godchen 已提交
176 177
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
178 179
}

G
godchen 已提交
180 181
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
182 183
}

184 185
// Register register data service at etcd
func (s *Server) Register() error {
186
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
187 188 189
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
190
	s.session.Init(typeutil.DataCoordRole, Params.IP, true)
191
	Params.NodeID = s.session.ServerID
X
Xiaofan 已提交
192
	Params.SetLogger(typeutil.UniqueID(-1))
193 194 195
	return nil
}

196
// Init change server state to Initializing
197
func (s *Server) Init() error {
198
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
199 200 201
	return nil
}

202 203 204 205 206 207 208
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
209
func (s *Server) Start() error {
210
	var err error
S
sunby 已提交
211 212 213 214 215 216 217 218
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
219
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
220 221
		return err
	}
222

S
sunby 已提交
223 224 225
	if err = s.initMeta(); err != nil {
		return err
	}
226

S
sunby 已提交
227 228 229
	if err = s.initCluster(); err != nil {
		return err
	}
230

C
congqixia 已提交
231
	s.allocator = newRootCoordAllocator(s.rootCoordClient)
232

233
	s.startSegmentManager()
S
sunby 已提交
234 235 236
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
237

S
sunby 已提交
238
	s.startServerLoop()
239 240
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
241
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
242
	log.Debug("dataCoordinator startup success")
243

S
sunby 已提交
244
	return nil
245 246 247
}

func (s *Server) initCluster() error {
248 249 250 251
	if s.cluster != nil {
		return nil
	}

S
sunby 已提交
252
	var err error
253 254 255
	s.channelManager, err = NewChannelManager(s.kvClient, s)
	if err != nil {
		return err
256
	}
257 258 259
	sessionManager := NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(sessionManager, s.channelManager)
	return nil
260
}
G
groot 已提交
261

262 263 264
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
265
		log.Debug("dataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
266 267
		return err
	}
268 269
	log.Debug("registered sessions", zap.Any("sessions", sessions))

S
sunby 已提交
270
	datanodes := make([]*NodeInfo, 0, len(sessions))
271
	for _, session := range sessions {
272 273 274
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
S
sunby 已提交
275
		}
276
		datanodes = append(datanodes, info)
277
	}
G
godchen 已提交
278

S
sunby 已提交
279
	s.cluster.Startup(datanodes)
280

281
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
282 283 284
	return nil
}

285
func (s *Server) startSegmentManager() {
286
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
287 288
}

S
sunby 已提交
289
func (s *Server) initMeta() error {
290
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
291
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
292 293 294
		if err != nil {
			return err
		}
X
XuanYang-cn 已提交
295 296

		s.kvClient = etcdKV
G
godchen 已提交
297
		s.meta, err = newMeta(s.kvClient)
298 299 300 301
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
302
	}
G
godchen 已提交
303
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
304 305
}

306 307
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
308
	s.serverLoopWg.Add(4)
309
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
310
	go s.startDataNodeTtLoop(s.serverLoopCtx)
311
	go s.startWatchService(s.serverLoopCtx)
S
sunby 已提交
312
	go s.startFlushLoop(s.serverLoopCtx)
313
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
314 315
		if err := s.Stop(); err != nil {
			log.Error("failed to stop server", zap.Error(err))
G
godchen 已提交
316
		}
317
	})
318 319 320
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
321
	defer logutil.LogPanic()
322
	defer s.serverLoopWg.Done()
G
groot 已提交
323
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
324
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
325
	log.Debug("dataCoord create stats channel consumer",
326
		zap.String("channelName", Params.StatisticsChannelName),
327
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
328 329 330 331 332
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
333
			log.Debug("stats channel shutdown")
334 335 336
			return
		default:
		}
337
		msgPack := statsStream.Consume()
S
sunby 已提交
338
		if msgPack == nil {
S
sunby 已提交
339
			log.Debug("receive nil stats msg, shutdown stats channel")
S
sunby 已提交
340
			return
S
sunby 已提交
341
		}
342
		for _, msg := range msgPack.Msgs {
343
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
344 345
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
346
				continue
S
sunby 已提交
347
			}
348 349
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
S
sunby 已提交
350
				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
351
			}
352 353 354 355
		}
	}
}

S
sunby 已提交
356
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
357
	defer logutil.LogPanic()
358
	defer s.serverLoopWg.Done()
S
sunby 已提交
359 360 361 362 363
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
364 365
	ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName},
		Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
366 367 368
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
S
sunby 已提交
369 370
	ttMsgStream.Start()
	defer ttMsgStream.Close()
371 372 373 374 375

	var checker *LongTermChecker
	if enableTtChecker {
		checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
		checker.Start()
376
		defer checker.Stop()
377
	}
378 379 380
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
381
			log.Debug("data node tt loop shutdown")
382 383 384
			return
		default:
		}
S
sunby 已提交
385
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
386
		if msgPack == nil {
S
sunby 已提交
387
			log.Debug("receive nil tt msg, shutdown tt channel")
S
sunby 已提交
388
			return
S
sunby 已提交
389
		}
390
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
391
			if msg.Type() != commonpb.MsgType_DataNodeTt {
392
				log.Warn("receive unexpected msg type from tt channel",
S
sunby 已提交
393
					zap.Stringer("msgType", msg.Type()))
394 395
				continue
			}
S
sunby 已提交
396
			ttMsg := msg.(*msgstream.DataNodeTtMsg)
397 398 399
			if enableTtChecker {
				checker.Check()
			}
S
sunby 已提交
400 401 402

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
403 404
			if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
				log.Warn("failed to expire allocations", zap.Error(err))
G
godchen 已提交
405 406
				continue
			}
407 408 409 410 411
			physical, _ := tsoutil.ParseTS(ts)
			if time.Since(physical).Minutes() > 1 {
				// if lag behind, log every 1 mins about
				log.RatedWarn(60.0, "Time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("tt", physical))
			}
412
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
413
			if err != nil {
S
sunby 已提交
414
				log.Warn("get flushable segments failed", zap.Error(err))
415 416
				continue
			}
417

S
sunby 已提交
418 419 420
			if len(segments) == 0 {
				continue
			}
421
			log.Debug("flush segments", zap.Int64s("segmentIDs", segments))
422
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
423
			for _, id := range segments {
S
sunby 已提交
424 425
				sInfo := s.meta.GetSegment(id)
				if sInfo == nil {
S
sunby 已提交
426
					log.Error("get segment from meta error", zap.Int64("id", id),
427
						zap.Error(err))
S
sunby 已提交
428
					continue
429
				}
S
sunby 已提交
430
				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
S
sunby 已提交
431
				s.meta.SetLastFlushTime(id, time.Now())
432
			}
S
sunby 已提交
433
			if len(segmentInfos) > 0 {
434
				s.cluster.Flush(s.ctx, segmentInfos)
S
sunby 已提交
435
			}
436
		}
S
sunby 已提交
437
		s.helper.eventAfterHandleDataNodeTt()
438 439 440
	}
}

441 442 443 444
//go:norace
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
445
func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
446
	defer logutil.LogPanic()
447 448 449 450 451 452
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
453 454 455 456 457
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
458 459 460 461 462 463 464 465
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
466 467 468
		}
	}
}
S
sunby 已提交
469

470
// handles session events - DataNodes Add/Del
471
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
472
	if event == nil {
473
		return nil
474 475 476 477 478 479
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
480 481 482 483
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
484 485 486 487 488
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
489 490 491 492
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
493 494 495 496 497
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
498 499 500 501
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
502 503 504 505 506
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
507
	return nil
508 509
}

S
sunby 已提交
510 511 512 513 514 515 516 517 518 519 520 521 522
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
523 524
			//Ignore return error
			_ = s.postFlush(ctx, segmentID)
S
sunby 已提交
525 526 527 528
		}
	}
}

529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560
// postFlush is run once the flush of a segment is done:
// 1. look the segment up in the meta, a missing segment is an error
// 2. notify RootCoord that the segment is flushed
// 3. set the segment state to `Flushed` in the meta
// The first failing step aborts the sequence and its error is returned.
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	seg := s.meta.GetSegment(segmentID)
	if seg == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}

	// Notify RootCoord segment is flushed
	flushDone := &datapb.SegmentFlushCompletedMsg{
		Base:    &commonpb.MsgBase{MsgType: commonpb.MsgType_SegmentFlushDone},
		Segment: seg.SegmentInfo,
	}
	status, callErr := s.rootCoordClient.SegmentFlushCompleted(ctx, flushDone)
	if err := VerifyResponse(status, callErr); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}

	// set segment to SegmentState_Flushed
	if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
S
sunby 已提交
561 562 563 564 565 566 567 568 569 570 571
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

572
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
573
	var err error
G
godchen 已提交
574
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
575 576
		return err
	}
577
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
578 579
		return err
	}
580
	return s.rootCoordClient.Start()
S
sunby 已提交
581
}
582

583 584 585 586
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
587
func (s *Server) Stop() error {
588
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
589 590
		return nil
	}
591
	log.Debug("dataCoord server shutdown")
S
sunby 已提交
592
	s.cluster.Close()
S
sunby 已提交
593
	s.stopServerLoop()
S
sunby 已提交
594 595 596
	return nil
}

S
sunby 已提交
597 598
// CleanMeta only for test
func (s *Server) CleanMeta() error {
599
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
600
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
601 602
}

S
sunby 已提交
603 604 605 606 607
// stopServerLoop cancels the server loop context and blocks until every loop
// tracked by serverLoopWg has returned.
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

608 609 610 611 612 613 614 615 616 617 618 619 620 621
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
622

623 624
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
625
		Base: &commonpb.MsgBase{
626
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
627 628 629 630 631 632 633 634
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
635
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
636 637
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
638 639
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
640
			SourceID:  Params.NodeID,
641
		},
S
sunby 已提交
642 643 644 645 646
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
647 648
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
649 650
		return err
	}
S
sunby 已提交
651
	collInfo := &datapb.CollectionInfo{
652 653 654 655
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
S
sunby 已提交
656
	}
S
sunby 已提交
657 658
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
659 660
}

S
sunby 已提交
661
// GetVChanPositions get vchannel latest postitions with provided dml channel names
662 663 664 665 666 667 668 669 670 671 672
func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
	segments := s.meta.GetSegmentsByChannel(channel)
	flushed := make([]*datapb.SegmentInfo, 0)
	unflushed := make([]*datapb.SegmentInfo, 0)
	var seekPosition *internalpb.MsgPosition
	var useUnflushedPosition bool
	for _, s := range segments {
		if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
			flushed = append(flushed, s.SegmentInfo)
			if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) {
				seekPosition = s.DmlPosition
S
sunby 已提交
673
			}
674 675
			continue
		}
676

677 678 679
		if s.DmlPosition == nil {
			continue
		}
S
sunby 已提交
680

681
		unflushed = append(unflushed, s.SegmentInfo)
S
sunby 已提交
682

683 684 685 686 687 688
		if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp {
			useUnflushedPosition = true
			if !seekFromStartPosition {
				seekPosition = s.DmlPosition
			} else {
				seekPosition = s.StartPosition
S
sunby 已提交
689
			}
690
		}
691 692 693 694 695 696 697 698 699 700
	}
	// use collection start position when segment position is not found
	if seekPosition == nil {
		coll := s.meta.GetCollection(collectionID)
		if coll != nil {
			for _, sp := range coll.GetStartPositions() {
				if sp.GetKey() == rootcoord.ToPhysicalChannel(channel) {
					seekPosition = &internalpb.MsgPosition{
						ChannelName: channel,
						MsgID:       sp.GetData(),
701 702 703 704
					}
				}
			}
		}
705
	}
706

707 708 709 710 711 712
	return &datapb.VchannelInfo{
		CollectionID:      collectionID,
		ChannelName:       channel,
		SeekPosition:      seekPosition,
		FlushedSegments:   flushed,
		UnflushedSegments: unflushed,
S
sunby 已提交
713
	}
714
}