server.go 20.5 KB
Newer Older
S
sunby 已提交
1 2 3
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
11

12
package datacoord
S
sunby 已提交
13

S
sunby 已提交
14 15
import (
	"context"
S
sunby 已提交
16
	"errors"
17
	"fmt"
S
sunby 已提交
18
	"math/rand"
S
sunby 已提交
19
	"sync"
S
sunby 已提交
20 21 22
	"sync/atomic"
	"time"

23
	"github.com/milvus-io/milvus/internal/rootcoord"
24 25
	"github.com/milvus-io/milvus/internal/util/metricsinfo"

S
sunby 已提交
26
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
27
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
28
	"github.com/milvus-io/milvus/internal/logutil"
29
	"go.uber.org/zap"
S
sunby 已提交
30

X
Xiangyu Wang 已提交
31 32 33 34 35
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
36
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
37
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
38

X
Xiangyu Wang 已提交
39 40
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
S
sunby 已提交
41
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
Xiangyu Wang 已提交
42
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
43 44
)

S
sunby 已提交
45
// connEtcdMaxRetryTime is the attempt budget initMeta gives retry.Do when
// connecting to etcd.
const connEtcdMaxRetryTime = 100000

// Settings of the long-term checker started by startDataNodeTtLoop.
var (
	// TODO: sunby put to config
	enableTtChecker = true
	ttCheckerName   = "dataTtChecker"
	ttMaxInterval   = 3 * time.Minute

	// ttCheckerWarnMsg is handed to NewLongTermChecker as its warning text.
	ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
)

S
sunby 已提交
55
type (
56 57 58
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcurt for typeutil.Timestamp
S
sunby 已提交
59
	Timestamp = typeutil.Timestamp
S
sunby 已提交
60
)
S
sunby 已提交
61

62
// ServerState type alias, presents datacoord Server State.
// Server.isServing holds one of the values below and is accessed with sync/atomic.
type ServerState = int64

const (
	// ServerStateStopped marks a `Server` that was just created or has been stopped.
	ServerStateStopped ServerState = iota
	// ServerStateInitializing marks a `Server` that is initializing.
	ServerStateInitializing
	// ServerStateHealthy marks a `Server` that has started successfully.
	ServerStateHealthy
)

74 75
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
76

77 78 79
// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

80 81 82
// makes sure Server implements `positionProvider`
var _ positionProvider = (*Server)(nil)

83 84
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
85
type Server struct {
S
sunby 已提交
86 87 88 89
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
90
	isServing        ServerState
S
sunby 已提交
91
	helper           ServerHelper
92

93 94 95 96
	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
S
sunby 已提交
97
	cluster         *Cluster
98
	channelManager  *ChannelManager
99
	rootCoordClient types.RootCoord
100

101 102
	metricsCacheManager *metricsinfo.MetricsCacheManager

103 104
	flushCh   chan UniqueID
	msFactory msgstream.Factory
105

106 107
	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent
108

109 110
	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
111
}
S
sunby 已提交
112

113
// ServerHelper datacoord server injection helper: hooks that the Server calls
// at fixed points of its processing.
type ServerHelper struct {
	// eventAfterHandleDataNodeTt runs in startDataNodeTtLoop after all
	// messages of one consumed time tick pack have been handled.
	eventAfterHandleDataNodeTt func()
}

// defaultServerHelper returns a ServerHelper whose hooks do nothing.
func defaultServerHelper() ServerHelper {
	noop := func() {}
	var helper ServerHelper
	helper.eventAfterHandleDataNodeTt = noop
	return helper
}

124
// Option utility function signature to set DataCoord server attributes
S
sunby 已提交
125 126
type Option func(svr *Server)

127
// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
128
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
S
sunby 已提交
129 130 131 132 133
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

134
// SetServerHelper returns an `Option` setting ServerHelp with provided parameter
S
sunby 已提交
135 136 137 138 139 140
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

141 142 143 144 145 146 147
// SetCluster returns an `Option` that injects a prebuilt Cluster; initCluster
// then leaves it untouched instead of building its own.
func SetCluster(cluster *Cluster) Option {
	return Option(func(svr *Server) { svr.cluster = cluster })
}

148
// SetDataNodeCreator returns an `Option` setting DataNode create function
149
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
150 151 152 153 154
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

155
// CreateServer create `Server` instance
S
sunby 已提交
156
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
S
sunby 已提交
157
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
158
	s := &Server{
S
sunby 已提交
159 160 161
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
162
		dataNodeCreator:        defaultDataNodeCreatorFunc,
S
sunby 已提交
163
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
164
		helper:                 defaultServerHelper(),
165 166

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
S
sunby 已提交
167
	}
S
sunby 已提交
168 169 170 171

	for _, opt := range opts {
		opt(s)
	}
S
sunby 已提交
172 173 174
	return s, nil
}

G
godchen 已提交
175 176
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
177 178
}

G
godchen 已提交
179 180
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
181 182
}

183 184
// Register register data service at etcd
func (s *Server) Register() error {
185
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
186 187 188
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
189
	s.session.Init(typeutil.DataCoordRole, Params.IP, true)
190
	Params.NodeID = s.session.ServerID
X
Xiaofan 已提交
191
	Params.SetLogger(typeutil.UniqueID(-1))
192 193 194
	return nil
}

195
// Init change server state to Initializing
196
func (s *Server) Init() error {
197
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
198 199 200
	return nil
}

201 202 203 204 205 206 207
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
208
func (s *Server) Start() error {
209
	var err error
S
sunby 已提交
210 211 212 213 214 215 216 217
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
218
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
219 220
		return err
	}
221

S
sunby 已提交
222 223 224
	if err = s.initMeta(); err != nil {
		return err
	}
225

S
sunby 已提交
226 227 228
	if err = s.initCluster(); err != nil {
		return err
	}
229

C
congqixia 已提交
230
	s.allocator = newRootCoordAllocator(s.rootCoordClient)
231

232
	s.startSegmentManager()
S
sunby 已提交
233 234 235
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
236

S
sunby 已提交
237
	s.startServerLoop()
238 239
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
240
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
241
	log.Debug("dataCoordinator startup success")
242

S
sunby 已提交
243
	return nil
244 245 246
}

func (s *Server) initCluster() error {
247 248 249 250
	if s.cluster != nil {
		return nil
	}

S
sunby 已提交
251
	var err error
252 253 254
	s.channelManager, err = NewChannelManager(s.kvClient, s)
	if err != nil {
		return err
255
	}
256 257 258
	sessionManager := NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(sessionManager, s.channelManager)
	return nil
259
}
G
groot 已提交
260

261 262 263
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
264
		log.Debug("dataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
265 266
		return err
	}
267 268
	log.Debug("registered sessions", zap.Any("sessions", sessions))

S
sunby 已提交
269
	datanodes := make([]*NodeInfo, 0, len(sessions))
270
	for _, session := range sessions {
271 272 273
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
S
sunby 已提交
274
		}
275
		datanodes = append(datanodes, info)
276
	}
G
godchen 已提交
277

S
sunby 已提交
278
	s.cluster.Startup(datanodes)
279

280
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
281 282 283
	return nil
}

284
func (s *Server) startSegmentManager() {
285
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
286 287
}

S
sunby 已提交
288
func (s *Server) initMeta() error {
289
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
290
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
291 292 293
		if err != nil {
			return err
		}
X
XuanYang-cn 已提交
294 295

		s.kvClient = etcdKV
G
godchen 已提交
296
		s.meta, err = newMeta(s.kvClient)
297 298 299 300
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
301
	}
G
godchen 已提交
302
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
303 304
}

305 306
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
307
	s.serverLoopWg.Add(4)
308
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
309
	go s.startDataNodeTtLoop(s.serverLoopCtx)
310
	go s.startWatchService(s.serverLoopCtx)
S
sunby 已提交
311
	go s.startFlushLoop(s.serverLoopCtx)
312
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
313 314
		if err := s.Stop(); err != nil {
			log.Error("failed to stop server", zap.Error(err))
G
godchen 已提交
315
		}
316
	})
317 318 319
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
320
	defer logutil.LogPanic()
321
	defer s.serverLoopWg.Done()
G
groot 已提交
322
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
323
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
324
	log.Debug("dataCoord create stats channel consumer",
325
		zap.String("channelName", Params.StatisticsChannelName),
326
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
327 328 329 330 331
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
332
			log.Debug("stats channel shutdown")
333 334 335
			return
		default:
		}
336
		msgPack := statsStream.Consume()
S
sunby 已提交
337
		if msgPack == nil {
S
sunby 已提交
338
			log.Debug("receive nil stats msg, shutdown stats channel")
S
sunby 已提交
339
			return
S
sunby 已提交
340
		}
341
		for _, msg := range msgPack.Msgs {
342
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
343 344
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
345
				continue
S
sunby 已提交
346
			}
347 348
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
S
sunby 已提交
349
				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
350
			}
351 352 353 354
		}
	}
}

S
sunby 已提交
355
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
356
	defer logutil.LogPanic()
357
	defer s.serverLoopWg.Done()
S
sunby 已提交
358 359 360 361 362 363
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
364
		Params.DataCoordSubscriptionName)
365 366 367
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
S
sunby 已提交
368 369
	ttMsgStream.Start()
	defer ttMsgStream.Close()
370 371 372 373 374

	var checker *LongTermChecker
	if enableTtChecker {
		checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
		checker.Start()
375
		defer checker.Stop()
376
	}
377 378 379
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
380
			log.Debug("data node tt loop shutdown")
381 382 383
			return
		default:
		}
S
sunby 已提交
384
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
385
		if msgPack == nil {
S
sunby 已提交
386
			log.Debug("receive nil tt msg, shutdown tt channel")
S
sunby 已提交
387
			return
S
sunby 已提交
388
		}
389
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
390
			if msg.Type() != commonpb.MsgType_DataNodeTt {
391
				log.Warn("receive unexpected msg type from tt channel",
S
sunby 已提交
392
					zap.Stringer("msgType", msg.Type()))
393 394
				continue
			}
S
sunby 已提交
395
			ttMsg := msg.(*msgstream.DataNodeTtMsg)
396 397 398
			if enableTtChecker {
				checker.Check()
			}
S
sunby 已提交
399 400 401

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
402 403
			if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
				log.Warn("failed to expire allocations", zap.Error(err))
G
godchen 已提交
404 405
				continue
			}
406
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
407
			if err != nil {
S
sunby 已提交
408
				log.Warn("get flushable segments failed", zap.Error(err))
409 410
				continue
			}
411

S
sunby 已提交
412 413 414
			if len(segments) == 0 {
				continue
			}
415
			log.Debug("flush segments", zap.Int64s("segmentIDs", segments))
416
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
417
			for _, id := range segments {
S
sunby 已提交
418 419
				sInfo := s.meta.GetSegment(id)
				if sInfo == nil {
S
sunby 已提交
420
					log.Error("get segment from meta error", zap.Int64("id", id),
421
						zap.Error(err))
S
sunby 已提交
422
					continue
423
				}
S
sunby 已提交
424
				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
S
sunby 已提交
425
				s.meta.SetLastFlushTime(id, time.Now())
426
			}
S
sunby 已提交
427
			if len(segmentInfos) > 0 {
428
				s.cluster.Flush(s.ctx, segmentInfos)
S
sunby 已提交
429
			}
430
		}
S
sunby 已提交
431
		s.helper.eventAfterHandleDataNodeTt()
432 433 434
	}
}

435 436 437 438
//go:norace
// fix datarace in unittest
// startWatchService will only be invoked at start procedure
// otherwise, remove the annotation and add atomic protection
439
func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
440
	defer logutil.LogPanic()
441 442 443 444 445 446
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
447 448 449 450 451
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
452 453 454 455 456 457 458 459
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
460 461 462
		}
	}
}
S
sunby 已提交
463

464
// handles session events - DataNodes Add/Del
465
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
466
	if event == nil {
467
		return nil
468 469 470 471 472 473
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
474 475 476 477
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
478 479 480 481 482
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
483 484 485 486
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
487 488 489 490 491
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
492 493 494 495
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
496 497 498 499 500
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
501
	return nil
502 503
}

S
sunby 已提交
504 505 506 507 508 509 510 511 512 513 514 515 516
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
517 518
			//Ignore return error
			_ = s.postFlush(ctx, segmentID)
S
sunby 已提交
519 520 521 522
		}
	}
}

523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554
// postFlush finishes a segment whose flush is done:
//  1. look the segment up in meta (error if it is unknown)
//  2. notify RootCoord that the segment is flushed
//  3. change the segment state to `Flushed` in meta
//
// Every failure is logged here and also returned.
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flushed segment", zap.Int64("id", segmentID))
		return fmt.Errorf("segment %d not found", segmentID)
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_SegmentFlushDone,
			SourceID: Params.NodeID,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
S
sunby 已提交
555 556 557 558 559 560 561 562 563 564 565
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

566
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
567
	var err error
G
godchen 已提交
568
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
569 570
		return err
	}
571
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
572 573
		return err
	}
574
	return s.rootCoordClient.Start()
S
sunby 已提交
575
}
576

577 578 579 580
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
581
func (s *Server) Stop() error {
582
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
583 584
		return nil
	}
585
	log.Debug("dataCoord server shutdown")
S
sunby 已提交
586
	s.cluster.Close()
S
sunby 已提交
587
	s.stopServerLoop()
S
sunby 已提交
588 589 590
	return nil
}

S
sunby 已提交
591 592
// CleanMeta only for test
func (s *Server) CleanMeta() error {
593
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
594
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
595 596
}

S
sunby 已提交
597 598 599 600 601
// stopServerLoop cancels serverLoopCtx and blocks until every goroutine
// counted in serverLoopWg has finished.
func (s *Server) stopServerLoop() {
	cancel := s.serverLoopCancel
	cancel()
	s.serverLoopWg.Wait()
}

602 603 604 605 606 607 608 609 610 611 612 613 614 615
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
616

617 618
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
619
		Base: &commonpb.MsgBase{
620
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
621 622 623 624 625 626 627 628
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
629
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
630 631
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
632 633
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
634
			SourceID:  Params.NodeID,
635
		},
S
sunby 已提交
636 637 638 639 640
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
641 642
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
643 644
		return err
	}
S
sunby 已提交
645
	collInfo := &datapb.CollectionInfo{
646 647 648 649
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
S
sunby 已提交
650
	}
S
sunby 已提交
651 652
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
653 654
}

S
sunby 已提交
655
// GetVChanPositions get vchannel latest postitions with provided dml channel names
656 657 658 659 660 661 662 663 664 665 666
func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
	segments := s.meta.GetSegmentsByChannel(channel)
	flushed := make([]*datapb.SegmentInfo, 0)
	unflushed := make([]*datapb.SegmentInfo, 0)
	var seekPosition *internalpb.MsgPosition
	var useUnflushedPosition bool
	for _, s := range segments {
		if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
			flushed = append(flushed, s.SegmentInfo)
			if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) {
				seekPosition = s.DmlPosition
S
sunby 已提交
667
			}
668 669
			continue
		}
670

671 672 673
		if s.DmlPosition == nil {
			continue
		}
S
sunby 已提交
674

675
		unflushed = append(unflushed, s.SegmentInfo)
S
sunby 已提交
676

677 678 679 680 681 682
		if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp {
			useUnflushedPosition = true
			if !seekFromStartPosition {
				seekPosition = s.DmlPosition
			} else {
				seekPosition = s.StartPosition
S
sunby 已提交
683
			}
684
		}
685 686 687 688 689 690 691 692 693 694
	}
	// use collection start position when segment position is not found
	if seekPosition == nil {
		coll := s.meta.GetCollection(collectionID)
		if coll != nil {
			for _, sp := range coll.GetStartPositions() {
				if sp.GetKey() == rootcoord.ToPhysicalChannel(channel) {
					seekPosition = &internalpb.MsgPosition{
						ChannelName: channel,
						MsgID:       sp.GetData(),
695 696 697 698
					}
				}
			}
		}
699
	}
700

701 702 703 704 705 706
	return &datapb.VchannelInfo{
		CollectionID:      collectionID,
		ChannelName:       channel,
		SeekPosition:      seekPosition,
		FlushedSegments:   flushed,
		UnflushedSegments: unflushed,
S
sunby 已提交
707
	}
708
}