server.go 19.6 KB
Newer Older
S
sunby 已提交
1 2 3
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
S
sunby 已提交
4 5 6 7 8 9 10
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
11

12
package datacoord
S
sunby 已提交
13

S
sunby 已提交
14 15
import (
	"context"
S
sunby 已提交
16
	"errors"
17
	"fmt"
S
sunby 已提交
18
	"math/rand"
S
sunby 已提交
19
	"sync"
S
sunby 已提交
20 21 22
	"sync/atomic"
	"time"

23 24
	"github.com/milvus-io/milvus/internal/util/metricsinfo"

S
sunby 已提交
25
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
26
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
27
	"github.com/milvus-io/milvus/internal/logutil"
28
	"go.uber.org/zap"
S
sunby 已提交
29

X
Xiangyu Wang 已提交
30 31 32 33 34
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
35
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
36
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
37

X
Xiangyu Wang 已提交
38 39
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
S
sunby 已提交
40
	"github.com/milvus-io/milvus/internal/proto/internalpb"
X
Xiangyu Wang 已提交
41
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
42 43
)

S
sunby 已提交
44
// connEtcdMaxRetryTime is the attempt limit handed to retry.Do when initMeta
// connects to etcd and loads the meta.
const connEtcdMaxRetryTime = 100000
S
sunby 已提交
45

46 47 48 49 50 51 52 53
// Settings of the time-tick checker used by startDataNodeTtLoop:
//   - enableTtChecker: when true, a LongTermChecker is created and started
//   - ttCheckerName, ttMaxInterval, ttCheckerWarnMsg: the arguments passed to
//     NewLongTermChecker
var (
	// TODO: sunby put to config
	enableTtChecker  = true
	ttCheckerName    = "dataTtChecker"
	ttMaxInterval    = 3 * time.Minute
	ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
)

S
sunby 已提交
54
type (
55 56 57
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcurt for typeutil.Timestamp
S
sunby 已提交
58
	Timestamp = typeutil.Timestamp
S
sunby 已提交
59
)
S
sunby 已提交
60

61
// ServerState type alias, presents datacoord Server State
62 63 64 65 66 67 68 69 70 71 72
type ServerState = int64

// The states are numbered consecutively from zero: Stopped = 0,
// Initializing = 1, Healthy = 2.
const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = iota
	// ServerStateInitializing state stands for an initializing `Server` instance
	ServerStateInitializing
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy
)

73 74 75 76 77
// DataNodeCreatorFunc is the signature of a function that produces a
// `types.DataNode` for the datanode at address `addr`.
type DataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)

// RootCoordCreatorFunc is the signature of a function that produces a
// `types.RootCoord` from the meta root path and the etcd endpoints.
type RootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
78

79 80 81
// compile-time check that *Server implements `types.DataCoord`
var _ types.DataCoord = (*Server)(nil)

82 83
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
84
type Server struct {
S
sunby 已提交
85 86 87 88
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
89
	isServing        ServerState
S
sunby 已提交
90
	helper           ServerHelper
91

92 93 94 95
	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
S
sunby 已提交
96
	cluster         *Cluster
97
	rootCoordClient types.RootCoord
98

99 100
	metricsCacheManager *metricsinfo.MetricsCacheManager

101 102
	flushCh   chan UniqueID
	msFactory msgstream.Factory
103

104 105 106
	session *sessionutil.Session
	liveCh  <-chan bool
	eventCh <-chan *sessionutil.SessionEvent
107

108 109
	dataNodeCreator        DataNodeCreatorFunc
	rootCoordClientCreator RootCoordCreatorFunc
N
neza2017 已提交
110
}
S
sunby 已提交
111

112
// ServerHelper datacoord server injection helper
S
sunby 已提交
113 114 115 116 117 118 119 120 121 122
type ServerHelper struct {
	// eventAfterHandleDataNodeTt is called by startDataNodeTtLoop each time a
	// consumed message pack has been processed.
	eventAfterHandleDataNodeTt func()
}

// defaultServerHelper returns a ServerHelper whose hook is a no-op, so the
// hook can always be called without a nil check.
func defaultServerHelper() ServerHelper {
	var h ServerHelper
	h.eventAfterHandleDataNodeTt = func() {}
	return h
}

123
// Option utility function signature to set DataCoord server attributes
S
sunby 已提交
124 125
type Option func(svr *Server)

126
// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
127
func SetRootCoordCreator(creator RootCoordCreatorFunc) Option {
S
sunby 已提交
128 129 130 131 132
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

133
// SetServerHelper returns an `Option` setting ServerHelp with provided parameter
S
sunby 已提交
134 135 136 137 138 139
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

140 141 142 143 144 145 146
// SetCluster returns an `Option` that injects `cluster` into the server;
// initCluster skips its default NewCluster call when a cluster is already set
func SetCluster(cluster *Cluster) Option {
	apply := func(target *Server) {
		target.cluster = cluster
	}
	return apply
}

147 148 149 150 151 152 153
// SetDataNodeCreator returns an `Option` that replaces the server's DataNode
// creator function with `creator`
func SetDataNodeCreator(creator DataNodeCreatorFunc) Option {
	apply := func(target *Server) {
		target.dataNodeCreator = creator
	}
	return apply
}

154
// CreateServer create `Server` instance
S
sunby 已提交
155
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
S
sunby 已提交
156
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
157
	s := &Server{
S
sunby 已提交
158 159 160
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
161
		dataNodeCreator:        defaultDataNodeCreatorFunc,
S
sunby 已提交
162
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
163
		helper:                 defaultServerHelper(),
164 165

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
S
sunby 已提交
166
	}
S
sunby 已提交
167 168 169 170

	for _, opt := range opts {
		opt(s)
	}
S
sunby 已提交
171 172 173
	return s, nil
}

174
// defaultDataNodeCreatorFunc defines the default behavior to get a DataNode
G
godchen 已提交
175 176
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
177 178
}

G
godchen 已提交
179 180
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
181 182
}

183 184
// Register register data service at etcd
func (s *Server) Register() error {
185
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
186 187 188
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
189
	s.liveCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
190
	Params.NodeID = s.session.ServerID
X
Xiaofan 已提交
191
	Params.SetLogger(typeutil.UniqueID(-1))
192 193 194
	return nil
}

195
// Init change server state to Initializing
196
func (s *Server) Init() error {
197
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
198 199 200
	return nil
}

201 202 203 204 205 206 207
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
208
func (s *Server) Start() error {
209
	var err error
S
sunby 已提交
210 211 212 213 214 215 216 217
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
218
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
219 220
		return err
	}
221

S
sunby 已提交
222 223 224
	if err = s.initMeta(); err != nil {
		return err
	}
225

S
sunby 已提交
226 227 228
	if err = s.initCluster(); err != nil {
		return err
	}
229

C
congqixia 已提交
230
	s.allocator = newRootCoordAllocator(s.rootCoordClient)
231

232
	s.startSegmentManager()
S
sunby 已提交
233 234 235
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
236

S
sunby 已提交
237
	s.startServerLoop()
238 239
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
240
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
241
	log.Debug("dataCoordinator startup success")
242

S
sunby 已提交
243
	return nil
244 245 246
}

func (s *Server) initCluster() error {
S
sunby 已提交
247
	var err error
248 249 250 251 252
	// cluster could be set by options
	// by-pass default NewCluster process if already set
	if s.cluster == nil {
		s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s)
	}
S
sunby 已提交
253
	return err
254
}
G
groot 已提交
255

256 257 258
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
259
		log.Debug("dataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
260 261
		return err
	}
262 263
	log.Debug("registered sessions", zap.Any("sessions", sessions))

S
sunby 已提交
264
	datanodes := make([]*NodeInfo, 0, len(sessions))
265
	for _, session := range sessions {
S
sunby 已提交
266
		info := &datapb.DataNodeInfo{
267 268 269
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
270 271 272
		}
		nodeInfo := NewNodeInfo(s.ctx, info)
		datanodes = append(datanodes, nodeInfo)
273
	}
G
godchen 已提交
274

S
sunby 已提交
275
	s.cluster.Startup(datanodes)
276

277
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
278 279 280
	return nil
}

281
func (s *Server) startSegmentManager() {
282
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
283 284
}

S
sunby 已提交
285
func (s *Server) initMeta() error {
286
	connectEtcdFn := func() error {
X
XuanYang-cn 已提交
287
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
288 289 290
		if err != nil {
			return err
		}
X
XuanYang-cn 已提交
291 292

		s.kvClient = etcdKV
G
godchen 已提交
293
		s.meta, err = newMeta(s.kvClient)
294 295 296 297
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
298
	}
G
godchen 已提交
299
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
300 301
}

302 303
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
304
	s.serverLoopWg.Add(4)
305
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
306
	go s.startDataNodeTtLoop(s.serverLoopCtx)
307
	go s.startWatchService(s.serverLoopCtx)
S
sunby 已提交
308
	go s.startFlushLoop(s.serverLoopCtx)
309
	go s.session.LivenessCheck(s.serverLoopCtx, s.liveCh, func() {
G
godchen 已提交
310 311 312 313
		err := s.Stop()
		if err != nil {
			log.Error("server stop fail", zap.Error(err))
		}
314
	})
315 316 317
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
318
	defer logutil.LogPanic()
319
	defer s.serverLoopWg.Done()
G
groot 已提交
320
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
321
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
322
	log.Debug("dataCoord create stats channel consumer",
323
		zap.String("channelName", Params.StatisticsChannelName),
324
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
325 326 327 328 329
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
330
			log.Debug("stats channel shutdown")
331 332 333
			return
		default:
		}
334
		msgPack := statsStream.Consume()
S
sunby 已提交
335
		if msgPack == nil {
S
sunby 已提交
336
			log.Debug("receive nil stats msg, shutdown stats channel")
S
sunby 已提交
337
			return
S
sunby 已提交
338
		}
339
		for _, msg := range msgPack.Msgs {
340
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
341 342
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
343
				continue
S
sunby 已提交
344
			}
345 346
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
S
sunby 已提交
347
				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
348
			}
349 350 351 352
		}
	}
}

S
sunby 已提交
353
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
354
	defer logutil.LogPanic()
355
	defer s.serverLoopWg.Done()
S
sunby 已提交
356 357 358 359 360 361
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
362
		Params.DataCoordSubscriptionName)
363 364 365
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
S
sunby 已提交
366 367
	ttMsgStream.Start()
	defer ttMsgStream.Close()
368 369 370 371 372

	var checker *LongTermChecker
	if enableTtChecker {
		checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
		checker.Start()
373
		defer checker.Stop()
374
	}
375 376 377
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
378
			log.Debug("data node tt loop shutdown")
379 380 381
			return
		default:
		}
S
sunby 已提交
382
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
383
		if msgPack == nil {
S
sunby 已提交
384
			log.Debug("receive nil tt msg, shutdown tt channel")
S
sunby 已提交
385
			return
S
sunby 已提交
386
		}
387
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
388
			if msg.Type() != commonpb.MsgType_DataNodeTt {
389
				log.Warn("receive unexpected msg type from tt channel",
S
sunby 已提交
390
					zap.Stringer("msgType", msg.Type()))
391 392
				continue
			}
S
sunby 已提交
393
			ttMsg := msg.(*msgstream.DataNodeTtMsg)
394 395 396
			if enableTtChecker {
				checker.Check()
			}
S
sunby 已提交
397 398 399

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
G
godchen 已提交
400 401 402 403 404
			err = s.segmentManager.ExpireAllocations(ch, ts)
			if err != nil {
				log.Warn("expire allocations failed", zap.Error(err))
				continue
			}
405
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
406
			if err != nil {
S
sunby 已提交
407
				log.Warn("get flushable segments failed", zap.Error(err))
408 409
				continue
			}
410

S
sunby 已提交
411 412 413
			if len(segments) == 0 {
				continue
			}
414
			log.Debug("flush segments", zap.Int64s("segmentIDs", segments))
415
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
416
			for _, id := range segments {
S
sunby 已提交
417 418
				sInfo := s.meta.GetSegment(id)
				if sInfo == nil {
S
sunby 已提交
419
					log.Error("get segment from meta error", zap.Int64("id", id),
420
						zap.Error(err))
S
sunby 已提交
421
					continue
422
				}
S
sunby 已提交
423
				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
S
sunby 已提交
424
				s.meta.SetLastFlushTime(id, time.Now())
425
			}
S
sunby 已提交
426
			if len(segmentInfos) > 0 {
S
sunby 已提交
427
				s.cluster.Flush(segmentInfos)
S
sunby 已提交
428
			}
429
		}
S
sunby 已提交
430
		s.helper.eventAfterHandleDataNodeTt()
431 432 433 434
	}
}

func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
435
	defer logutil.LogPanic()
436 437 438 439 440 441
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
S
sunby 已提交
442
		case event := <-s.eventCh:
443
			s.handleSessionEvent(ctx, event)
444 445 446
		}
	}
}
S
sunby 已提交
447

448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477
// handleSessionEvent reacts to a datanode session event: an add event
// registers the node in the cluster, a delete event unregisters it, and both
// invalidate the cached system info metrics. Nil events are ignored and
// unknown event types are only logged.
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) {
	if event == nil {
		return
	}

	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := NewNodeInfo(ctx, info)

	eventType := event.EventType
	if eventType == sessionutil.SessionAddEvent {
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		s.cluster.Register(node)
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
		return
	}
	if eventType == sessionutil.SessionDelEvent {
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		s.cluster.UnRegister(node)
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
		return
	}
	log.Warn("receive unknown service event type",
		zap.Any("type", eventType))
}

S
sunby 已提交
478 479 480 481 482 483 484 485 486 487 488 489 490
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
491 492
			//Ignore return error
			_ = s.postFlush(ctx, segmentID)
S
sunby 已提交
493 494 495 496
		}
	}
}

497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528
// postFlush is run once the flush of a segment is done:
// 1. look the segment up in the meta, fail when it is unknown
// 2. notify RootCoord that the segment is flushed
// 3. change the segment state to `Flushed` in the meta
// The first failing step is logged and its error returned.
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	flushed := s.meta.GetSegment(segmentID)
	if flushed == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}

	// Notify RootCoord segment is flushed
	notification := &datapb.SegmentFlushCompletedMsg{
		Base:    &commonpb.MsgBase{MsgType: commonpb.MsgType_SegmentFlushDone},
		Segment: flushed.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, notification)
	err = VerifyResponse(resp, err)
	if err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}

	// set segment to SegmentState_Flushed
	err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed)
	if err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}

	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
S
sunby 已提交
529 530 531 532 533 534 535 536 537 538 539
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

540
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
541
	var err error
G
godchen 已提交
542
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
543 544
		return err
	}
545
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
546 547
		return err
	}
548
	return s.rootCoordClient.Start()
S
sunby 已提交
549
}
550

551 552 553 554
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
555
func (s *Server) Stop() error {
556
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
557 558
		return nil
	}
559
	log.Debug("dataCoord server shutdown")
S
sunby 已提交
560
	s.cluster.Close()
S
sunby 已提交
561
	s.stopServerLoop()
S
sunby 已提交
562 563 564
	return nil
}

S
sunby 已提交
565 566
// CleanMeta only for test
func (s *Server) CleanMeta() error {
567
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
568
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
569 570
}

S
sunby 已提交
571 572 573 574 575
// stopServerLoop cancels the server loop context and blocks until every
// goroutine tracked by serverLoopWg has called Done.
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

576 577 578 579 580 581 582 583 584 585 586 587 588 589
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
590

591 592
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
593
		Base: &commonpb.MsgBase{
594
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
595 596 597 598 599 600 601 602
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
603
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
604 605
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
606 607
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
608
			SourceID:  Params.NodeID,
609
		},
S
sunby 已提交
610 611 612 613 614
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
615 616
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
617 618
		return err
	}
S
sunby 已提交
619
	collInfo := &datapb.CollectionInfo{
620 621 622 623
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
S
sunby 已提交
624
	}
S
sunby 已提交
625 626
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
627 628
}

S
sunby 已提交
629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649
// GetVChanPositions get vchannel latest postitions with provided dml channel names
func (s *Server) GetVChanPositions(vchans []vchannel, seekFromStartPosition bool) ([]*datapb.VchannelInfo, error) {
	if s.kvClient == nil {
		return nil, errNilKvClient
	}
	pairs := make([]*datapb.VchannelInfo, 0, len(vchans))

	for _, vchan := range vchans {
		segments := s.meta.GetSegmentsByChannel(vchan.DmlChannel)
		flushedSegmentIDs := make([]UniqueID, 0)
		unflushed := make([]*datapb.SegmentInfo, 0)
		var seekPosition *internalpb.MsgPosition
		var useUnflushedPosition bool
		for _, s := range segments {
			if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
				flushedSegmentIDs = append(flushedSegmentIDs, s.ID)
				if seekPosition == nil || (!useUnflushedPosition && s.DmlPosition.Timestamp > seekPosition.Timestamp) {
					seekPosition = s.DmlPosition
				}
				continue
			}
650

S
sunby 已提交
651 652 653 654 655 656 657 658 659 660 661 662 663 664
			if s.DmlPosition == nil {
				continue
			}

			unflushed = append(unflushed, s.SegmentInfo)

			if seekPosition == nil || !useUnflushedPosition || s.DmlPosition.Timestamp < seekPosition.Timestamp {
				useUnflushedPosition = true
				if !seekFromStartPosition {
					seekPosition = s.DmlPosition
				} else {
					seekPosition = s.StartPosition
				}
			}
665 666
		}

S
sunby 已提交
667 668 669 670 671 672 673 674 675
		pairs = append(pairs, &datapb.VchannelInfo{
			CollectionID:      vchan.CollectionID,
			ChannelName:       vchan.DmlChannel,
			SeekPosition:      seekPosition,
			UnflushedSegments: unflushed,
			FlushedSegments:   flushedSegmentIDs,
		})
	}
	return pairs, nil
676
}