// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
	"github.com/milvus-io/milvus/internal/logutil"
	"github.com/milvus-io/milvus/internal/rootcoord"
	"github.com/milvus-io/milvus/internal/util/metricsinfo"
	"github.com/milvus-io/milvus/internal/util/mqclient"
	"github.com/milvus-io/milvus/internal/util/tsoutil"
	"go.uber.org/zap"

	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
	"github.com/milvus-io/milvus/internal/util/sessionutil"
	"github.com/milvus-io/milvus/internal/util/typeutil"

	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
)

const connEtcdMaxRetryTime = 100000

var (
	// TODO: sunby put to config
	enableTtChecker           = true
	ttCheckerName             = "dataTtChecker"
	ttMaxInterval             = 3 * time.Minute
	ttCheckerWarnMsg          = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
	segmentTimedFlushDuration = 10.0
)

type (
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcut for typeutil.Timestamp
	Timestamp = typeutil.Timestamp
)

// ServerState type alias, represents datacoord Server State
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands for initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)

type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)

// makes sure Server implements `DataCoord`
var _ types.DataCoord = (*Server)(nil)

// makes sure Server implements `positionProvider`
var _ positionProvider = (*Server)(nil)

// Server implements `types.DataCoord`
// and handles Data Coordinator related jobs
type Server struct {
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
	isServing        ServerState
	helper           ServerHelper

	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
	cluster         *Cluster
	sessionManager  *SessionManager
	channelManager  *ChannelManager
	rootCoordClient types.RootCoord

	compactionTrigger trigger
	compactionHandler compactionPlanContext

	metricsCacheManager *metricsinfo.MetricsCacheManager

	flushCh   chan UniqueID
	msFactory msgstream.Factory

	session *sessionutil.Session
	eventCh <-chan *sessionutil.SessionEvent

	dataNodeCreator        dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
}

// ServerHelper datacoord server injection helper
type ServerHelper struct {
	eventAfterHandleDataNodeTt func()
}

func defaultServerHelper() ServerHelper {
	return ServerHelper{
		eventAfterHandleDataNodeTt: func() {},
	}
}
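
// A test-injection sketch (illustrative; `ctx` and `factory` are assumed to
// exist in the caller): the tt-loop hook can be used to synchronize a test
// with time tick handling, e.g.
//
//	signal := make(chan struct{}, 1)
//	helper := ServerHelper{eventAfterHandleDataNodeTt: func() { signal <- struct{}{} }}
//	svr, _ := CreateServer(ctx, factory, SetServerHelper(helper))
//	// ... start the server, produce a DataNode tt message, then wait:
//	<-signal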

// Option utility function signature to set DataCoord server attributes
type Option func(svr *Server)

// SetRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

// SetServerHelper returns an `Option` setting ServerHelper with provided parameter
func SetServerHelper(helper ServerHelper) Option {
	return func(svr *Server) {
		svr.helper = helper
	}
}

// SetCluster returns an `Option` setting Cluster with provided parameter
func SetCluster(cluster *Cluster) Option {
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

// SetDataNodeCreator returns an `Option` setting DataNode create function
func SetDataNodeCreator(creator dataNodeCreatorFunc) Option {
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

// CreateServer creates a `Server` instance
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
	rand.Seed(time.Now().UnixNano())
	s := &Server{
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataNodeCreator:        defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
		helper:                 defaultServerHelper(),

		metricsCacheManager: metricsinfo.NewMetricsCacheManager(),
	}

	for _, opt := range opts {
		opt(s)
	}
	return s, nil
}
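
// A minimal wiring sketch (illustrative only; it assumes the caller provides
// a concrete msgstream.Factory, e.g. a Pulsar-backed one, and a cancellable
// ctx):
//
//	svr, err := CreateServer(ctx, factory)
//	if err != nil {
//		log.Fatal("create datacoord server failed", zap.Error(err))
//	}
//	if err = svr.Register(); err != nil {
//		log.Fatal("register failed", zap.Error(err))
//	}
//	if err = svr.Init(); err != nil {
//		log.Fatal("init failed", zap.Error(err))
//	}
//	if err = svr.Start(); err != nil {
//		log.Fatal("start failed", zap.Error(err))
//	}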

func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
}

func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
}

// Register registers the data service at etcd
func (s *Server) Register() error {
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
	s.session.Init(typeutil.DataCoordRole, Params.IP, true)
	Params.NodeID = s.session.ServerID
	Params.SetLogger(typeutil.UniqueID(-1))
	return nil
}

// Init changes the server state to Initializing
func (s *Server) Init() error {
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
	return nil
}

// Start initializes `Server` members and starts loops. The following steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which include the message stream handler (segment statistics, datanode tt),
//		datanode etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
func (s *Server) Start() error {
	var err error
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
	if err = s.initRootCoordClient(); err != nil {
		return err
	}

	if err = s.initMeta(); err != nil {
		return err
	}

	if err = s.initCluster(); err != nil {
		return err
	}

	s.allocator = newRootCoordAllocator(s.rootCoordClient)
	if Params.EnableCompaction {
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}

	s.startSegmentManager()
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

	s.startServerLoop()
	Params.CreatedTime = time.Now()
	Params.UpdatedTime = time.Now()
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
	log.Debug("dataCoordinator startup success")

	return nil
}

func (s *Server) initCluster() error {
	if s.cluster != nil {
		return nil
	}

S
sunby 已提交
266
	var err error
	s.channelManager, err = NewChannelManager(s.kvClient, s)
	if err != nil {
		return err
	}
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
	return nil
}

func (s *Server) createCompactionHandler() {
	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
	s.compactionHandler.start()
}

func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

func (s *Server) createCompactionTrigger() {
	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator)
	s.compactionTrigger.start()
}

func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Debug("dataCoord initServiceDiscovery failed", zap.Error(err))
		return err
	}
	log.Debug("registered sessions", zap.Any("sessions", sessions))

	datanodes := make([]*NodeInfo, 0, len(sessions))
	for _, session := range sessions {
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
		}
		datanodes = append(datanodes, info)
	}

	s.cluster.Startup(datanodes)

	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
	return nil
}

func (s *Server) startSegmentManager() {
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
}

// initMeta connects to etcd and creates the meta store, retrying until it
// succeeds or connEtcdMaxRetryTime attempts are exhausted
func (s *Server) initMeta() error {
	connectEtcdFn := func() error {
		etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
		if err != nil {
			return err
		}

		s.kvClient = etcdKV
		s.meta, err = newMeta(s.kvClient)
		if err != nil {
			return err
		}
		return nil
	}
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
}

func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
	s.serverLoopWg.Add(4)
	s.startStatsChannel(s.serverLoopCtx)
	s.startDataNodeTtLoop(s.serverLoopCtx)
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
	go s.session.LivenessCheck(s.serverLoopCtx, func() {
		log.Error("Data Coord disconnected from etcd, process will exit", zap.Int64("Server Id", s.session.ServerID))
		if err := s.Stop(); err != nil {
			log.Fatal("failed to stop server", zap.Error(err))
		}
	})
}

// startStatsChannel starts a goroutine which consumes segment statistics
// messages and updates the current row count of each segment in meta
func (s *Server) startStatsChannel(ctx context.Context) {
	statsStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new stats msg stream failed", zap.Error(err))
		return
	}
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
	log.Debug("dataCoord create stats channel consumer",
		zap.String("channelName", Params.StatisticsChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
	statsStream.Start()
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer statsStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("stats channel shutdown")
				return
			default:
			}
			msgPack := statsStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil stats msg, shutdown stats channel")
				return
			}
			for _, msg := range msgPack.Msgs {
				if msg.Type() != commonpb.MsgType_SegmentStatistics {
					log.Warn("receive unknown msg from segment statistics channel",
						zap.Stringer("msgType", msg.Type()))
					continue
				}
				ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
				for _, stat := range ssMsg.SegStats {
					s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
				}
			}
		}
	}()
}

// startDataNodeTtLoop starts a goroutine which consumes datanode timetick
// messages, expires stale segment allocations and triggers flushes for
// flushable or long-unflushed segments
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumerWithPosition([]string{Params.TimeTickChannelName},
		Params.DataCoordSubscriptionName, mqclient.SubscriptionPositionLatest)
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
	ttMsgStream.Start()

	go func() {
		var checker *LongTermChecker
		if enableTtChecker {
			checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
			checker.Start()
			defer checker.Stop()
		}
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		defer ttMsgStream.Close()
		for {
			select {
			case <-ctx.Done():
				log.Debug("data node tt loop shutdown")
				return
			default:
			}
			msgPack := ttMsgStream.Consume()
			if msgPack == nil {
				log.Debug("receive nil tt msg, shutdown tt channel")
				return
			}
			for _, msg := range msgPack.Msgs {
				if msg.Type() != commonpb.MsgType_DataNodeTt {
					log.Warn("receive unexpected msg type from tt channel",
						zap.Stringer("msgType", msg.Type()))
					continue
				}
				ttMsg := msg.(*msgstream.DataNodeTtMsg)
				if enableTtChecker {
					checker.Check()
				}

				ch := ttMsg.ChannelName
				ts := ttMsg.Timestamp
				if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
					log.Warn("failed to expire allocations", zap.Error(err))
					continue
				}
				physical, _ := tsoutil.ParseTS(ts)
				if time.Since(physical).Minutes() > 1 {
					// if lagging behind, log about once per minute
					log.RatedWarn(60.0, "time tick lags behind for more than 1 minute", zap.String("channel", ch), zap.Time("tt", physical))
				}
				segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
				if err != nil {
					log.Warn("get flushable segments failed", zap.Error(err))
					continue
				}
453

454
				staleSegments := s.meta.SelectSegments(func(info *SegmentInfo) bool {
					return info.GetInsertChannel() == ch &&
						!info.lastFlushTime.IsZero() &&
						time.Since(info.lastFlushTime).Minutes() >= segmentTimedFlushDuration
				})

				if len(segments)+len(staleSegments) == 0 {
					continue
				}
				log.Debug("flush segments", zap.Int64s("segmentIDs", segments), zap.Int("markSegments count", len(staleSegments)))
				segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
				for _, id := range segments {
					sInfo := s.meta.GetSegment(id)
					if sInfo == nil {
						log.Error("get segment from meta error", zap.Int64("id", id),
							zap.Error(err))
470 471
						continue
					}
					segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
					s.meta.SetLastFlushTime(id, time.Now())
				}
				markSegments := make([]*datapb.SegmentInfo, 0, len(staleSegments))
				for _, segment := range staleSegments {
					// skip stale segments which are already picked as flushable above
					flushable := false
					for _, fSeg := range segmentInfos {
						if segment.GetID() == fSeg.GetID() {
							flushable = true
							break
						}
					}
					if flushable {
						continue
					}
					markSegments = append(markSegments, segment.SegmentInfo)
					s.meta.SetLastFlushTime(segment.GetID(), time.Now())
				}
				if len(segmentInfos)+len(markSegments) > 0 {
					s.cluster.Flush(s.ctx, segmentInfos, markSegments)
				}
			}
			s.helper.eventAfterHandleDataNodeTt()
		}
	}()
}

// start a goroutine to watch services
func (s *Server) startWatchService(ctx context.Context) {
	go s.watchService(ctx)
}

// watchService watches services
func (s *Server) watchService(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
		case event, ok := <-s.eventCh:
			if !ok {
				//TODO add retry logic
				return
			}
			if err := s.handleSessionEvent(ctx, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("datacoord server stop error", zap.Error(err))
					}
				}()
				return
			}
		}
	}

}

// handleSessionEvent handles session events - DataNodes Add/Del
func (s *Server) handleSessionEvent(ctx context.Context, event *sessionutil.SessionEvent) error {
	if event == nil {
		return nil
	}
	info := &datapb.DataNodeInfo{
		Address:  event.Session.Address,
		Version:  event.Session.ServerID,
		Channels: []*datapb.ChannelStatus{},
	}
	node := &NodeInfo{
		NodeID:  event.Session.ServerID,
		Address: event.Session.Address,
	}
	switch event.EventType {
	case sessionutil.SessionAddEvent:
		log.Info("received datanode register",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.Register(node); err != nil {
			log.Warn("failed to regisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	case sessionutil.SessionDelEvent:
		log.Info("received datanode unregister",
			zap.String("address", info.Address),
			zap.Int64("serverID", info.Version))
		if err := s.cluster.UnRegister(node); err != nil {
			log.Warn("failed to deregisger node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
			return err
		}
		s.metricsCacheManager.InvalidateSystemInfoMetrics()
	default:
		log.Warn("receive unknown service event type",
			zap.Any("type", event.EventType))
	}
	return nil
}

// startFlushLoop starts a goroutine which runs the post-flush procedure for
// every segment id received from flushCh
func (s *Server) startFlushLoop(ctx context.Context) {
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
				log.Debug("flush loop shutdown")
				return
			case segmentID := <-s.flushCh:
				//Ignore return error
				_ = s.postFlush(ctx, segmentID)
			}
		}
	}()
}

// postFlush runs the post-flush procedure after a segment flush is done:
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
	segment := s.meta.GetSegment(segmentID)
	if segment == nil {
		log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
		return errors.New("segment not found")
	}
	// Notify RootCoord segment is flushed
	req := &datapb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType: commonpb.MsgType_SegmentFlushDone,
		},
		Segment: segment.SegmentInfo,
	}
	resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
	if err = VerifyResponse(resp, err); err != nil {
		log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
		return err
	}
	// set segment to SegmentState_Flushed
	if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
	log.Debug("flush segment complete", zap.Int64("id", segmentID))
	return nil
}

// recovery logic: fetch all segments in `Flushing` state and redo the flush notification logic
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

func (s *Server) initRootCoordClient() error {
	var err error
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
		return err
	}
	if err = s.rootCoordClient.Init(); err != nil {
		return err
	}
	return s.rootCoordClient.Start()
}

// Stop does the Server finalizing processes
// it checks whether the server status is healthy; if not, it just quits
// if the Server is healthy, it sets the server state to stopped, releases the etcd session,
//	stops the message stream client and stops the server loops
func (s *Server) Stop() error {
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
		return nil
	}
	log.Debug("dataCoord server shutdown")
	s.cluster.Close()
	s.stopServerLoop()

	if Params.EnableCompaction {
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
	return nil
}

// CleanMeta only for test
func (s *Server) CleanMeta() error {
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
	return s.kvClient.RemoveWithPrefix("")
}

func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}

func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_DescribeCollection,
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     0,
			Timestamp: 0,
			SourceID:  Params.NodeID,
		},
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
		return err
	}
	collInfo := &datapb.CollectionInfo{
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
	}
	s.meta.AddCollection(collInfo)
	return nil
}

// GetVChanPositions gets the latest vchannel positions for the provided dml channel name
func (s *Server) GetVChanPositions(channel string, collectionID UniqueID, seekFromStartPosition bool) *datapb.VchannelInfo {
	segments := s.meta.GetSegmentsByChannel(channel)
	flushed := make([]*datapb.SegmentInfo, 0)
	unflushed := make([]*datapb.SegmentInfo, 0)
	var seekPosition *internalpb.MsgPosition
	for _, s := range segments {
		if s.State == commonpb.SegmentState_Flushing || s.State == commonpb.SegmentState_Flushed {
			flushed = append(flushed, trimSegmentInfo(s.SegmentInfo))
			if seekPosition == nil || (s.DmlPosition.Timestamp < seekPosition.Timestamp) {
				seekPosition = s.DmlPosition
			}
			continue
		}

		if s.DmlPosition == nil { // segment has no dml position yet, skip it
			continue
		}

		unflushed = append(unflushed, trimSegmentInfo(s.SegmentInfo))

		segmentPosition := s.DmlPosition
		if seekFromStartPosition {
			// need to use start position when load collection/partition, querynode does not support seek from checkpoint yet
			// TODO silverxia remove seek from start logic after checkpoint supported in querynode
			segmentPosition = s.StartPosition
		}

		if seekPosition == nil || segmentPosition.Timestamp < seekPosition.Timestamp {
			seekPosition = segmentPosition
		}
	}
	// use collection start position when segment position is not found
	if seekPosition == nil {
		coll := s.meta.GetCollection(collectionID)
		if coll != nil {
			for _, sp := range coll.GetStartPositions() {
				if sp.GetKey() == rootcoord.ToPhysicalChannel(channel) {
					seekPosition = &internalpb.MsgPosition{
						ChannelName: channel,
						MsgID:       sp.GetData(),
					}
				}
			}
		}
	}

	return &datapb.VchannelInfo{
		CollectionID:      collectionID,
		ChannelName:       channel,
		SeekPosition:      seekPosition,
		FlushedSegments:   flushed,
		UnflushedSegments: unflushed,
	}
}
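
// Worked example with hypothetical values: on channel "ch1", a flushed segment
// at DmlPosition.Timestamp=100 and an unflushed segment at
// DmlPosition.Timestamp=200 yield a seek position with ts=100, since consuming
// from the smallest position replays everything newer segments still need;
// the collection start position is used only when no segment carries a
// position at all.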

// trimSegmentInfo returns a shallow copy of datapb.SegmentInfo and sets ALL binlog info to nil
func trimSegmentInfo(info *datapb.SegmentInfo) *datapb.SegmentInfo {
	return &datapb.SegmentInfo{
		ID:             info.ID,
		CollectionID:   info.CollectionID,
		PartitionID:    info.PartitionID,
		InsertChannel:  info.InsertChannel,
		NumOfRows:      info.NumOfRows,
		State:          info.State,
		MaxRowNum:      info.MaxRowNum,
		LastExpireTime: info.LastExpireTime,
		StartPosition:  info.StartPosition,
		DmlPosition:    info.DmlPosition,
	}
}