server.go 16.7 KB
Newer Older
S
sunby 已提交
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
S
sunby 已提交
2 3 4 5 6 7 8
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
9

10
package datacoord
S
sunby 已提交
11

S
sunby 已提交
12 13 14
import (
	"context"
	"fmt"
S
sunby 已提交
15
	"math/rand"
S
sunby 已提交
16
	"sync"
S
sunby 已提交
17 18 19
	"sync/atomic"
	"time"

S
sunby 已提交
20
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
21
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
22
	"github.com/milvus-io/milvus/internal/logutil"
23 24
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"
S
sunby 已提交
25

X
Xiangyu Wang 已提交
26 27 28 29 30
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
33

X
Xiangyu Wang 已提交
34 35 36
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
37 38
)

S
sunby 已提交
39 40 41 42 43
const (
	// rootCoordClientTimout is a 20 second duration. Judging by the name it
	// bounds root coordinator client calls — TODO confirm, no use is visible
	// in this part of the file.
	rootCoordClientTimout = 20 * time.Second
	// connEtcdMaxRetryTime is the attempt limit passed to retry.Attempts when
	// initMeta connects to etcd.
	connEtcdMaxRetryTime  = 100000
	// connEtcdRetryInterval is a 200 millisecond duration, presumably the
	// pause between etcd connection attempts — TODO confirm, initMeta does
	// not reference it.
	connEtcdRetryInterval = 200 * time.Millisecond
)
S
sunby 已提交
44

S
sunby 已提交
45
type (
S
sunby 已提交
46 47
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
S
sunby 已提交
48
)
S
sunby 已提交
49

50 51 52 53 54 55 56 57 58 59 60 61
// ServerState is an alias of int64 so that the state can be read and written
// with the sync/atomic int64 helpers.
type ServerState = int64

const (
	// ServerStateStopped marks a `Server` that was just created or has been
	// stopped; it is the zero value of the state field.
	ServerStateStopped ServerState = 0
	// ServerStateInitializing marks a `Server` whose Init has been called but
	// whose Start has not completed yet.
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy marks a `Server` whose Start completed successfully.
	ServerStateHealthy ServerState = 2
)

G
godchen 已提交
62 63
// dataNodeCreatorFunc builds a types.DataNode client for the data node
// reachable at addr.
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
// rootCoordCreatorFunc builds a types.RootCoord client from the etcd meta root
// path and the etcd endpoints.
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
64

65 66
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
67
type Server struct {
S
sunby 已提交
68 69 70 71
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
72
	isServing        ServerState
73 74 75 76

	kvClient          *etcdkv.EtcdKV
	meta              *meta
	segmentInfoStream msgstream.MsgStream
77
	segmentManager    Manager
S
sunby 已提交
78
	allocator         allocator
79
	cluster           *cluster
80
	rootCoordClient   types.RootCoord
S
sunby 已提交
81
	ddChannelName     string
82

83 84
	flushCh   chan UniqueID
	msFactory msgstream.Factory
85 86 87

	session  *sessionutil.Session
	activeCh <-chan bool
S
sunby 已提交
88
	eventCh  <-chan *sessionutil.SessionEvent
89

S
sunby 已提交
90 91
	dataClientCreator      dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
92
}
S
sunby 已提交
93

94
// CreateServer create `Server` instance
G
groot 已提交
95
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
S
sunby 已提交
96
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
97
	s := &Server{
S
sunby 已提交
98 99 100 101 102
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataClientCreator:      defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
103 104 105 106
	}
	return s, nil
}

G
godchen 已提交
107 108
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
109 110
}

G
godchen 已提交
111 112
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
113 114
}

115 116
// Register register data service at etcd
func (s *Server) Register() error {
117
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
118
	s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
119 120 121 122
	Params.NodeID = s.session.ServerID
	return nil
}

123
// Init change server state to Initializing
124
func (s *Server) Init() error {
125
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
126 127 128
	return nil
}

129 130 131 132 133 134 135
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
136
func (s *Server) Start() error {
137
	var err error
S
sunby 已提交
138 139 140 141 142 143 144 145
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
146
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
147 148
		return err
	}
149

S
sunby 已提交
150 151 152
	if err = s.initMeta(); err != nil {
		return err
	}
153

S
sunby 已提交
154 155 156
	if err = s.initCluster(); err != nil {
		return err
	}
157

S
sunby 已提交
158 159 160
	if err = s.initSegmentInfoChannel(); err != nil {
		return err
	}
161

S
sunby 已提交
162
	s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)
163

164
	s.startSegmentManager()
S
sunby 已提交
165 166 167
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
168

S
sunby 已提交
169
	s.startServerLoop()
170

171
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
S
sunby 已提交
172
	log.Debug("DataCoordinator startup success")
S
sunby 已提交
173
	return nil
174 175 176 177
}

func (s *Server) initCluster() error {
	dManager, err := newClusterNodeManager(s.kvClient)
G
groot 已提交
178 179 180
	if err != nil {
		return err
	}
S
sunby 已提交
181
	sManager := newClusterSessionManager(s.ctx, s.dataClientCreator)
182
	s.cluster = newCluster(s.ctx, dManager, sManager, s)
183 184
	return nil
}
G
groot 已提交
185

186 187 188
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
189
		log.Debug("DataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
190 191
		return err
	}
192 193 194 195 196 197 198 199 200 201
	log.Debug("registered sessions", zap.Any("sessions", sessions))

	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
	for _, session := range sessions {
		datanodes = append(datanodes, &datapb.DataNodeInfo{
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
		})
	}
G
godchen 已提交
202

203
	if err := s.cluster.startup(datanodes); err != nil {
204
		log.Debug("DataCoord loadMetaFromRootCoord failed", zap.Error(err))
S
sunby 已提交
205 206
		return err
	}
207

208
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
209 210 211
	return nil
}

212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
// loadDataNodes returns one DataNodeInfo (address, server ID as version, no
// channels) per registered datanode session. When there is no session yet or
// the sessions cannot be fetched, it logs a warning and returns an empty,
// non-nil slice.
func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
	if s.session == nil {
		log.Warn("load data nodes but session is nil")
		return []*datapb.DataNodeInfo{}
	}

	registered, _, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Warn("load data nodes faild", zap.Error(err))
		return []*datapb.DataNodeInfo{}
	}

	nodes := make([]*datapb.DataNodeInfo, 0, len(registered))
	for _, sess := range registered {
		info := &datapb.DataNodeInfo{
			Address:  sess.Address,
			Version:  sess.ServerID,
			Channels: []*datapb.ChannelStatus{},
		}
		nodes = append(nodes, info)
	}
	return nodes
}

233
func (s *Server) startSegmentManager() {
234
	helper := createNewSegmentHelper(s.segmentInfoStream)
235
	s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper))
236 237
}

238 239 240 241 242 243 244
func (s *Server) initSegmentInfoChannel() error {
	var err error
	s.segmentInfoStream, err = s.msFactory.NewMsgStream(s.ctx)
	if err != nil {
		return err
	}
	s.segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
245
	log.Debug("DataCoord AsProducer: " + Params.SegmentInfoChannelName)
246 247
	s.segmentInfoStream.Start()
	return nil
248 249
}

S
sunby 已提交
250
func (s *Server) initMeta() error {
251
	connectEtcdFn := func() error {
252
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
253 254 255
		if err != nil {
			return err
		}
256 257
		s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
		s.meta, err = newMeta(s.kvClient)
258 259 260 261
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
262
	}
G
godchen 已提交
263
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
264 265
}

266 267
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
S
sunby 已提交
268
	s.serverLoopWg.Add(5)
269
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
270
	go s.startDataNodeTtLoop(s.serverLoopCtx)
271 272
	go s.startWatchService(s.serverLoopCtx)
	go s.startActiveCheck(s.serverLoopCtx)
S
sunby 已提交
273
	go s.startFlushLoop(s.serverLoopCtx)
274 275 276
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
277
	defer logutil.LogPanic()
278
	defer s.serverLoopWg.Done()
G
groot 已提交
279
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
280 281
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
	log.Debug("DataCoord stats stream",
282
		zap.String("channelName", Params.StatisticsChannelName),
283
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
284 285 286 287 288
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
289
			log.Debug("stats channel shutdown")
290 291 292
			return
		default:
		}
293
		msgPack := statsStream.Consume()
S
sunby 已提交
294
		if msgPack == nil {
S
sunby 已提交
295
			return
S
sunby 已提交
296
		}
297
		for _, msg := range msgPack.Msgs {
298
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
299 300
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
301
				continue
S
sunby 已提交
302
			}
303
			log.Debug("Receive DataNode segment statistics update")
304 305
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
306
				s.segmentManager.UpdateSegmentStats(stat)
307
			}
308 309 310 311
		}
	}
}

S
sunby 已提交
312
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
313
	defer logutil.LogPanic()
314
	defer s.serverLoopWg.Done()
S
sunby 已提交
315 316 317 318 319 320
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
321 322 323
		Params.DataCoordSubscriptionName)
	log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s",
		Params.TimeTickChannelName, Params.DataCoordSubscriptionName))
S
sunby 已提交
324 325
	ttMsgStream.Start()
	defer ttMsgStream.Close()
326 327 328
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
329
			log.Debug("data node tt loop shutdown")
330 331 332
			return
		default:
		}
S
sunby 已提交
333
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
334
		if msgPack == nil {
S
sunby 已提交
335
			return
S
sunby 已提交
336
		}
337
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
338
			if msg.Type() != commonpb.MsgType_DataNodeTt {
S
sunby 已提交
339
				log.Warn("Receive unexpected msg type from tt channel",
S
sunby 已提交
340
					zap.Stringer("msgType", msg.Type()))
341 342
				continue
			}
S
sunby 已提交
343 344 345 346
			ttMsg := msg.(*msgstream.DataNodeTtMsg)

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
347 348
			// log.Debug("Receive datanode timetick msg", zap.String("channel", ch),
			// zap.Any("ts", ts))
349
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
350
			if err != nil {
S
sunby 已提交
351
				log.Warn("get flushable segments failed", zap.Error(err))
352 353
				continue
			}
354

S
sunby 已提交
355 356 357 358
			if len(segments) == 0 {
				continue
			}
			log.Debug("Flush segments", zap.Int64s("segmentIDs", segments))
359
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
360 361
			for _, id := range segments {
				sInfo, err := s.meta.GetSegment(id)
362
				if err != nil {
S
sunby 已提交
363
					log.Error("get segment from meta error", zap.Int64("id", id),
364
						zap.Error(err))
S
sunby 已提交
365
					continue
366
				}
367 368
				segmentInfos = append(segmentInfos, sInfo)
			}
S
sunby 已提交
369 370 371
			if len(segmentInfos) > 0 {
				s.cluster.flush(segmentInfos)
			}
372
			s.segmentManager.ExpireAllocations(ch, ts)
373 374 375 376 377
		}
	}
}

func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
378
	defer logutil.LogPanic()
379 380 381 382 383 384
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
S
sunby 已提交
385
		case event := <-s.eventCh:
386 387 388 389
			datanode := &datapb.DataNodeInfo{
				Address:  event.Session.Address,
				Version:  event.Session.ServerID,
				Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
390
			}
391 392
			switch event.EventType {
			case sessionutil.SessionAddEvent:
S
sunby 已提交
393 394 395
				log.Info("Received datanode register",
					zap.String("address", datanode.Address),
					zap.Int64("serverID", datanode.Version))
396 397
				//s.cluster.register(datanode)
				s.cluster.refresh(s.loadDataNodes())
398
			case sessionutil.SessionDelEvent:
S
sunby 已提交
399 400 401
				log.Info("Received datanode unregister",
					zap.String("address", datanode.Address),
					zap.Int64("serverID", datanode.Version))
402 403
				//s.cluster.unregister(datanode)
				s.cluster.refresh(s.loadDataNodes())
404 405 406 407 408 409 410
			default:
				log.Warn("receive unknown service event type",
					zap.Any("type", event.EventType))
			}
		}
	}
}
S
sunby 已提交
411

412
func (s *Server) startActiveCheck(ctx context.Context) {
S
sunby 已提交
413
	defer logutil.LogPanic()
414 415 416 417 418 419 420
	defer s.serverLoopWg.Done()

	for {
		select {
		case _, ok := <-s.activeCh:
			if ok {
				continue
421
			}
422
			s.Stop()
S
sunby 已提交
423
			log.Debug("disconnect with etcd and shutdown data coordinator")
424 425 426 427
			return
		case <-ctx.Done():
			log.Debug("connection check shutdown")
			return
428 429 430 431
		}
	}
}

S
sunby 已提交
432 433 434 435 436 437 438 439 440 441 442 443 444
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
445
			segment, err := s.meta.GetSegment(segmentID)
S
sunby 已提交
446
			if err != nil {
447 448 449 450 451 452 453 454 455 456 457 458
				log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
				continue
			}
			req := &datapb.SegmentFlushCompletedMsg{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_SegmentFlushDone,
				},
				Segment: segment,
			}
			resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
			if err = VerifyResponse(resp, err); err != nil {
				log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
S
sunby 已提交
459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481
				continue
			}
			// set segment to SegmentState_Flushed
			if err = s.meta.FlushSegment(segmentID); err != nil {
				log.Error("flush segment complete failed", zap.Error(err))
				continue
			}
			log.Debug("flush segment complete", zap.Int64("id", segmentID))
		}
	}
}

// handleFlushingSegments pushes the ID of every segment returned by
// meta.GetFlushingSegments into s.flushCh and gives up as soon as ctx is
// cancelled.
func (s *Server) handleFlushingSegments(ctx context.Context) {
	for _, flushing := range s.meta.GetFlushingSegments() {
		select {
		case s.flushCh <- flushing.ID:
		case <-ctx.Done():
			return
		}
	}
}

482
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
483
	var err error
G
godchen 已提交
484
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
485 486
		return err
	}
487
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
488 489
		return err
	}
490
	return s.rootCoordClient.Start()
S
sunby 已提交
491
}
492

493 494 495 496
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
497
func (s *Server) Stop() error {
498
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
499 500
		return nil
	}
501
	log.Debug("DataCoord server shutdown")
502
	atomic.StoreInt64(&s.isServing, ServerStateStopped)
S
sunby 已提交
503 504 505
	s.cluster.releaseSessions()
	s.segmentInfoStream.Close()
	s.stopServerLoop()
S
sunby 已提交
506 507 508
	return nil
}

S
sunby 已提交
509 510
// CleanMeta only for test
func (s *Server) CleanMeta() error {
511
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
512
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
513 514
}

S
sunby 已提交
515 516 517 518 519
// stopServerLoop cancels the server loop context and blocks until every
// goroutine registered in s.serverLoopWg has finished.
func (s *Server) stopServerLoop() {
	// Cancel first so the loops observe ctx.Done() and can exit.
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

520 521 522 523 524 525 526 527 528 529 530 531 532 533
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
534

535 536
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
537
		Base: &commonpb.MsgBase{
538
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
539 540 541 542 543 544 545 546
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
547
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
548 549
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
550 551
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
552
			SourceID:  Params.NodeID,
553
		},
S
sunby 已提交
554 555 556 557 558
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
559 560
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
561 562
		return err
	}
S
sunby 已提交
563 564 565 566 567 568
	collInfo := &datapb.CollectionInfo{
		ID:         resp.CollectionID,
		Schema:     resp.Schema,
		Partitions: presp.PartitionIDs,
	}
	return s.meta.AddCollection(collInfo)
S
sunby 已提交
569 570
}

571
func (s *Server) prepareBinlog(req *datapb.SaveBinlogPathsRequest) (map[string]string, error) {
S
sunby 已提交
572
	meta := make(map[string]string)
573

574 575 576 577 578 579 580 581
	for _, fieldBlp := range req.Field2BinlogPaths {
		fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)
		if err != nil {
			return nil, err
		}
		for k, v := range fieldMeta {
			meta[k] = v
		}
582
	}
583

S
sunby 已提交
584
	return meta, nil
585
}