server.go 15.6 KB
Newer Older
S
sunby 已提交
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
S
sunby 已提交
2 3 4 5 6 7 8
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
9

10
package datacoord
S
sunby 已提交
11

S
sunby 已提交
12 13 14
import (
	"context"
	"fmt"
S
sunby 已提交
15
	"math/rand"
S
sunby 已提交
16
	"sync"
S
sunby 已提交
17 18 19
	"sync/atomic"
	"time"

S
sunby 已提交
20
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
21
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
22
	"github.com/milvus-io/milvus/internal/logutil"
23 24
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"
S
sunby 已提交
25

X
Xiangyu Wang 已提交
26 27 28 29 30
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
33

X
Xiangyu Wang 已提交
34 35 36 37
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
38 39
)

S
sunby 已提交
40 41 42 43 44
// Connection tuning constants.
//
// rootCoordClientTimout is the last argument handed to rootcoordclient.NewClient
// in defaultRootCoordCreatorFunc.
// NOTE(review): the identifier is misspelled ("Timout"); it is kept as-is
// because other code in this file refers to it.
//
// connEtcdMaxRetryTime and connEtcdRetryInterval are the first two arguments
// handed to retry.Retry in initMeta (presumably the attempt count and the
// pause between attempts — confirm against the retry package).
const (
	rootCoordClientTimout = 20 * time.Second
	connEtcdMaxRetryTime  = 100000
	connEtcdRetryInterval = 200 * time.Millisecond
)
S
sunby 已提交
45

S
sunby 已提交
46
type (
S
sunby 已提交
47 48
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
S
sunby 已提交
49
)
S
sunby 已提交
50 51 52 53

// dataNodeCreatorFunc builds a types.DataNode client for the given address.
// The Server stores one in dataClientCreator (defaultDataNodeCreatorFunc by default).
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
// rootCoordCreatorFunc builds a types.RootCoord client.
// The Server stores one in rootCoordClientCreator (defaultRootCoordCreatorFunc by default).
type rootCoordCreatorFunc func(ctx context.Context) (types.RootCoord, error)

N
neza2017 已提交
54
type Server struct {
S
sunby 已提交
55 56 57 58
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
S
sunby 已提交
59
	isServing        int64
60 61 62 63

	kvClient          *etcdkv.EtcdKV
	meta              *meta
	segmentInfoStream msgstream.MsgStream
64
	segmentManager    Manager
S
sunby 已提交
65
	allocator         allocator
66
	cluster           *cluster
67
	rootCoordClient   types.RootCoord
S
sunby 已提交
68
	ddChannelName     string
69

S
sunby 已提交
70
	flushCh        chan UniqueID
71 72 73 74 75
	flushMsgStream msgstream.MsgStream
	msFactory      msgstream.Factory

	session  *sessionutil.Session
	activeCh <-chan bool
S
sunby 已提交
76
	eventCh  <-chan *sessionutil.SessionEvent
77

S
sunby 已提交
78 79
	dataClientCreator      dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
80
}
S
sunby 已提交
81

G
groot 已提交
82
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
S
sunby 已提交
83
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
84
	s := &Server{
S
sunby 已提交
85 86 87 88 89
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataClientCreator:      defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
90 91 92 93
	}
	return s, nil
}

S
sunby 已提交
94 95 96 97 98 99 100 101
// defaultDataNodeCreatorFunc is the dataNodeCreatorFunc installed by
// CreateServer. It returns whatever datanodeclient.NewClient yields for addr,
// passing 3*time.Second as the third argument (presumably a timeout — confirm
// against the datanode client package).
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr, 3*time.Second)
}

// defaultRootCoordCreatorFunc is the rootCoordCreatorFunc installed by
// CreateServer. It returns whatever rootcoordclient.NewClient yields for the
// configured meta root path and etcd endpoints, with rootCoordClientTimout as
// the last argument.
func defaultRootCoordCreatorFunc(ctx context.Context) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, Params.MetaRootPath, Params.EtcdEndpoints, rootCoordClientTimout)
}

102 103
// Register register data service at etcd
func (s *Server) Register() error {
104
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
105
	s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
106 107 108 109 110
	Params.NodeID = s.session.ServerID
	return nil
}

func (s *Server) Init() error {
S
sunby 已提交
111
	atomic.StoreInt64(&s.isServing, 1)
S
sunby 已提交
112 113 114 115
	return nil
}

func (s *Server) Start() error {
116
	var err error
S
sunby 已提交
117 118 119 120 121 122 123 124
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
125
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
126 127
		return err
	}
128

S
sunby 已提交
129 130 131
	if err = s.initMeta(); err != nil {
		return err
	}
132

S
sunby 已提交
133 134 135
	if err = s.initCluster(); err != nil {
		return err
	}
136

S
sunby 已提交
137 138 139
	if err = s.initSegmentInfoChannel(); err != nil {
		return err
	}
140

S
sunby 已提交
141
	s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)
142

143
	s.startSegmentManager()
S
sunby 已提交
144 145 146
	if err = s.initFlushMsgStream(); err != nil {
		return err
	}
147

S
sunby 已提交
148 149 150
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
151

S
sunby 已提交
152
	s.startServerLoop()
153

S
sunby 已提交
154
	atomic.StoreInt64(&s.isServing, 2)
S
sunby 已提交
155
	log.Debug("DataCoordinator startup success")
S
sunby 已提交
156
	return nil
157 158 159 160
}

func (s *Server) initCluster() error {
	dManager, err := newClusterNodeManager(s.kvClient)
G
groot 已提交
161 162 163
	if err != nil {
		return err
	}
S
sunby 已提交
164
	sManager := newClusterSessionManager(s.ctx, s.dataClientCreator)
165
	s.cluster = newCluster(s.ctx, dManager, sManager, s)
166 167
	return nil
}
G
groot 已提交
168

169 170 171
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
172
		log.Debug("DataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
173 174
		return err
	}
175 176 177 178 179 180 181 182 183 184
	log.Debug("registered sessions", zap.Any("sessions", sessions))

	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
	for _, session := range sessions {
		datanodes = append(datanodes, &datapb.DataNodeInfo{
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
		})
	}
G
godchen 已提交
185

186
	if err := s.cluster.startup(datanodes); err != nil {
187
		log.Debug("DataCoord loadMetaFromRootCoord failed", zap.Error(err))
S
sunby 已提交
188 189
		return err
	}
190

S
sunby 已提交
191
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev)
S
sunby 已提交
192 193 194
	return nil
}

195
func (s *Server) startSegmentManager() {
196
	helper := createNewSegmentHelper(s.segmentInfoStream)
197
	s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper))
198 199
}

200 201 202 203 204 205 206
func (s *Server) initSegmentInfoChannel() error {
	var err error
	s.segmentInfoStream, err = s.msFactory.NewMsgStream(s.ctx)
	if err != nil {
		return err
	}
	s.segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
207
	log.Debug("DataCoord AsProducer: " + Params.SegmentInfoChannelName)
208 209
	s.segmentInfoStream.Start()
	return nil
210 211
}

S
sunby 已提交
212
func (s *Server) initMeta() error {
213
	connectEtcdFn := func() error {
214
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
215 216 217
		if err != nil {
			return err
		}
218 219
		s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
		s.meta, err = newMeta(s.kvClient)
220 221 222 223
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
224
	}
S
sunby 已提交
225
	return retry.Retry(connEtcdMaxRetryTime, connEtcdRetryInterval, connectEtcdFn)
S
sunby 已提交
226 227
}

228
func (s *Server) initFlushMsgStream() error {
C
cai.zhang 已提交
229
	var err error
230 231 232 233 234 235
	// segment flush stream
	s.flushMsgStream, err = s.msFactory.NewMsgStream(s.ctx)
	if err != nil {
		return err
	}
	s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName})
236
	log.Debug("DataCoord AsProducer:" + Params.SegmentInfoChannelName)
237
	s.flushMsgStream.Start()
S
sunby 已提交
238 239
	return nil
}
S
sunby 已提交
240

241 242
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
S
sunby 已提交
243
	s.serverLoopWg.Add(5)
244
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
245
	go s.startDataNodeTtLoop(s.serverLoopCtx)
246 247
	go s.startWatchService(s.serverLoopCtx)
	go s.startActiveCheck(s.serverLoopCtx)
S
sunby 已提交
248
	go s.startFlushLoop(s.serverLoopCtx)
249 250 251
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
252
	defer logutil.LogPanic()
253
	defer s.serverLoopWg.Done()
G
groot 已提交
254
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
255 256
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
	log.Debug("DataCoord stats stream",
257
		zap.String("channelName", Params.StatisticsChannelName),
258
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
259 260 261 262 263
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
264
			log.Debug("stats channel shutdown")
265 266 267
			return
		default:
		}
268
		msgPack := statsStream.Consume()
S
sunby 已提交
269
		if msgPack == nil {
S
sunby 已提交
270
			return
S
sunby 已提交
271
		}
272
		for _, msg := range msgPack.Msgs {
273
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
274 275
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
276
				continue
S
sunby 已提交
277
			}
278
			log.Debug("Receive DataNode segment statistics update")
279 280
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
281
				s.segmentManager.UpdateSegmentStats(stat)
282
			}
283 284 285 286
		}
	}
}

S
sunby 已提交
287
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
288
	defer logutil.LogPanic()
289
	defer s.serverLoopWg.Done()
S
sunby 已提交
290 291 292 293 294 295
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
296 297 298
		Params.DataCoordSubscriptionName)
	log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s",
		Params.TimeTickChannelName, Params.DataCoordSubscriptionName))
S
sunby 已提交
299 300
	ttMsgStream.Start()
	defer ttMsgStream.Close()
301 302 303
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
304
			log.Debug("data node tt loop shutdown")
305 306 307
			return
		default:
		}
S
sunby 已提交
308
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
309
		if msgPack == nil {
S
sunby 已提交
310
			return
S
sunby 已提交
311
		}
312
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
313
			if msg.Type() != commonpb.MsgType_DataNodeTt {
S
sunby 已提交
314
				log.Warn("Receive unexpected msg type from tt channel",
S
sunby 已提交
315
					zap.Stringer("msgType", msg.Type()))
316 317
				continue
			}
S
sunby 已提交
318 319 320 321
			ttMsg := msg.(*msgstream.DataNodeTtMsg)

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
S
sunby 已提交
322 323
			log.Debug("Receive datanode timetick msg", zap.String("channel", ch),
				zap.Any("ts", ts))
324
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
325
			if err != nil {
S
sunby 已提交
326
				log.Warn("get flushable segments failed", zap.Error(err))
327 328
				continue
			}
329

S
sunby 已提交
330 331 332 333
			if len(segments) == 0 {
				continue
			}
			log.Debug("Flush segments", zap.Int64s("segmentIDs", segments))
334
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
335 336
			for _, id := range segments {
				sInfo, err := s.meta.GetSegment(id)
337
				if err != nil {
S
sunby 已提交
338
					log.Error("get segment from meta error", zap.Int64("id", id),
339
						zap.Error(err))
S
sunby 已提交
340
					continue
341
				}
342 343
				segmentInfos = append(segmentInfos, sInfo)
			}
S
sunby 已提交
344 345 346
			if len(segmentInfos) > 0 {
				s.cluster.flush(segmentInfos)
			}
347 348 349 350 351
		}
	}
}

func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
352
	defer logutil.LogPanic()
353 354 355 356 357 358
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
S
sunby 已提交
359
		case event := <-s.eventCh:
360 361 362 363
			datanode := &datapb.DataNodeInfo{
				Address:  event.Session.Address,
				Version:  event.Session.ServerID,
				Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
364
			}
365 366
			switch event.EventType {
			case sessionutil.SessionAddEvent:
S
sunby 已提交
367 368 369
				log.Info("Received datanode register",
					zap.String("address", datanode.Address),
					zap.Int64("serverID", datanode.Version))
370 371
				s.cluster.register(datanode)
			case sessionutil.SessionDelEvent:
S
sunby 已提交
372 373 374
				log.Info("Received datanode unregister",
					zap.String("address", datanode.Address),
					zap.Int64("serverID", datanode.Version))
375 376 377 378 379 380 381 382
				s.cluster.unregister(datanode)
			default:
				log.Warn("receive unknown service event type",
					zap.Any("type", event.EventType))
			}
		}
	}
}
S
sunby 已提交
383

384
func (s *Server) startActiveCheck(ctx context.Context) {
S
sunby 已提交
385
	defer logutil.LogPanic()
386 387 388 389 390 391 392
	defer s.serverLoopWg.Done()

	for {
		select {
		case _, ok := <-s.activeCh:
			if ok {
				continue
393
			}
394
			s.Stop()
S
sunby 已提交
395
			log.Debug("disconnect with etcd and shutdown data coordinator")
396 397 398 399
			return
		case <-ctx.Done():
			log.Debug("connection check shutdown")
			return
400 401 402 403
		}
	}
}

S
sunby 已提交
404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449
// startFlushLoop drains flushCh. For every segment ID it publishes a flush
// completed message on flushMsgStream and then marks the segment flushed in
// meta; a failure in either step is logged and the ID is dropped. Before
// entering the loop it starts a goroutine that re-enqueues the segments meta
// currently reports as flushing. The loop ends when ctx is cancelled and
// signals serverLoopWg on exit.
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	// The re-enqueue goroutine is bound to a child context so that it is
	// cancelled as soon as this loop returns.
	resendCtx, stopResend := context.WithCancel(ctx)
	defer stopResend()
	go s.handleFlushingSegments(resendCtx)

	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case id := <-s.flushCh:
			// Announce the flush on the segmentInfo/flush stream first.
			pack := composeSegmentFlushMsgPack(id)
			if err := s.flushMsgStream.Produce(&pack); err != nil {
				log.Error("produce flush msg failed",
					zap.Int64("segmentID", id),
					zap.Error(err))
				continue
			}
			log.Debug("send segment flush msg", zap.Int64("id", id))

			// Then persist the flushed state in meta.
			if err := s.meta.FlushSegment(id); err != nil {
				log.Error("flush segment complete failed", zap.Error(err))
				continue
			}
			log.Debug("flush segment complete", zap.Int64("id", id))
		}
	}
}

// handleFlushingSegments pushes the ID of every segment returned by
// meta.GetFlushingSegments onto flushCh, blocking while the channel is full.
// It returns early if ctx is cancelled before all IDs have been sent.
func (s *Server) handleFlushingSegments(ctx context.Context) {
	for _, seg := range s.meta.GetFlushingSegments() {
		select {
		case s.flushCh <- seg.ID:
		case <-ctx.Done():
			return
		}
	}
}

450
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
451
	var err error
S
sunby 已提交
452
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx); err != nil {
S
sunby 已提交
453 454
		return err
	}
455
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
456 457
		return err
	}
458
	return s.rootCoordClient.Start()
S
sunby 已提交
459
}
460

S
sunby 已提交
461
func (s *Server) Stop() error {
S
sunby 已提交
462
	if !atomic.CompareAndSwapInt64(&s.isServing, 2, 0) {
S
sunby 已提交
463 464
		return nil
	}
465
	log.Debug("DataCoord server shutdown")
S
sunby 已提交
466 467 468 469 470
	atomic.StoreInt64(&s.isServing, 0)
	s.cluster.releaseSessions()
	s.segmentInfoStream.Close()
	s.flushMsgStream.Close()
	s.stopServerLoop()
S
sunby 已提交
471 472 473
	return nil
}

S
sunby 已提交
474 475
// CleanMeta only for test
func (s *Server) CleanMeta() error {
476
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
477
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
478 479
}

S
sunby 已提交
480 481 482 483 484
// stopServerLoop cancels the context shared by the background loops and blocks
// until every goroutine counted in serverLoopWg has called Done.
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

485 486 487 488 489 490 491 492 493 494 495 496 497 498
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
499

500 501
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
502
		Base: &commonpb.MsgBase{
503
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
504 505 506 507 508 509 510 511
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
512
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
513 514 515 516 517
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
			MsgID:     -1, // todo
			Timestamp: 0,  // todo
			SourceID:  Params.NodeID,
518
		},
S
sunby 已提交
519 520 521 522 523
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
524 525
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
526 527
		return err
	}
S
sunby 已提交
528 529 530 531 532 533
	collInfo := &datapb.CollectionInfo{
		ID:         resp.CollectionID,
		Schema:     resp.Schema,
		Partitions: presp.PartitionIDs,
	}
	return s.meta.AddCollection(collInfo)
S
sunby 已提交
534 535
}

536
func (s *Server) prepareBinlog(req *datapb.SaveBinlogPathsRequest) (map[string]string, error) {
S
sunby 已提交
537
	meta := make(map[string]string)
538

539 540 541 542 543 544 545 546
	for _, fieldBlp := range req.Field2BinlogPaths {
		fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)
		if err != nil {
			return nil, err
		}
		for k, v := range fieldMeta {
			meta[k] = v
		}
547
	}
548

S
sunby 已提交
549
	return meta, nil
550
}
551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574

// composeSegmentFlushMsgPack builds a message pack holding exactly one flush
// completed message for segmentID. The message is sourced from Params.NodeID,
// carries a single hash value of 0, and leaves MsgID and Timestamp at 0.
func composeSegmentFlushMsgPack(segmentID UniqueID) msgstream.MsgPack {
	flushMsg := &msgstream.FlushCompletedMsg{
		BaseMsg: msgstream.BaseMsg{
			HashValues: []uint32{0},
		},
		SegmentFlushCompletedMsg: internalpb.SegmentFlushCompletedMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_SegmentFlushDone,
				MsgID:     0, // TODO
				Timestamp: 0, // TODO
				SourceID:  Params.NodeID,
			},
			SegmentID: segmentID,
		},
	}
	return msgstream.MsgPack{
		Msgs: []msgstream.TsMsg{flushMsg},
	}
}