server.go 33.0 KB
Newer Older
1 2 3 4 5 6
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
S
sunby 已提交
7 8
// with the License. You may obtain a copy of the License at
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
S
sunby 已提交
10
//
11 12 13 14 15
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16

17
package datacoord
S
sunby 已提交
18

S
sunby 已提交
19 20
import (
	"context"
21
	"fmt"
S
sunby 已提交
22
	"math/rand"
J
Ji Bin 已提交
23
	"os"
S
sunby 已提交
24
	"sync"
S
sunby 已提交
25
	"sync/atomic"
26
	"syscall"
S
sunby 已提交
27 28
	"time"

29
	"github.com/blang/semver/v4"
30
	"github.com/cockroachdb/errors"
G
godchen 已提交
31 32 33
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"

34 35
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
S
sunby 已提交
36
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
W
wayblink 已提交
37
	indexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
38
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
Y
yiwangdr 已提交
39
	"github.com/milvus-io/milvus/internal/kv"
X
Xiangyu Wang 已提交
40
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
41
	"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
X
Xiangyu Wang 已提交
42
	"github.com/milvus-io/milvus/internal/proto/datapb"
43
	"github.com/milvus-io/milvus/internal/storage"
X
Xiangyu Wang 已提交
44
	"github.com/milvus-io/milvus/internal/types"
G
godchen 已提交
45
	"github.com/milvus-io/milvus/internal/util/dependency"
G
godchen 已提交
46
	"github.com/milvus-io/milvus/internal/util/sessionutil"
47
	"github.com/milvus-io/milvus/pkg/common"
48 49 50 51 52 53 54
	"github.com/milvus-io/milvus/pkg/log"
	"github.com/milvus-io/milvus/pkg/metrics"
	"github.com/milvus-io/milvus/pkg/mq/msgstream"
	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper"
	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
	"github.com/milvus-io/milvus/pkg/util/funcutil"
	"github.com/milvus-io/milvus/pkg/util/logutil"
55
	"github.com/milvus-io/milvus/pkg/util/merr"
56 57 58 59
	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
	"github.com/milvus-io/milvus/pkg/util/retry"
	"github.com/milvus-io/milvus/pkg/util/timerecord"
60
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
61
	"github.com/milvus-io/milvus/pkg/util/typeutil"
S
sunby 已提交
62 63
)

64
const (
	// connEtcdMaxRetryTime is the attempt limit handed to retry.Attempts when
	// the meta is loaded in initMeta.
	connEtcdMaxRetryTime = 100
	// allPartitionID is the partition ID that means "no partition filtering".
	allPartitionID = 0
)
S
sunby 已提交
68

69 70
// TODO: sunby put to config
var (
	// enableTtChecker switches on the long-term checker created in
	// handleDataNodeTimetickMsgstream.
	enableTtChecker = true
	// ttCheckerName is the name passed to timerecord.NewLongTermChecker.
	ttCheckerName = "dataTtChecker"
	// ttMaxInterval is the interval passed to timerecord.NewLongTermChecker.
	ttMaxInterval = 2 * time.Minute
	// ttCheckerWarnMsg is the warning text passed to the checker; it embeds
	// ttMaxInterval expressed in minutes.
	ttCheckerWarnMsg = fmt.Sprintf("Datacoord haven't received tt for %f minutes", ttMaxInterval.Minutes())
	// segmentTimedFlushDuration is not referenced in this part of the file;
	// its unit is not visible here — TODO confirm against its users.
	segmentTimedFlushDuration = 10.0
)

S
sunby 已提交
78
type (
79 80 81
	// UniqueID shortcut for typeutil.UniqueID
	UniqueID = typeutil.UniqueID
	// Timestamp shortcurt for typeutil.Timestamp
S
sunby 已提交
82
	Timestamp = typeutil.Timestamp
S
sunby 已提交
83
)
S
sunby 已提交
84

85
// dataNodeCreatorFunc builds a types.DataNode client for the node at addr.
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
86

W
wayblink 已提交
87
// indexNodeCreatorFunc builds a types.IndexNode client for the node at addr.
type indexNodeCreatorFunc func(ctx context.Context, addr string) (types.IndexNode, error)
88

X
Xiaofan 已提交
89
// rootCoordCreatorFunc builds a types.RootCoord client from the meta root
// path and an etcd client.
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdClient *clientv3.Client) (types.RootCoord, error)
S
sunby 已提交
90

91 92 93
// Compile-time assertion that *Server implements `types.DataCoord`.
var _ types.DataCoord = (*Server)(nil)

E
Enwei Jiao 已提交
94
// Params is the component parameter table returned by paramtable.Get().
var Params *paramtable.ComponentParam = paramtable.Get()
95

96 97
// Server implements `types.DataCoord`
// handles Data Coordinator related jobs
N
neza2017 已提交
98
type Server struct {
S
sunby 已提交
99 100 101 102
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
103
	quitCh           chan struct{}
104
	stateCode        atomic.Value
S
sunby 已提交
105
	helper           ServerHelper
106

X
Xiaofan 已提交
107
	etcdCli          *clientv3.Client
E
Enwei Jiao 已提交
108
	address          string
Y
yiwangdr 已提交
109
	kvClient         kv.MetaKv
110 111 112 113 114 115 116 117 118
	meta             *meta
	segmentManager   Manager
	allocator        allocator
	cluster          *Cluster
	sessionManager   *SessionManager
	channelManager   *ChannelManager
	rootCoordClient  types.RootCoord
	garbageCollector *garbageCollector
	gcOpt            GcOption
119
	handler          Handler
120

S
sunby 已提交
121 122 123
	compactionTrigger trigger
	compactionHandler compactionPlanContext

124 125
	metricsCacheManager *metricsinfo.MetricsCacheManager

126 127 128 129
	flushCh         chan UniqueID
	buildIndexCh    chan UniqueID
	notifyIndexChan chan UniqueID
	factory         dependency.Factory
130

131
	session   *sessionutil.Session
132
	icSession *sessionutil.Session
133
	dnEventCh <-chan *sessionutil.SessionEvent
134 135
	inEventCh <-chan *sessionutil.SessionEvent
	//qcEventCh <-chan *sessionutil.SessionEvent
136

137
	enableActiveStandBy bool
138
	activateFunc        func() error
139

140
	dataNodeCreator        dataNodeCreatorFunc
W
wayblink 已提交
141
	indexNodeCreator       indexNodeCreatorFunc
142
	rootCoordClientCreator rootCoordCreatorFunc
143
	//indexCoord             types.IndexCoord
144

145 146 147
	//segReferManager  *SegmentReferenceManager
	indexBuilder     *indexBuilder
	indexNodeManager *IndexNodeManager
N
neza2017 已提交
148
}
S
sunby 已提交
149

150
// ServerHelper is the datacoord server injection helper; it bundles hooks
// that callers may override through WithServerHelper.
type ServerHelper struct {
	// eventAfterHandleDataNodeTt is invoked after each DataNode time tick
	// message pack has been handled.
	eventAfterHandleDataNodeTt func()
}

// defaultServerHelper returns a ServerHelper whose hooks are all no-ops.
func defaultServerHelper() ServerHelper {
	noop := func() {}
	return ServerHelper{eventAfterHandleDataNodeTt: noop}
}

161
// Option utility function signature to set DataCoord server attributes
S
sunby 已提交
162 163
type Option func(svr *Server)

W
wayblink 已提交
164 165
// WithRootCoordCreator returns an `Option` setting RootCoord creator with provided parameter
func WithRootCoordCreator(creator rootCoordCreatorFunc) Option {
S
sunby 已提交
166 167 168 169 170
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

W
wayblink 已提交
171 172
// WithServerHelper returns an `Option` setting ServerHelp with provided parameter
func WithServerHelper(helper ServerHelper) Option {
S
sunby 已提交
173 174 175 176 177
	return func(svr *Server) {
		svr.helper = helper
	}
}

W
wayblink 已提交
178 179
// WithCluster returns an `Option` setting Cluster with provided parameter
func WithCluster(cluster *Cluster) Option {
180 181 182 183 184
	return func(svr *Server) {
		svr.cluster = cluster
	}
}

W
wayblink 已提交
185 186
// WithDataNodeCreator returns an `Option` setting DataNode create function
func WithDataNodeCreator(creator dataNodeCreatorFunc) Option {
187 188 189 190 191
	return func(svr *Server) {
		svr.dataNodeCreator = creator
	}
}

W
wayblink 已提交
192 193
// WithSegmentManager returns an Option to set SegmentManager
func WithSegmentManager(manager Manager) Option {
B
Bingyi Sun 已提交
194 195 196 197 198
	return func(svr *Server) {
		svr.segmentManager = manager
	}
}

199
// CreateServer creates a `Server` instance
G
godchen 已提交
200
func CreateServer(ctx context.Context, factory dependency.Factory, opts ...Option) *Server {
S
sunby 已提交
201
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
202
	s := &Server{
S
sunby 已提交
203
		ctx:                    ctx,
204
		quitCh:                 make(chan struct{}),
G
godchen 已提交
205
		factory:                factory,
S
sunby 已提交
206
		flushCh:                make(chan UniqueID, 1024),
207 208
		buildIndexCh:           make(chan UniqueID, 1024),
		notifyIndexChan:        make(chan UniqueID),
209
		dataNodeCreator:        defaultDataNodeCreatorFunc,
W
wayblink 已提交
210
		indexNodeCreator:       defaultIndexNodeCreatorFunc,
S
sunby 已提交
211
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
212
		helper:                 defaultServerHelper(),
213
		metricsCacheManager:    metricsinfo.NewMetricsCacheManager(),
214
		enableActiveStandBy:    Params.DataCoordCfg.EnableActiveStandby.GetAsBool(),
S
sunby 已提交
215
	}
S
sunby 已提交
216 217 218 219

	for _, opt := range opts {
		opt(s)
	}
X
Xiaofan 已提交
220
	return s
S
sunby 已提交
221 222
}

G
godchen 已提交
223 224
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
225 226
}

W
wayblink 已提交
227 228 229 230
// defaultIndexNodeCreatorFunc is the default indexNodeCreatorFunc; it
// delegates to indexnodeclient.NewClient, enabling credentials according to
// Params.DataCoordCfg.WithCredential.
//
// The caller's ctx is forwarded (instead of context.TODO()), matching
// defaultDataNodeCreatorFunc and defaultRootCoordCreatorFunc.
func defaultIndexNodeCreatorFunc(ctx context.Context, addr string) (types.IndexNode, error) {
	return indexnodeclient.NewClient(ctx, addr, Params.DataCoordCfg.WithCredential.GetAsBool())
}

X
Xiaofan 已提交
231 232
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, client *clientv3.Client) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, client)
S
sunby 已提交
233 234
}

235 236 237 238 239
// QuitSignal returns the server's quit channel as a receive-only channel.
// NOTE(review): presumably signalled when the server stops; the signalling
// side is not visible in this part of the file.
func (s *Server) QuitSignal() <-chan struct{} {
	return s.quitCh
}

240
// Register registers data service at etcd
241
func (s *Server) Register() error {
242 243
	// first register indexCoord
	s.icSession.Register()
244
	s.session.Register()
245
	if s.enableActiveStandBy {
246 247 248 249 250 251 252 253
		err := s.icSession.ProcessActiveStandBy(nil)
		if err != nil {
			return err
		}
		err = s.session.ProcessActiveStandBy(s.activateFunc)
		if err != nil {
			return err
		}
254
	}
255 256 257

	s.session.LivenessCheck(s.serverLoopCtx, func() {
		logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.ServerID))
258
		if err := s.Stop(); err != nil {
259
			logutil.Logger(s.ctx).Fatal("failed to stop server", zap.Error(err))
260 261
		}
		// manually send signal to starter goroutine
X
Xiaofan 已提交
262
		if s.session.TriggerKill {
J
Ji Bin 已提交
263 264 265
			if p, err := os.FindProcess(os.Getpid()); err == nil {
				p.Signal(syscall.SIGINT)
			}
X
Xiaofan 已提交
266
		}
267 268 269 270 271
	})
	return nil
}

func (s *Server) initSession() error {
272 273 274 275 276
	s.icSession = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
	if s.icSession == nil {
		return errors.New("failed to initialize IndexCoord session")
	}
	s.icSession.Init(typeutil.IndexCoordRole, s.address, true, true)
277
	s.icSession.SetEnableActiveStandBy(s.enableActiveStandBy)
278

279
	s.session = sessionutil.NewSession(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli)
280 281 282
	if s.session == nil {
		return errors.New("failed to initialize session")
	}
E
Enwei Jiao 已提交
283
	s.session.Init(typeutil.DataCoordRole, s.address, true, true)
284
	s.session.SetEnableActiveStandBy(s.enableActiveStandBy)
285 286 287
	return nil
}

288
// Init change server state to Initializing
289
func (s *Server) Init() error {
290
	var err error
E
Enwei Jiao 已提交
291
	s.factory.Init(Params)
292 293 294 295
	if err = s.initSession(); err != nil {
		return err
	}
	if s.enableActiveStandBy {
296
		s.activateFunc = func() error {
297 298
			log.Info("DataCoord switch from standby to active, activating")
			if err := s.initDataCoord(); err != nil {
299 300
				log.Error("DataCoord init failed", zap.Error(err))
				return err
301 302 303
			}
			s.startDataCoord()
			log.Info("DataCoord startup success")
304
			return nil
305 306 307 308 309
		}
		s.stateCode.Store(commonpb.StateCode_StandBy)
		log.Info("DataCoord enter standby mode successfully")
		return nil
	}
S
sunby 已提交
310

311 312 313 314
	return s.initDataCoord()
}

func (s *Server) initDataCoord() error {
315
	s.stateCode.Store(commonpb.StateCode_Initializing)
316
	var err error
317
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
318 319
		return err
	}
320

321 322 323 324 325
	storageCli, err := s.newChunkManagerFactory()
	if err != nil {
		return err
	}

326
	if err = s.initMeta(storageCli); err != nil {
S
sunby 已提交
327 328
		return err
	}
329

330 331
	s.handler = newServerHandler(s)

S
sunby 已提交
332 333 334
	if err = s.initCluster(); err != nil {
		return err
	}
335

C
congqixia 已提交
336
	s.allocator = newRootCoordAllocator(s.rootCoordClient)
337

338 339 340
	if err = s.initSession(); err != nil {
		return err
	}
341
	s.initIndexNodeManager()
342

343 344 345 346
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}

347
	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
S
sunby 已提交
348 349 350
		s.createCompactionHandler()
		s.createCompactionTrigger()
	}
351
	s.initSegmentManager()
352

353
	s.initGarbageCollection(storageCli)
354
	s.initIndexBuilder(storageCli)
355

356 357
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

358 359 360 361
	return nil
}

// Start initialize `Server` members and start loops, follow steps are taken:
362 363 364 365 366 367
//  1. initialize message factory parameters
//  2. initialize root coord client, meta, datanode cluster, segment info channel,
//     allocator, segment manager
//  3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//     datanodes etcd watch, etcd alive check and flush completed status check
//  4. set server state to Healthy
368
func (s *Server) Start() error {
369 370 371 372 373 374 375 376 377
	if !s.enableActiveStandBy {
		s.startDataCoord()
		log.Info("DataCoord startup successfully")
	}

	return nil
}

func (s *Server) startDataCoord() {
378
	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
Z
zhenshan.cao 已提交
379 380 381
		s.compactionHandler.start()
		s.compactionTrigger.start()
	}
382
	s.startServerLoop()
383 384 385 386
	// DataCoord (re)starts successfully and starts to collection segment stats
	// data from all DataNode.
	// This will prevent DataCoord from missing out any important segment stats
	// data while offline.
387
	log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes")
388
	s.reCollectSegmentStats(s.ctx)
389
	s.stateCode.Store(commonpb.StateCode_Healthy)
390 391 392
}

func (s *Server) initCluster() error {
393 394 395 396
	if s.cluster != nil {
		return nil
	}

S
sunby 已提交
397
	var err error
398 399
	s.channelManager, err = NewChannelManager(s.kvClient, s.handler, withMsgstreamFactory(s.factory),
		withStateChecker(), withBgChecker())
400 401
	if err != nil {
		return err
402
	}
S
sunby 已提交
403 404
	s.sessionManager = NewSessionManager(withSessionCreator(s.dataNodeCreator))
	s.cluster = NewCluster(s.sessionManager, s.channelManager)
405
	return nil
406
}
G
groot 已提交
407

E
Enwei Jiao 已提交
408 409 410 411
// SetAddress sets the address that initSession passes to the sessions' Init.
func (s *Server) SetAddress(address string) {
	s.address = address
}

X
Xiaofan 已提交
412 413 414 415 416
// SetEtcdClient sets the etcd client used by datacoord for its sessions and
// its meta KV.
func (s *Server) SetEtcdClient(client *clientv3.Client) {
	s.etcdCli = client
}

W
wayblink 已提交
417 418 419 420 421 422 423 424 425 426 427 428
// SetRootCoord sets the RootCoord client used by the server.
func (s *Server) SetRootCoord(rootCoord types.RootCoord) {
	s.rootCoordClient = rootCoord
}

// SetDataNodeCreator sets the function used to create DataNode clients.
func (s *Server) SetDataNodeCreator(f func(context.Context, string) (types.DataNode, error)) {
	s.dataNodeCreator = f
}

// SetIndexNodeCreator sets the function used to create IndexNode clients.
func (s *Server) SetIndexNodeCreator(f func(context.Context, string) (types.IndexNode, error)) {
	s.indexNodeCreator = f
}

S
sunby 已提交
429
func (s *Server) createCompactionHandler() {
430
	s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator, s.flushCh)
S
sunby 已提交
431 432 433 434 435 436 437
}

// stopCompactionHandler stops the compaction handler.
func (s *Server) stopCompactionHandler() {
	s.compactionHandler.stop()
}

func (s *Server) createCompactionTrigger() {
438
	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler)
S
sunby 已提交
439 440 441 442 443 444
}

// stopCompactionTrigger stops the compaction trigger.
func (s *Server) stopCompactionTrigger() {
	s.compactionTrigger.stop()
}

445
func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
E
Enwei Jiao 已提交
446
	chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
447
	cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx)
448 449
	if err != nil {
		log.Error("chunk manager init failed", zap.Error(err))
450
		return nil, err
451
	}
452 453 454 455
	return cli, err
}

func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
456
	s.garbageCollector = newGarbageCollector(s.meta, s.handler, GcOption{
457
		cli:              cli,
458 459 460 461
		enabled:          Params.DataCoordCfg.EnableGarbageCollection.GetAsBool(),
		checkInterval:    Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second),
		missingTolerance: Params.DataCoordCfg.GCMissingTolerance.GetAsDuration(time.Second),
		dropTolerance:    Params.DataCoordCfg.GCDropTolerance.GetAsDuration(time.Second),
462 463 464
	})
}

465
func (s *Server) initServiceDiscovery() error {
466
	r := semver.MustParseRange(">=2.2.3")
467
	sessions, rev, err := s.session.GetSessionsWithVersionRange(typeutil.DataNodeRole, r)
468
	if err != nil {
X
Xiaofan 已提交
469
		log.Warn("DataCoord failed to init service discovery", zap.Error(err))
G
godchen 已提交
470 471
		return err
	}
X
Xiaofan 已提交
472
	log.Info("DataCoord success to get DataNode sessions", zap.Any("sessions", sessions))
473

S
sunby 已提交
474
	datanodes := make([]*NodeInfo, 0, len(sessions))
475
	for _, session := range sessions {
476 477 478
		info := &NodeInfo{
			NodeID:  session.ServerID,
			Address: session.Address,
S
sunby 已提交
479
		}
480
		datanodes = append(datanodes, info)
481
	}
G
godchen 已提交
482

483
	s.cluster.Startup(s.ctx, datanodes)
484

485
	// TODO implement rewatch logic
486
	s.dnEventCh = s.session.WatchServicesWithVersionRange(typeutil.DataNodeRole, r, rev+1, nil)
487

488
	inSessions, inRevision, err := s.session.GetSessions(typeutil.IndexNodeRole)
489 490 491 492
	if err != nil {
		log.Error("DataCoord get QueryCoord session failed", zap.Error(err))
		return err
	}
493 494 495 496 497 498 499 500 501 502 503 504 505 506
	if Params.DataCoordCfg.BindIndexNodeMode.GetAsBool() {
		if err = s.indexNodeManager.AddNode(Params.DataCoordCfg.IndexNodeID.GetAsInt64(), Params.DataCoordCfg.IndexNodeAddress.GetValue()); err != nil {
			log.Error("add indexNode fail", zap.Int64("ServerID", Params.DataCoordCfg.IndexNodeID.GetAsInt64()),
				zap.String("address", Params.DataCoordCfg.IndexNodeAddress.GetValue()), zap.Error(err))
			return err
		}
		log.Info("add indexNode success", zap.String("IndexNode address", Params.DataCoordCfg.IndexNodeAddress.GetValue()),
			zap.Int64("nodeID", Params.DataCoordCfg.IndexNodeID.GetAsInt64()))
	} else {
		for _, session := range inSessions {
			if err := s.indexNodeManager.AddNode(session.ServerID, session.Address); err != nil {
				return err
			}
		}
507
	}
508
	s.inEventCh = s.session.WatchServices(typeutil.IndexNodeRole, inRevision+1, nil)
509

510
	return nil
S
sunby 已提交
511 512
}

513
func (s *Server) initSegmentManager() {
B
Bingyi Sun 已提交
514
	if s.segmentManager == nil {
515
		s.segmentManager = newSegmentManager(s.meta, s.allocator, s.rootCoordClient)
B
Bingyi Sun 已提交
516
	}
517 518
}

519
func (s *Server) initMeta(chunkManager storage.ChunkManager) error {
W
wayblink 已提交
520 521 522
	if s.meta != nil {
		return nil
	}
523
	etcdKV := etcdkv.NewEtcdKV(s.etcdCli, Params.EtcdCfg.MetaRootPath.GetValue())
524

X
Xiaofan 已提交
525 526 527
	s.kvClient = etcdKV
	reloadEtcdFn := func() error {
		var err error
528 529
		catalog := datacoord.NewCatalog(etcdKV, chunkManager.RootPath(), Params.EtcdCfg.MetaRootPath.GetValue())
		s.meta, err = newMeta(s.ctx, catalog, chunkManager)
530 531 532 533
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
534
	}
X
Xiaofan 已提交
535
	return retry.Do(s.ctx, reloadEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
536 537
}

538 539 540 541 542 543 544 545
// initIndexBuilder creates the index builder from the meta, the index node
// manager and the given chunk manager, unless one is already set.
func (s *Server) initIndexBuilder(manager storage.ChunkManager) {
	if s.indexBuilder != nil {
		return
	}
	s.indexBuilder = newIndexBuilder(s.ctx, s.meta, s.indexNodeManager, manager)
}

func (s *Server) initIndexNodeManager() {
	if s.indexNodeManager == nil {
W
wayblink 已提交
546
		s.indexNodeManager = NewNodeManager(s.ctx, s.indexNodeCreator)
547 548 549
	}
}

550
func (s *Server) startServerLoop() {
551 552 553 554 555
	s.serverLoopWg.Add(2)
	if !Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
		s.serverLoopWg.Add(1)
		s.startDataNodeTtLoop(s.serverLoopCtx)
	}
556 557
	s.startWatchService(s.serverLoopCtx)
	s.startFlushLoop(s.serverLoopCtx)
558
	s.startIndexService(s.serverLoopCtx)
559
	s.garbageCollector.start()
560 561
}

562 563
// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream
// tt msg stands for the currently consumed timestamp for each channel
S
sunby 已提交
564
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
G
godchen 已提交
565
	ttMsgStream, err := s.factory.NewMsgStream(ctx)
S
sunby 已提交
566
	if err != nil {
B
Bingyi Sun 已提交
567
		log.Error("DataCoord failed to create timetick channel", zap.Error(err))
X
Xiaofan 已提交
568
		panic(err)
S
sunby 已提交
569
	}
S
smellthemoon 已提交
570 571 572 573 574

	timeTickChannel := Params.CommonCfg.DataCoordTimeTick.GetValue()
	if Params.CommonCfg.PreCreatedTopicEnabled.GetAsBool() {
		timeTickChannel = Params.CommonCfg.TimeTicker.GetValue()
	}
575
	subName := fmt.Sprintf("%s-%d-datanodeTl", Params.CommonCfg.DataCoordSubName.GetValue(), paramtable.GetNodeID())
S
smellthemoon 已提交
576 577

	ttMsgStream.AsConsumer([]string{timeTickChannel}, subName, mqwrapper.SubscriptionPositionLatest)
X
Xiaofan 已提交
578
	log.Info("DataCoord creates the timetick channel consumer",
S
smellthemoon 已提交
579
		zap.String("timeTickChannel", timeTickChannel),
580
		zap.String("subscription", subName))
581

582 583 584 585 586 587 588 589 590 591
	go s.handleDataNodeTimetickMsgstream(ctx, ttMsgStream)
}

func (s *Server) handleDataNodeTimetickMsgstream(ctx context.Context, ttMsgStream msgstream.MsgStream) {
	var checker *timerecord.LongTermChecker
	if enableTtChecker {
		checker = timerecord.NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
		checker.Start()
		defer checker.Stop()
	}
592

593 594 595 596 597 598 599 600
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	defer func() {
		// https://github.com/milvus-io/milvus/issues/15659
		// msgstream service closed before datacoord quits
		defer func() {
			if x := recover(); x != nil {
				log.Error("Failed to close ttMessage", zap.Any("recovered", x))
601
			}
602 603 604 605 606 607
		}()
		ttMsgStream.Close()
	}()
	for {
		select {
		case <-ctx.Done():
X
Xiaofan 已提交
608
			log.Info("DataNode timetick loop shutdown")
609
			return
610 611 612 613
		case msgPack, ok := <-ttMsgStream.Chan():
			if !ok || msgPack == nil || len(msgPack.Msgs) == 0 {
				log.Info("receive nil timetick msg and shutdown timetick channel")
				return
614
			}
S
sunby 已提交
615

616 617 618 619 620 621 622 623 624 625 626 627 628 629
			for _, msg := range msgPack.Msgs {
				ttMsg, ok := msg.(*msgstream.DataNodeTtMsg)
				if !ok {
					log.Warn("receive unexpected msg type from tt channel")
					continue
				}
				if enableTtChecker {
					checker.Check()
				}

				if err := s.handleTimetickMessage(ctx, ttMsg); err != nil {
					log.Error("failed to handle timetick message", zap.Error(err))
					continue
				}
S
sunby 已提交
630
			}
631
			s.helper.eventAfterHandleDataNodeTt()
632
		}
633
	}
634 635
}

636
func (s *Server) handleTimetickMessage(ctx context.Context, ttMsg *msgstream.DataNodeTtMsg) error {
637
	log := log.Ctx(ctx).WithRateGroup("dc.handleTimetick", 1, 60)
638 639 640 641 642 643 644
	ch := ttMsg.GetChannelName()
	ts := ttMsg.GetTimestamp()
	physical, _ := tsoutil.ParseTS(ts)
	if time.Since(physical).Minutes() > 1 {
		// if lag behind, log every 1 mins about
		log.RatedWarn(60.0, "time tick lag behind for more than 1 minutes", zap.String("channel", ch), zap.Time("timetick", physical))
	}
645 646 647 648 649
	// ignore report from a different node
	if !s.cluster.channelManager.Match(ttMsg.GetBase().GetSourceID(), ch) {
		log.Warn("node is not matched with channel", zap.String("channel", ch), zap.Int64("nodeID", ttMsg.GetBase().GetSourceID()))
		return nil
	}
650

J
jaime 已提交
651
	sub := tsoutil.SubByNow(ts)
652
	pChannelName := funcutil.ToPhysicalChannel(ch)
J
jaime 已提交
653 654 655
	metrics.DataCoordConsumeDataNodeTimeTickLag.
		WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), pChannelName).
		Set(float64(sub))
656

657 658 659 660 661 662 663 664 665 666 667 668
	s.updateSegmentStatistics(ttMsg.GetSegmentsStats())

	if err := s.segmentManager.ExpireAllocations(ch, ts); err != nil {
		return fmt.Errorf("expire allocations: %w", err)
	}

	flushableIDs, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
	if err != nil {
		return fmt.Errorf("get flushable segments: %w", err)
	}
	flushableSegments := s.getFlushableSegmentsInfo(flushableIDs)

B
bigsheeper 已提交
669
	if len(flushableSegments) == 0 {
670 671 672
		return nil
	}

673
	log.Info("start flushing segments",
B
bigsheeper 已提交
674
		zap.Int64s("segment IDs", flushableIDs))
675 676
	// update segment last update triggered time
	// it's ok to fail flushing, since next timetick after duration will re-trigger
677 678
	s.setLastFlushTime(flushableSegments)

B
bigsheeper 已提交
679
	finfo := make([]*datapb.SegmentInfo, 0, len(flushableSegments))
680 681 682
	for _, info := range flushableSegments {
		finfo = append(finfo, info.SegmentInfo)
	}
B
bigsheeper 已提交
683
	err = s.cluster.Flush(s.ctx, ttMsg.GetBase().GetSourceID(), ch, finfo)
684
	if err != nil {
X
Xiaofan 已提交
685
		log.Warn("failed to handle flush", zap.Any("source", ttMsg.GetBase().GetSourceID()), zap.Error(err))
686 687 688
		return err
	}

689 690 691
	return nil
}

692
func (s *Server) updateSegmentStatistics(stats []*commonpb.SegmentStats) {
693
	for _, stat := range stats {
694
		segment := s.meta.GetSegment(stat.GetSegmentID())
X
Xiaofan 已提交
695 696 697 698 699 700 701 702 703 704 705 706 707 708
		if segment == nil {
			log.Warn("skip updating row number for not exist segment",
				zap.Int64("segment ID", stat.GetSegmentID()),
				zap.Int64("new value", stat.GetNumRows()))
			continue
		}

		if isFlushState(segment.GetState()) {
			log.Warn("skip updating row number for flushed segment",
				zap.Int64("segment ID", stat.GetSegmentID()),
				zap.Int64("new value", stat.GetNumRows()))
			continue
		}

709
		// Log if # of rows is updated.
X
Xiaofan 已提交
710
		if segment.currRows < stat.GetNumRows() {
711
			log.Debug("Updating segment number of rows",
712
				zap.Int64("segment ID", stat.GetSegmentID()),
713
				zap.Int64("old value", s.meta.GetSegment(stat.GetSegmentID()).GetNumOfRows()),
714 715
				zap.Int64("new value", stat.GetNumRows()),
			)
X
Xiaofan 已提交
716
			s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
717
		}
718 719 720 721 722 723
	}
}

func (s *Server) getFlushableSegmentsInfo(flushableIDs []int64) []*SegmentInfo {
	res := make([]*SegmentInfo, 0, len(flushableIDs))
	for _, id := range flushableIDs {
724
		sinfo := s.meta.GetHealthySegment(id)
725 726 727 728 729 730 731 732 733 734 735 736 737 738 739
		if sinfo == nil {
			log.Error("get segment from meta error", zap.Int64("id", id))
			continue
		}
		res = append(res, sinfo)
	}
	return res
}

// setLastFlushTime stamps each given segment in the meta with the current
// time as its last flush time; the clock is read once per segment.
func (s *Server) setLastFlushTime(segments []*SegmentInfo) {
	for i := range segments {
		s.meta.SetLastFlushTime(segments[i].GetID(), time.Now())
	}
}

740
// start a goroutine wto watch services
741
func (s *Server) startWatchService(ctx context.Context) {
742 743 744
	go s.watchService(ctx)
}

745 746
func (s *Server) stopServiceWatch() {
	// ErrCompacted is handled inside SessionWatcher, which means there is some other error occurred, closing server.
747
	logutil.Logger(s.ctx).Error("watch service channel closed", zap.Int64("serverID", paramtable.GetNodeID()))
748 749 750 751 752 753 754 755
	go s.Stop()
	if s.session.TriggerKill {
		if p, err := os.FindProcess(os.Getpid()); err == nil {
			p.Signal(syscall.SIGINT)
		}
	}
}

756
// watchService watches services.
757
func (s *Server) watchService(ctx context.Context) {
S
sunby 已提交
758
	defer logutil.LogPanic()
759 760 761 762
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
X
Xiaofan 已提交
763
			log.Info("watch service shutdown")
764
			return
765
		case event, ok := <-s.dnEventCh:
766
			if !ok {
767
				s.stopServiceWatch()
768 769
				return
			}
770
			if err := s.handleSessionEvent(ctx, typeutil.DataNodeRole, event); err != nil {
771 772
				go func() {
					if err := s.Stop(); err != nil {
773
						log.Warn("DataCoord server stop error", zap.Error(err))
774 775 776 777
					}
				}()
				return
			}
778
		case event, ok := <-s.inEventCh:
779
			if !ok {
780
				s.stopServiceWatch()
781 782
				return
			}
783 784 785 786 787 788 789 790
			if err := s.handleSessionEvent(ctx, typeutil.IndexNodeRole, event); err != nil {
				go func() {
					if err := s.Stop(); err != nil {
						log.Warn("DataCoord server stop error", zap.Error(err))
					}
				}()
				return
			}
791 792 793
		}
	}
}
S
sunby 已提交
794

795
// handles session events - DataNodes Add/Del
796
func (s *Server) handleSessionEvent(ctx context.Context, role string, event *sessionutil.SessionEvent) error {
797
	if event == nil {
798
		return nil
799
	}
800 801 802 803 804 805
	switch role {
	case typeutil.DataNodeRole:
		info := &datapb.DataNodeInfo{
			Address:  event.Session.Address,
			Version:  event.Session.ServerID,
			Channels: []*datapb.ChannelStatus{},
806
		}
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852
		node := &NodeInfo{
			NodeID:  event.Session.ServerID,
			Address: event.Session.Address,
		}
		switch event.EventType {
		case sessionutil.SessionAddEvent:
			log.Info("received datanode register",
				zap.String("address", info.Address),
				zap.Int64("serverID", info.Version))
			if err := s.cluster.Register(node); err != nil {
				log.Warn("failed to register node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
				return err
			}
			s.metricsCacheManager.InvalidateSystemInfoMetrics()
		case sessionutil.SessionDelEvent:
			log.Info("received datanode unregister",
				zap.String("address", info.Address),
				zap.Int64("serverID", info.Version))
			if err := s.cluster.UnRegister(node); err != nil {
				log.Warn("failed to deregister node", zap.Int64("id", node.NodeID), zap.String("address", node.Address), zap.Error(err))
				return err
			}
			s.metricsCacheManager.InvalidateSystemInfoMetrics()
		default:
			log.Warn("receive unknown service event type",
				zap.Any("type", event.EventType))
		}
	case typeutil.IndexNodeRole:
		switch event.EventType {
		case sessionutil.SessionAddEvent:
			log.Info("received indexnode register",
				zap.String("address", event.Session.Address),
				zap.Int64("serverID", event.Session.ServerID))
			return s.indexNodeManager.AddNode(event.Session.ServerID, event.Session.Address)
		case sessionutil.SessionDelEvent:
			log.Info("received indexnode unregister",
				zap.String("address", event.Session.Address),
				zap.Int64("serverID", event.Session.ServerID))
			s.indexNodeManager.RemoveNode(event.Session.ServerID)
		case sessionutil.SessionUpdateEvent:
			serverID := event.Session.ServerID
			log.Info("received indexnode SessionUpdateEvent", zap.Int64("serverID", serverID))
			s.indexNodeManager.StoppingNode(serverID)
		default:
			log.Warn("receive unknown service event type",
				zap.Any("type", event.EventType))
853
		}
854
	}
855

856
	return nil
857 858
}

859 860
// startFlushLoop starts a goroutine to handle post func process
// which is to notify `RootCoord` that this segment is flushed
S
sunby 已提交
861
func (s *Server) startFlushLoop(ctx context.Context) {
862 863 864 865 866 867 868 869 870 871
	go func() {
		defer logutil.LogPanic()
		defer s.serverLoopWg.Done()
		ctx2, cancel := context.WithCancel(ctx)
		defer cancel()
		// send `Flushing` segments
		go s.handleFlushingSegments(ctx2)
		for {
			select {
			case <-ctx.Done():
X
Xiaofan 已提交
872
				logutil.Logger(s.ctx).Info("flush loop shutdown")
873 874 875
				return
			case segmentID := <-s.flushCh:
				//Ignore return error
X
Xiaofan 已提交
876 877 878 879 880
				log.Info("flush successfully", zap.Any("segmentID", segmentID))
				err := s.postFlush(ctx, segmentID)
				if err != nil {
					log.Warn("failed to do post flush", zap.Any("segmentID", segmentID), zap.Error(err))
				}
881
			}
S
sunby 已提交
882
		}
883
	}()
S
sunby 已提交
884 885
}

886 887 888 889 890
// post function after flush is done
// 1. check segment id is valid
// 2. notify RootCoord segment is flushed
// 3. change segment state to `Flushed` in meta
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
891
	segment := s.meta.GetHealthySegment(segmentID)
892
	if segment == nil {
S
smellthemoon 已提交
893
		return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush")
894 895
	}
	// set segment to SegmentState_Flushed
896
	if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
897 898 899
		log.Error("flush segment complete failed", zap.Error(err))
		return err
	}
900
	s.buildIndexCh <- segmentID
X
Xiaofan 已提交
901
	log.Info("flush segment complete", zap.Int64("id", segmentID))
902 903 904 905
	return nil
}

// recovery logic, fetch all Segment in `Flushing` state and do Flush notification logic
S
sunby 已提交
906 907 908 909 910 911 912 913 914 915 916
func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

917
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
918
	var err error
W
wayblink 已提交
919 920 921 922
	if s.rootCoordClient == nil {
		if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.EtcdCfg.MetaRootPath.GetValue(), s.etcdCli); err != nil {
			return err
		}
S
sunby 已提交
923
	}
924
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
925 926
		return err
	}
927
	return s.rootCoordClient.Start()
S
sunby 已提交
928
}
929

930 931 932
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
933
//
934
//	stop message stream client and stop server loops
S
sunby 已提交
935
func (s *Server) Stop() error {
936
	if !s.stateCode.CompareAndSwap(commonpb.StateCode_Healthy, commonpb.StateCode_Abnormal) {
S
sunby 已提交
937 938
		return nil
	}
X
Xiaofan 已提交
939
	logutil.Logger(s.ctx).Info("server shutdown")
S
sunby 已提交
940
	s.cluster.Close()
941
	s.garbageCollector.close()
S
sunby 已提交
942
	s.stopServerLoop()
S
sunby 已提交
943

944
	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
S
sunby 已提交
945 946 947
		s.stopCompactionTrigger()
		s.stopCompactionHandler()
	}
948
	s.indexBuilder.Stop()
949 950 951 952 953

	if s.session != nil {
		s.session.Stop()
	}

S
sunby 已提交
954 955 956
	return nil
}

S
sunby 已提交
957 958
// CleanMeta only for test
func (s *Server) CleanMeta() error {
959
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
960
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
961 962
}

S
sunby 已提交
963 964 965 966 967
// stopServerLoop cancels the server-loop context and then blocks until every
// goroutine registered on s.serverLoopWg has called Done.
func (s *Server) stopServerLoop() {
	// Cancel first so the loops can observe the cancellation, then wait.
	s.serverLoopCancel()

	s.serverLoopWg.Wait()
}

968 969 970 971 972 973 974 975 976 977 978 979 980 981
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
982

983 984
// loadCollectionFromRootCoord communicates with RootCoord and asks for collection information.
// collection information will be added to server meta info.
985
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
986
	resp, err := s.rootCoordClient.DescribeCollectionInternal(ctx, &milvuspb.DescribeCollectionRequest{
S
smellthemoon 已提交
987 988
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
E
Enwei Jiao 已提交
989
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
990
		),
S
sunby 已提交
991 992 993 994 995 996
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
997
	presp, err := s.rootCoordClient.ShowPartitionsInternal(ctx, &milvuspb.ShowPartitionsRequest{
S
smellthemoon 已提交
998 999 1000
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ShowPartitions),
			commonpbutil.WithMsgID(0),
E
Enwei Jiao 已提交
1001
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
S
smellthemoon 已提交
1002
		),
S
sunby 已提交
1003 1004 1005 1006 1007
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
1008 1009
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
1010 1011
		return err
	}
J
jaime 已提交
1012 1013 1014 1015 1016 1017 1018

	properties := make(map[string]string)
	for _, pair := range resp.Properties {
		properties[pair.GetKey()] = pair.GetValue()
	}

	collInfo := &collectionInfo{
1019 1020 1021 1022
		ID:             resp.CollectionID,
		Schema:         resp.Schema,
		Partitions:     presp.PartitionIDs,
		StartPositions: resp.GetStartPositions(),
J
jaime 已提交
1023
		Properties:     properties,
1024
		CreatedAt:      resp.GetCreatedTimestamp(),
S
sunby 已提交
1025
	}
S
sunby 已提交
1026 1027
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
1028
}
1029

1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048
// hasCollection communicates with RootCoord and check whether this collection exist from the user's perspective.
func (s *Server) hasCollection(ctx context.Context, collectionID int64) (bool, error) {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_DescribeCollection),
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
		),
		DbName:       "",
		CollectionID: collectionID,
	})
	if err != nil {
		return false, err
	}
	if resp == nil {
		return false, errNilResponse
	}
	if resp.Status.ErrorCode == commonpb.ErrorCode_Success {
		return true, nil
	}
1049 1050
	statusErr := common.NewStatusError(resp.Status.ErrorCode, resp.Status.Reason)
	if common.IsCollectionNotExistError(statusErr) {
1051 1052
		return false, nil
	}
1053
	return false, statusErr
1054 1055
}

1056 1057 1058 1059 1060
func (s *Server) reCollectSegmentStats(ctx context.Context) {
	if s.channelManager == nil {
		log.Error("null channel manager found, which should NOT happen in non-testing environment")
		return
	}
1061
	nodes := s.sessionManager.getLiveNodeIDs()
1062 1063
	log.Info("re-collecting segment stats from DataNodes",
		zap.Int64s("DataNode IDs", nodes))
X
Xiaofan 已提交
1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074

	reCollectFunc := func() error {
		err := s.cluster.ReCollectSegmentStats(ctx)
		if err != nil {
			return err
		}
		return nil
	}

	if err := retry.Do(ctx, reCollectFunc, retry.Attempts(20), retry.Sleep(time.Millisecond*100), retry.MaxSleepTime(5*time.Second)); err != nil {
		panic(err)
1075 1076
	}
}