server.go 17.7 KB
Newer Older
S
sunby 已提交
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
9

10
package datacoord
S
sunby 已提交
11

S
sunby 已提交
12 13 14
import (
	"context"
	"fmt"
S
sunby 已提交
15
	"math/rand"
S
sunby 已提交
16
	"sync"
S
sunby 已提交
17 18 19
	"sync/atomic"
	"time"

S
sunby 已提交
20
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
21
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
22
	"github.com/milvus-io/milvus/internal/logutil"
23 24
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"
S
sunby 已提交
25

X
Xiangyu Wang 已提交
26 27 28 29 30
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
33

X
Xiangyu Wang 已提交
34 35 36 37
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
38 39
)

S
sunby 已提交
40 41 42 43 44
const (
	// rootCoordClientTimout is a 20 second duration; presumably the timeout for
	// root coordinator client calls (not referenced in this part of the file — TODO confirm).
	rootCoordClientTimout = 20 * time.Second
	// connEtcdMaxRetryTime is the number of attempts passed to retry.Attempts
	// when initMeta connects to etcd.
	connEtcdMaxRetryTime  = 100000
	// connEtcdRetryInterval is a 200 millisecond duration; presumably the pause
	// between etcd connection attempts (not referenced in this part of the file — TODO confirm).
	connEtcdRetryInterval = 200 * time.Millisecond
)
S
sunby 已提交
45

S
sunby 已提交
46
type (
S
sunby 已提交
47 48
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
S
sunby 已提交
49
)
S
sunby 已提交
50

51 52 53 54 55 56 57 58 59 60 61 62
// ServerState is an alias of int64 describing the lifecycle state of a `Server`.
type ServerState = int64

// Lifecycle states of a `Server`, numbered consecutively starting at zero.
const (
	// ServerStateStopped (0) marks a `Server` that was just created or has been stopped.
	ServerStateStopped ServerState = iota
	// ServerStateInitializing (1) marks a `Server` that is being initialized.
	ServerStateInitializing
	// ServerStateHealthy (2) marks a `Server` that is healthy.
	ServerStateHealthy
)

G
godchen 已提交
63 64
type (
	// dataNodeCreatorFunc builds a types.DataNode client for the data node at addr.
	dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)

	// rootCoordCreatorFunc builds a types.RootCoord client from the meta root
	// path and the etcd endpoints.
	rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
)
S
sunby 已提交
65

66 67
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
68
type Server struct {
S
sunby 已提交
69 70 71 72
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
73
	isServing        ServerState
74 75 76 77

	kvClient          *etcdkv.EtcdKV
	meta              *meta
	segmentInfoStream msgstream.MsgStream
78
	segmentManager    Manager
S
sunby 已提交
79
	allocator         allocator
80
	cluster           *cluster
81
	rootCoordClient   types.RootCoord
S
sunby 已提交
82
	ddChannelName     string
83

S
sunby 已提交
84
	flushCh        chan UniqueID
85 86 87 88 89
	flushMsgStream msgstream.MsgStream
	msFactory      msgstream.Factory

	session  *sessionutil.Session
	activeCh <-chan bool
S
sunby 已提交
90
	eventCh  <-chan *sessionutil.SessionEvent
91

S
sunby 已提交
92 93
	dataClientCreator      dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
94
}
S
sunby 已提交
95

96
// CreateServer create `Server` instance
G
groot 已提交
97
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
S
sunby 已提交
98
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
99
	s := &Server{
S
sunby 已提交
100 101 102 103 104
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataClientCreator:      defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
105 106 107 108
	}
	return s, nil
}

G
godchen 已提交
109 110
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
111 112
}

G
godchen 已提交
113 114
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
115 116
}

117 118
// Register register data service at etcd
func (s *Server) Register() error {
119
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
120
	s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
121 122 123 124
	Params.NodeID = s.session.ServerID
	return nil
}

125
// Init change server state to Initializing
126
func (s *Server) Init() error {
127
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
128 129 130
	return nil
}

131 132 133 134 135 136 137
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
138
func (s *Server) Start() error {
139
	var err error
S
sunby 已提交
140 141 142 143 144 145 146 147
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
148
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
149 150
		return err
	}
151

S
sunby 已提交
152 153 154
	if err = s.initMeta(); err != nil {
		return err
	}
155

S
sunby 已提交
156 157 158
	if err = s.initCluster(); err != nil {
		return err
	}
159

S
sunby 已提交
160 161 162
	if err = s.initSegmentInfoChannel(); err != nil {
		return err
	}
163

S
sunby 已提交
164
	s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)
165

166
	s.startSegmentManager()
S
sunby 已提交
167 168 169
	if err = s.initFlushMsgStream(); err != nil {
		return err
	}
170

S
sunby 已提交
171 172 173
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
174

S
sunby 已提交
175
	s.startServerLoop()
176

177
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
S
sunby 已提交
178
	log.Debug("DataCoordinator startup success")
S
sunby 已提交
179
	return nil
180 181 182 183
}

func (s *Server) initCluster() error {
	dManager, err := newClusterNodeManager(s.kvClient)
G
groot 已提交
184 185 186
	if err != nil {
		return err
	}
S
sunby 已提交
187
	sManager := newClusterSessionManager(s.ctx, s.dataClientCreator)
188
	s.cluster = newCluster(s.ctx, dManager, sManager, s)
189 190
	return nil
}
G
groot 已提交
191

192 193 194
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
195
		log.Debug("DataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
196 197
		return err
	}
198 199 200 201 202 203 204 205 206 207
	log.Debug("registered sessions", zap.Any("sessions", sessions))

	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
	for _, session := range sessions {
		datanodes = append(datanodes, &datapb.DataNodeInfo{
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
		})
	}
G
godchen 已提交
208

209
	if err := s.cluster.startup(datanodes); err != nil {
210
		log.Debug("DataCoord loadMetaFromRootCoord failed", zap.Error(err))
S
sunby 已提交
211 212
		return err
	}
213

S
sunby 已提交
214
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev)
S
sunby 已提交
215 216 217
	return nil
}

218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
// loadDataNodes returns one DataNodeInfo per registered data node session,
// each with an empty channel list. When the session is missing or the sessions
// cannot be listed, a warning is logged and an empty, non-nil slice is returned.
func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
	if s.session == nil {
		log.Warn("load data nodes but session is nil")
		return []*datapb.DataNodeInfo{}
	}

	registered, _, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Warn("load data nodes faild", zap.Error(err))
		return []*datapb.DataNodeInfo{}
	}

	nodes := make([]*datapb.DataNodeInfo, 0, len(registered))
	for _, nodeSession := range registered {
		info := &datapb.DataNodeInfo{
			Address:  nodeSession.Address,
			Version:  nodeSession.ServerID,
			Channels: []*datapb.ChannelStatus{},
		}
		nodes = append(nodes, info)
	}
	return nodes
}

239
func (s *Server) startSegmentManager() {
240
	helper := createNewSegmentHelper(s.segmentInfoStream)
241
	s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper))
242 243
}

244 245 246 247 248 249 250
func (s *Server) initSegmentInfoChannel() error {
	var err error
	s.segmentInfoStream, err = s.msFactory.NewMsgStream(s.ctx)
	if err != nil {
		return err
	}
	s.segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
251
	log.Debug("DataCoord AsProducer: " + Params.SegmentInfoChannelName)
252 253
	s.segmentInfoStream.Start()
	return nil
254 255
}

S
sunby 已提交
256
func (s *Server) initMeta() error {
257
	connectEtcdFn := func() error {
258
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
259 260 261
		if err != nil {
			return err
		}
262 263
		s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
		s.meta, err = newMeta(s.kvClient)
264 265 266 267
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
268
	}
G
godchen 已提交
269
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
270 271
}

272
func (s *Server) initFlushMsgStream() error {
C
cai.zhang 已提交
273
	var err error
274 275 276 277 278 279
	// segment flush stream
	s.flushMsgStream, err = s.msFactory.NewMsgStream(s.ctx)
	if err != nil {
		return err
	}
	s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName})
280
	log.Debug("DataCoord AsProducer:" + Params.SegmentInfoChannelName)
281
	s.flushMsgStream.Start()
S
sunby 已提交
282 283
	return nil
}
S
sunby 已提交
284

285 286
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
S
sunby 已提交
287
	s.serverLoopWg.Add(5)
288
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
289
	go s.startDataNodeTtLoop(s.serverLoopCtx)
290 291
	go s.startWatchService(s.serverLoopCtx)
	go s.startActiveCheck(s.serverLoopCtx)
S
sunby 已提交
292
	go s.startFlushLoop(s.serverLoopCtx)
293 294 295
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
296
	defer logutil.LogPanic()
297
	defer s.serverLoopWg.Done()
G
groot 已提交
298
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
299 300
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
	log.Debug("DataCoord stats stream",
301
		zap.String("channelName", Params.StatisticsChannelName),
302
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
303 304 305 306 307
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
308
			log.Debug("stats channel shutdown")
309 310 311
			return
		default:
		}
312
		msgPack := statsStream.Consume()
S
sunby 已提交
313
		if msgPack == nil {
S
sunby 已提交
314
			return
S
sunby 已提交
315
		}
316
		for _, msg := range msgPack.Msgs {
317
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
318 319
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
320
				continue
S
sunby 已提交
321
			}
322
			log.Debug("Receive DataNode segment statistics update")
323 324
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
325
				s.segmentManager.UpdateSegmentStats(stat)
326
			}
327 328 329 330
		}
	}
}

S
sunby 已提交
331
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
332
	defer logutil.LogPanic()
333
	defer s.serverLoopWg.Done()
S
sunby 已提交
334 335 336 337 338 339
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
340 341 342
		Params.DataCoordSubscriptionName)
	log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s",
		Params.TimeTickChannelName, Params.DataCoordSubscriptionName))
S
sunby 已提交
343 344
	ttMsgStream.Start()
	defer ttMsgStream.Close()
345 346 347
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
348
			log.Debug("data node tt loop shutdown")
349 350 351
			return
		default:
		}
S
sunby 已提交
352
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
353
		if msgPack == nil {
S
sunby 已提交
354
			return
S
sunby 已提交
355
		}
356
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
357
			if msg.Type() != commonpb.MsgType_DataNodeTt {
S
sunby 已提交
358
				log.Warn("Receive unexpected msg type from tt channel",
S
sunby 已提交
359
					zap.Stringer("msgType", msg.Type()))
360 361
				continue
			}
S
sunby 已提交
362 363 364 365
			ttMsg := msg.(*msgstream.DataNodeTtMsg)

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
366 367
			// log.Debug("Receive datanode timetick msg", zap.String("channel", ch),
			// zap.Any("ts", ts))
368
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
369
			if err != nil {
S
sunby 已提交
370
				log.Warn("get flushable segments failed", zap.Error(err))
371 372
				continue
			}
373

S
sunby 已提交
374 375 376 377
			if len(segments) == 0 {
				continue
			}
			log.Debug("Flush segments", zap.Int64s("segmentIDs", segments))
378
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
379 380
			for _, id := range segments {
				sInfo, err := s.meta.GetSegment(id)
381
				if err != nil {
S
sunby 已提交
382
					log.Error("get segment from meta error", zap.Int64("id", id),
383
						zap.Error(err))
S
sunby 已提交
384
					continue
385
				}
386 387
				segmentInfos = append(segmentInfos, sInfo)
			}
S
sunby 已提交
388 389 390
			if len(segmentInfos) > 0 {
				s.cluster.flush(segmentInfos)
			}
391
			s.segmentManager.ExpireAllocations(ch, ts)
392 393 394 395 396
		}
	}
}

func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
397
	defer logutil.LogPanic()
398 399 400 401 402 403
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
S
sunby 已提交
404
		case event := <-s.eventCh:
405 406 407 408
			datanode := &datapb.DataNodeInfo{
				Address:  event.Session.Address,
				Version:  event.Session.ServerID,
				Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
409
			}
410 411
			switch event.EventType {
			case sessionutil.SessionAddEvent:
S
sunby 已提交
412 413 414
				log.Info("Received datanode register",
					zap.String("address", datanode.Address),
					zap.Int64("serverID", datanode.Version))
415 416
				//s.cluster.register(datanode)
				s.cluster.refresh(s.loadDataNodes())
417
			case sessionutil.SessionDelEvent:
S
sunby 已提交
418 419 420
				log.Info("Received datanode unregister",
					zap.String("address", datanode.Address),
					zap.Int64("serverID", datanode.Version))
421 422
				//s.cluster.unregister(datanode)
				s.cluster.refresh(s.loadDataNodes())
423 424 425 426 427 428 429
			default:
				log.Warn("receive unknown service event type",
					zap.Any("type", event.EventType))
			}
		}
	}
}
S
sunby 已提交
430

431
func (s *Server) startActiveCheck(ctx context.Context) {
S
sunby 已提交
432
	defer logutil.LogPanic()
433 434 435 436 437 438 439
	defer s.serverLoopWg.Done()

	for {
		select {
		case _, ok := <-s.activeCh:
			if ok {
				continue
440
			}
441
			s.Stop()
S
sunby 已提交
442
			log.Debug("disconnect with etcd and shutdown data coordinator")
443 444 445 446
			return
		case <-ctx.Done():
			log.Debug("connection check shutdown")
			return
447 448 449 450
		}
	}
}

S
sunby 已提交
451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496
// startFlushLoop drains flushCh. For every segment ID it publishes a flush
// message on the flush stream and then marks the segment as flushed in meta;
// a failure in either step is logged and the segment is skipped.
// On startup it also launches handleFlushingSegments to feed flushCh, with a
// context cancelled when the loop returns. The loop exits when ctx is done.
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	// send `Flushing` segments
	resendCtx, stopResend := context.WithCancel(ctx)
	defer stopResend()
	go s.handleFlushingSegments(resendCtx)

	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return

		case id := <-s.flushCh:
			// write flush msg into segmentInfo/flush stream
			pack := composeSegmentFlushMsgPack(id)
			if err := s.flushMsgStream.Produce(&pack); err != nil {
				log.Error("produce flush msg failed",
					zap.Int64("segmentID", id),
					zap.Error(err))
				continue
			}
			log.Debug("send segment flush msg", zap.Int64("id", id))

			// set segment to SegmentState_Flushed
			if err := s.meta.FlushSegment(id); err != nil {
				log.Error("flush segment complete failed", zap.Error(err))
				continue
			}
			log.Debug("flush segment complete", zap.Int64("id", id))
		}
	}
}

// handleFlushingSegments sends the ID of every segment returned by
// meta.GetFlushingSegments to flushCh, stopping early once ctx is done.
func (s *Server) handleFlushingSegments(ctx context.Context) {
	for _, flushing := range s.meta.GetFlushingSegments() {
		select {
		case s.flushCh <- flushing.ID:
		case <-ctx.Done():
			return
		}
	}
}

497
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
498
	var err error
G
godchen 已提交
499
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
500 501
		return err
	}
502
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
503 504
		return err
	}
505
	return s.rootCoordClient.Start()
S
sunby 已提交
506
}
507

508 509 510 511
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
512
func (s *Server) Stop() error {
513
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
514 515
		return nil
	}
516
	log.Debug("DataCoord server shutdown")
517
	atomic.StoreInt64(&s.isServing, ServerStateStopped)
S
sunby 已提交
518 519 520 521
	s.cluster.releaseSessions()
	s.segmentInfoStream.Close()
	s.flushMsgStream.Close()
	s.stopServerLoop()
S
sunby 已提交
522 523 524
	return nil
}

S
sunby 已提交
525 526
// CleanMeta only for test
func (s *Server) CleanMeta() error {
527
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
528
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
529 530
}

S
sunby 已提交
531 532 533 534 535
// stopServerLoop cancels the server loop context and blocks until every
// goroutine counted in serverLoopWg has finished.
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

536 537 538 539 540 541 542 543 544 545 546 547 548 549
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
550

551 552
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
553
		Base: &commonpb.MsgBase{
554
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
555 556 557 558 559 560 561 562
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
563
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
564 565
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
566 567
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
568
			SourceID:  Params.NodeID,
569
		},
S
sunby 已提交
570 571 572 573 574
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
575 576
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
577 578
		return err
	}
S
sunby 已提交
579 580 581 582 583 584
	collInfo := &datapb.CollectionInfo{
		ID:         resp.CollectionID,
		Schema:     resp.Schema,
		Partitions: presp.PartitionIDs,
	}
	return s.meta.AddCollection(collInfo)
S
sunby 已提交
585 586
}

587
func (s *Server) prepareBinlog(req *datapb.SaveBinlogPathsRequest) (map[string]string, error) {
S
sunby 已提交
588
	meta := make(map[string]string)
589

590 591 592 593 594 595 596 597
	for _, fieldBlp := range req.Field2BinlogPaths {
		fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)
		if err != nil {
			return nil, err
		}
		for k, v := range fieldMeta {
			meta[k] = v
		}
598
	}
599

S
sunby 已提交
600
	return meta, nil
601
}
602 603 604 605 606 607 608 609

func composeSegmentFlushMsgPack(segmentID UniqueID) msgstream.MsgPack {
	msgPack := msgstream.MsgPack{
		Msgs: make([]msgstream.TsMsg, 0, 1),
	}
	completeFlushMsg := internalpb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_SegmentFlushDone,
S
sunby 已提交
610 611
			MsgID:     0,
			Timestamp: 0,
612 613 614 615 616 617 618 619 620 621 622 623 624 625
			SourceID:  Params.NodeID,
		},
		SegmentID: segmentID,
	}
	var msg msgstream.TsMsg = &msgstream.FlushCompletedMsg{
		BaseMsg: msgstream.BaseMsg{
			HashValues: []uint32{0},
		},
		SegmentFlushCompletedMsg: completeFlushMsg,
	}

	msgPack.Msgs = append(msgPack.Msgs, msg)
	return msgPack
}