server.go 16.6 KB
Newer Older
S
sunby 已提交
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.//// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
S
sunby 已提交
2 3 4 5 6 7 8
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
9

10
package datacoord
S
sunby 已提交
11

S
sunby 已提交
12 13
import (
	"context"
14
	"fmt"
S
sunby 已提交
15
	"math/rand"
S
sunby 已提交
16
	"sync"
S
sunby 已提交
17 18 19
	"sync/atomic"
	"time"

S
sunby 已提交
20
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
21
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
22
	"github.com/milvus-io/milvus/internal/logutil"
23 24
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"
S
sunby 已提交
25

X
Xiangyu Wang 已提交
26 27 28 29 30
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
33

X
Xiangyu Wang 已提交
34 35 36
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
37 38
)

S
sunby 已提交
39 40 41 42 43
const (
	rootCoordClientTimout = 20 * time.Second
	connEtcdMaxRetryTime  = 100000
	connEtcdRetryInterval = 200 * time.Millisecond
)
S
sunby 已提交
44

45 46 47 48 49 50 51 52
var (
	// TODO: sunby put to config
	enableTtChecker  = true
	ttCheckerName    = "dataTtChecker"
	ttMaxInterval    = 3 * time.Minute
	ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
)

S
sunby 已提交
53
type (
S
sunby 已提交
54 55
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
S
sunby 已提交
56
)
S
sunby 已提交
57

58 59 60 61 62 63 64 65 66 67 68 69
// ServerState type alias
type ServerState = int64

const (
	// ServerStateStopped state stands for just created or stopped `Server` instance
	ServerStateStopped ServerState = 0
	// ServerStateInitializing state stands initializing `Server` instance
	ServerStateInitializing ServerState = 1
	// ServerStateHealthy state stands for healthy `Server` instance
	ServerStateHealthy ServerState = 2
)

G
godchen 已提交
70 71
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
72

73 74
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
75
type Server struct {
S
sunby 已提交
76 77 78 79
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
80
	isServing        ServerState
81

82 83 84 85
	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
S
sunby 已提交
86
	cluster         *Cluster
87 88
	rootCoordClient types.RootCoord
	ddChannelName   string
89

90 91
	flushCh   chan UniqueID
	msFactory msgstream.Factory
92 93 94

	session  *sessionutil.Session
	activeCh <-chan bool
S
sunby 已提交
95
	eventCh  <-chan *sessionutil.SessionEvent
96

S
sunby 已提交
97 98
	dataClientCreator      dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
99
}
S
sunby 已提交
100

S
sunby 已提交
101 102 103 104 105 106 107 108
type Option func(svr *Server)

func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	return func(svr *Server) {
		svr.rootCoordClientCreator = creator
	}
}

109
// CreateServer create `Server` instance
S
sunby 已提交
110
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
S
sunby 已提交
111
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
112
	s := &Server{
S
sunby 已提交
113 114 115 116 117
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataClientCreator:      defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
118
	}
S
sunby 已提交
119 120 121 122

	for _, opt := range opts {
		opt(s)
	}
S
sunby 已提交
123 124 125
	return s, nil
}

G
godchen 已提交
126 127
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
128 129
}

G
godchen 已提交
130 131
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
132 133
}

134 135
// Register register data service at etcd
func (s *Server) Register() error {
136
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
137
	s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
138 139 140 141
	Params.NodeID = s.session.ServerID
	return nil
}

142
// Init change server state to Initializing
143
func (s *Server) Init() error {
144
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
145 146 147
	return nil
}

148 149 150 151 152 153 154
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
155
func (s *Server) Start() error {
156
	var err error
S
sunby 已提交
157 158 159 160 161 162 163 164
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
165
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
166 167
		return err
	}
168

S
sunby 已提交
169 170 171
	if err = s.initMeta(); err != nil {
		return err
	}
172

S
sunby 已提交
173 174 175
	if err = s.initCluster(); err != nil {
		return err
	}
176

S
sunby 已提交
177
	s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)
178

179
	s.startSegmentManager()
S
sunby 已提交
180 181 182
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
183

S
sunby 已提交
184
	s.startServerLoop()
185

186
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
187
	log.Debug("dataCoordinator startup success")
S
sunby 已提交
188
	return nil
189 190 191
}

func (s *Server) initCluster() error {
S
sunby 已提交
192 193 194
	var err error
	s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s)
	return err
195
}
G
groot 已提交
196

197 198 199
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
200
		log.Debug("dataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
201 202
		return err
	}
203 204
	log.Debug("registered sessions", zap.Any("sessions", sessions))

S
sunby 已提交
205
	datanodes := make([]*NodeInfo, 0, len(sessions))
206
	for _, session := range sessions {
S
sunby 已提交
207
		info := &datapb.DataNodeInfo{
208 209 210
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
211 212 213
		}
		nodeInfo := NewNodeInfo(s.ctx, info)
		datanodes = append(datanodes, nodeInfo)
214
	}
G
godchen 已提交
215

S
sunby 已提交
216
	s.cluster.Startup(datanodes)
217

218
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
219 220 221
	return nil
}

222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242
func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
	if s.session == nil {
		log.Warn("load data nodes but session is nil")
		return []*datapb.DataNodeInfo{}
	}
	sessions, _, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Warn("load data nodes faild", zap.Error(err))
		return []*datapb.DataNodeInfo{}
	}
	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
	for _, session := range sessions {
		datanodes = append(datanodes, &datapb.DataNodeInfo{
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
		})
	}
	return datanodes
}

243
func (s *Server) startSegmentManager() {
244
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
245 246
}

S
sunby 已提交
247
func (s *Server) initMeta() error {
248
	connectEtcdFn := func() error {
249
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
250 251 252
		if err != nil {
			return err
		}
253 254
		s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
		s.meta, err = newMeta(s.kvClient)
255 256 257 258
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
259
	}
G
godchen 已提交
260
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
261 262
}

263 264
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
S
sunby 已提交
265
	s.serverLoopWg.Add(5)
266
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
267
	go s.startDataNodeTtLoop(s.serverLoopCtx)
268 269
	go s.startWatchService(s.serverLoopCtx)
	go s.startActiveCheck(s.serverLoopCtx)
S
sunby 已提交
270
	go s.startFlushLoop(s.serverLoopCtx)
271 272 273
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
274
	defer logutil.LogPanic()
275
	defer s.serverLoopWg.Done()
G
groot 已提交
276
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
277
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
278
	log.Debug("dataCoord create stats channel consumer",
279
		zap.String("channelName", Params.StatisticsChannelName),
280
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
281 282 283 284 285
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
286
			log.Debug("stats channel shutdown")
287 288 289
			return
		default:
		}
290
		msgPack := statsStream.Consume()
S
sunby 已提交
291
		if msgPack == nil {
S
sunby 已提交
292
			log.Debug("receive nil stats msg, shutdown stats channel")
S
sunby 已提交
293
			return
S
sunby 已提交
294
		}
295
		for _, msg := range msgPack.Msgs {
296
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
297 298
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
299
				continue
S
sunby 已提交
300
			}
301 302
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
S
sunby 已提交
303
				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
304
			}
305 306 307 308
		}
	}
}

S
sunby 已提交
309
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
310
	defer logutil.LogPanic()
311
	defer s.serverLoopWg.Done()
S
sunby 已提交
312 313 314 315 316 317
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
318
		Params.DataCoordSubscriptionName)
319 320 321
	log.Debug("dataCoord create time tick channel consumer",
		zap.String("timeTickChannelName", Params.TimeTickChannelName),
		zap.String("subscriptionName", Params.DataCoordSubscriptionName))
S
sunby 已提交
322 323
	ttMsgStream.Start()
	defer ttMsgStream.Close()
324 325 326 327 328 329

	var checker *LongTermChecker
	if enableTtChecker {
		checker = NewLongTermChecker(ctx, ttCheckerName, ttMaxInterval, ttCheckerWarnMsg)
		checker.Start()
	}
330 331 332
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
333
			log.Debug("data node tt loop shutdown")
334 335 336
			return
		default:
		}
S
sunby 已提交
337
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
338
		if msgPack == nil {
S
sunby 已提交
339
			log.Debug("receive nil tt msg, shutdown tt channel")
S
sunby 已提交
340
			return
S
sunby 已提交
341
		}
342
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
343
			if msg.Type() != commonpb.MsgType_DataNodeTt {
344
				log.Warn("receive unexpected msg type from tt channel",
S
sunby 已提交
345
					zap.Stringer("msgType", msg.Type()))
346 347
				continue
			}
S
sunby 已提交
348
			ttMsg := msg.(*msgstream.DataNodeTtMsg)
349 350 351
			if enableTtChecker {
				checker.Check()
			}
S
sunby 已提交
352 353 354

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
355
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
356
			if err != nil {
S
sunby 已提交
357
				log.Warn("get flushable segments failed", zap.Error(err))
358 359
				continue
			}
360

S
sunby 已提交
361 362 363
			if len(segments) == 0 {
				continue
			}
364
			log.Debug("flush segments", zap.Int64s("segmentIDs", segments))
365
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
366
			for _, id := range segments {
S
sunby 已提交
367 368
				sInfo := s.meta.GetSegment(id)
				if sInfo == nil {
S
sunby 已提交
369
					log.Error("get segment from meta error", zap.Int64("id", id),
370
						zap.Error(err))
S
sunby 已提交
371
					continue
372
				}
S
sunby 已提交
373
				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
374
			}
S
sunby 已提交
375
			if len(segmentInfos) > 0 {
S
sunby 已提交
376
				s.cluster.Flush(segmentInfos)
S
sunby 已提交
377
			}
378
			s.segmentManager.ExpireAllocations(ch, ts)
379 380 381 382 383
		}
	}
}

func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
384
	defer logutil.LogPanic()
385 386 387 388 389 390
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
S
sunby 已提交
391
		case event := <-s.eventCh:
S
sunby 已提交
392
			info := &datapb.DataNodeInfo{
393 394 395
				Address:  event.Session.Address,
				Version:  event.Session.ServerID,
				Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
396
			}
S
sunby 已提交
397
			node := NewNodeInfo(ctx, info)
398 399
			switch event.EventType {
			case sessionutil.SessionAddEvent:
400
				log.Info("received datanode register",
S
sunby 已提交
401 402 403
					zap.String("address", info.Address),
					zap.Int64("serverID", info.Version))
				s.cluster.Register(node)
404
			case sessionutil.SessionDelEvent:
405
				log.Info("received datanode unregister",
S
sunby 已提交
406 407 408
					zap.String("address", info.Address),
					zap.Int64("serverID", info.Version))
				s.cluster.UnRegister(node)
409 410 411 412 413 414 415
			default:
				log.Warn("receive unknown service event type",
					zap.Any("type", event.EventType))
			}
		}
	}
}
S
sunby 已提交
416

417
func (s *Server) startActiveCheck(ctx context.Context) {
S
sunby 已提交
418
	defer logutil.LogPanic()
419 420 421 422 423 424 425
	defer s.serverLoopWg.Done()

	for {
		select {
		case _, ok := <-s.activeCh:
			if ok {
				continue
426
			}
427
			go func() { s.Stop() }()
S
sunby 已提交
428
			log.Debug("disconnect with etcd and shutdown data coordinator")
429 430 431 432
			return
		case <-ctx.Done():
			log.Debug("connection check shutdown")
			return
433 434 435 436
		}
	}
}

S
sunby 已提交
437 438 439 440 441 442 443 444 445 446 447 448 449
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
S
sunby 已提交
450 451
			segment := s.meta.GetSegment(segmentID)
			if segment == nil {
452 453 454 455 456 457 458
				log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
				continue
			}
			req := &datapb.SegmentFlushCompletedMsg{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_SegmentFlushDone,
				},
S
sunby 已提交
459
				Segment: segment.SegmentInfo,
460 461 462 463
			}
			resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
			if err = VerifyResponse(resp, err); err != nil {
				log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
S
sunby 已提交
464 465 466
				continue
			}
			// set segment to SegmentState_Flushed
S
sunby 已提交
467
			if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
S
sunby 已提交
468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486
				log.Error("flush segment complete failed", zap.Error(err))
				continue
			}
			log.Debug("flush segment complete", zap.Int64("id", segmentID))
		}
	}
}

func (s *Server) handleFlushingSegments(ctx context.Context) {
	segments := s.meta.GetFlushingSegments()
	for _, segment := range segments {
		select {
		case <-ctx.Done():
			return
		case s.flushCh <- segment.ID:
		}
	}
}

487
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
488
	var err error
G
godchen 已提交
489
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
490 491
		return err
	}
492
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
493 494
		return err
	}
495
	return s.rootCoordClient.Start()
S
sunby 已提交
496
}
497

498 499 500 501
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
502
func (s *Server) Stop() error {
503
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
504 505
		return nil
	}
506
	log.Debug("dataCoord server shutdown")
S
sunby 已提交
507
	s.cluster.Close()
S
sunby 已提交
508
	s.stopServerLoop()
S
sunby 已提交
509 510 511
	return nil
}

S
sunby 已提交
512 513
// CleanMeta only for test
func (s *Server) CleanMeta() error {
514
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
515
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
516 517
}

S
sunby 已提交
518 519 520 521 522
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

523 524 525 526 527 528 529 530 531 532 533 534 535 536
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
537

538 539
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
540
		Base: &commonpb.MsgBase{
541
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
542 543 544 545 546 547 548 549
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
550
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
551 552
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
553 554
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
555
			SourceID:  Params.NodeID,
556
		},
S
sunby 已提交
557 558 559 560 561
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
562 563
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
564 565
		return err
	}
S
sunby 已提交
566 567 568 569 570
	collInfo := &datapb.CollectionInfo{
		ID:         resp.CollectionID,
		Schema:     resp.Schema,
		Partitions: presp.PartitionIDs,
	}
S
sunby 已提交
571 572
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
573 574
}

575
func (s *Server) prepareBinlog(req *datapb.SaveBinlogPathsRequest) (map[string]string, error) {
S
sunby 已提交
576
	meta := make(map[string]string)
577

578 579 580 581 582 583 584 585
	for _, fieldBlp := range req.Field2BinlogPaths {
		fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)
		if err != nil {
			return nil, err
		}
		for k, v := range fieldMeta {
			meta[k] = v
		}
586
	}
587

S
sunby 已提交
588
	return meta, nil
589
}