server.go 16.2 KB
Newer Older
S
sunby 已提交
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
S
sunby 已提交
2 3 4 5 6 7 8
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
9

10
package datacoord
S
sunby 已提交
11

S
sunby 已提交
12 13 14
import (
	"context"
	"fmt"
S
sunby 已提交
15
	"math/rand"
S
sunby 已提交
16
	"sync"
S
sunby 已提交
17 18 19
	"sync/atomic"
	"time"

S
sunby 已提交
20
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
21
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
22
	"github.com/milvus-io/milvus/internal/logutil"
23 24
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"
S
sunby 已提交
25

X
Xiangyu Wang 已提交
26 27 28 29 30
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
33

X
Xiangyu Wang 已提交
34 35 36
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
37 38
)

S
sunby 已提交
39 40 41 42 43
// Connection-related constants.
//   - rootCoordClientTimout is a 20 second duration; judging by its name it
//     bounds root coordinator client calls, but no use is visible in this part
//     of the file (NOTE(review): confirm where it is consumed).
//   - connEtcdMaxRetryTime is the attempt limit handed to retry.Do by initMeta.
//   - connEtcdRetryInterval is a 200 millisecond duration; presumably the pause
//     between etcd connection attempts, though initMeta does not pass it on
//     (NOTE(review): confirm).
const (
	rootCoordClientTimout = 20 * time.Second
	connEtcdMaxRetryTime  = 100000
	connEtcdRetryInterval = 200 * time.Millisecond
)
S
sunby 已提交
44

S
sunby 已提交
45
type (
S
sunby 已提交
46 47
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
S
sunby 已提交
48
)
S
sunby 已提交
49

50 51 52 53 54 55 56 57 58 59 60 61
// ServerState is an alias of int64 so that the state can be stored and
// swapped with the sync/atomic int64 helpers.
type ServerState = int64

// Lifecycle states of a `Server` instance, numbered 0, 1, 2 in declaration order.
const (
	// ServerStateStopped stands for a just created or stopped `Server` instance.
	ServerStateStopped ServerState = iota
	// ServerStateInitializing stands for an initializing `Server` instance.
	ServerStateInitializing
	// ServerStateHealthy stands for a healthy `Server` instance.
	ServerStateHealthy
)

G
godchen 已提交
62 63
// dataNodeCreatorFunc creates a types.DataNode client for the given address.
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)
// rootCoordCreatorFunc creates a types.RootCoord client from the meta root
// path and the etcd endpoints.
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
64

65 66
// Server implements `types.Datacoord`
// handles Data Cooridinator related jobs
N
neza2017 已提交
67
type Server struct {
S
sunby 已提交
68 69 70 71
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
72
	isServing        ServerState
73

74 75 76 77
	kvClient        *etcdkv.EtcdKV
	meta            *meta
	segmentManager  Manager
	allocator       allocator
S
sunby 已提交
78
	cluster         *Cluster
79 80
	rootCoordClient types.RootCoord
	ddChannelName   string
81

82 83
	flushCh   chan UniqueID
	msFactory msgstream.Factory
84 85 86

	session  *sessionutil.Session
	activeCh <-chan bool
S
sunby 已提交
87
	eventCh  <-chan *sessionutil.SessionEvent
88

S
sunby 已提交
89 90
	dataClientCreator      dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
91
}
S
sunby 已提交
92

S
sunby 已提交
93 94 95 96 97 98 99 100
// Option customizes a Server; CreateServer applies every given Option, in
// order, after the default fields are set.
type Option func(svr *Server)

// SetRootCoordCreator returns an Option that replaces the server's root
// coordinator client factory with creator.
func SetRootCoordCreator(creator rootCoordCreatorFunc) Option {
	apply := func(server *Server) {
		server.rootCoordClientCreator = creator
	}
	return apply
}

101
// CreateServer create `Server` instance
S
sunby 已提交
102
func CreateServer(ctx context.Context, factory msgstream.Factory, opts ...Option) (*Server, error) {
S
sunby 已提交
103
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
104
	s := &Server{
S
sunby 已提交
105 106 107 108 109
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataClientCreator:      defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
110
	}
S
sunby 已提交
111 112 113 114

	for _, opt := range opts {
		opt(s)
	}
S
sunby 已提交
115 116 117
	return s, nil
}

G
godchen 已提交
118 119
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
120 121
}

G
godchen 已提交
122 123
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
124 125
}

126 127
// Register register data service at etcd
func (s *Server) Register() error {
128
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
129
	s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
130 131 132 133
	Params.NodeID = s.session.ServerID
	return nil
}

134
// Init change server state to Initializing
135
func (s *Server) Init() error {
136
	atomic.StoreInt64(&s.isServing, ServerStateInitializing)
S
sunby 已提交
137 138 139
	return nil
}

140 141 142 143 144 145 146
// Start initialize `Server` members and start loops, follow steps are taken:
// 1. initialize message factory parameters
// 2. initialize root coord client, meta, datanode cluster, segment info channel,
//		allocator, segment manager
// 3. start service discovery and server loops, which includes message stream handler (segment statistics,datanode tt)
//		datanodes etcd watch, etcd alive check and flush completed status check
// 4. set server state to Healthy
S
sunby 已提交
147
func (s *Server) Start() error {
148
	var err error
S
sunby 已提交
149 150 151 152 153 154 155 156
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
157
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
158 159
		return err
	}
160

S
sunby 已提交
161 162 163
	if err = s.initMeta(); err != nil {
		return err
	}
164

S
sunby 已提交
165 166 167
	if err = s.initCluster(); err != nil {
		return err
	}
168

S
sunby 已提交
169
	s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)
170

171
	s.startSegmentManager()
S
sunby 已提交
172 173 174
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
175

S
sunby 已提交
176
	s.startServerLoop()
177

178
	atomic.StoreInt64(&s.isServing, ServerStateHealthy)
S
sunby 已提交
179
	log.Debug("DataCoordinator startup success")
S
sunby 已提交
180
	return nil
181 182 183
}

func (s *Server) initCluster() error {
S
sunby 已提交
184 185 186
	var err error
	s.cluster, err = NewCluster(s.ctx, s.kvClient, NewNodesInfo(), s)
	return err
187
}
G
groot 已提交
188

189 190 191
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
192
		log.Debug("DataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
193 194
		return err
	}
195 196
	log.Debug("registered sessions", zap.Any("sessions", sessions))

S
sunby 已提交
197
	datanodes := make([]*NodeInfo, 0, len(sessions))
198
	for _, session := range sessions {
S
sunby 已提交
199
		info := &datapb.DataNodeInfo{
200 201 202
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
203 204 205
		}
		nodeInfo := NewNodeInfo(s.ctx, info)
		datanodes = append(datanodes, nodeInfo)
206
	}
G
godchen 已提交
207

S
sunby 已提交
208
	s.cluster.Startup(datanodes)
209

210
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev+1)
S
sunby 已提交
211 212 213
	return nil
}

214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
// loadDataNodes returns one DataNodeInfo per registered datanode session.
// It returns an empty, non-nil slice when the session is not set or the
// sessions cannot be fetched; both cases are logged as warnings.
func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
	if s.session == nil {
		log.Warn("load data nodes but session is nil")
		return []*datapb.DataNodeInfo{}
	}

	sessions, _, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Warn("load data nodes faild", zap.Error(err))
		return []*datapb.DataNodeInfo{}
	}

	nodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
	for _, sess := range sessions {
		node := &datapb.DataNodeInfo{
			Address:  sess.Address,
			Version:  sess.ServerID,
			Channels: []*datapb.ChannelStatus{},
		}
		nodes = append(nodes, node)
	}
	return nodes
}

235
func (s *Server) startSegmentManager() {
236
	s.segmentManager = newSegmentManager(s.meta, s.allocator)
237 238
}

S
sunby 已提交
239
func (s *Server) initMeta() error {
240
	connectEtcdFn := func() error {
241
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
242 243 244
		if err != nil {
			return err
		}
245 246
		s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
		s.meta, err = newMeta(s.kvClient)
247 248 249 250
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
251
	}
G
godchen 已提交
252
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
253 254
}

255 256
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
S
sunby 已提交
257
	s.serverLoopWg.Add(5)
258
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
259
	go s.startDataNodeTtLoop(s.serverLoopCtx)
260 261
	go s.startWatchService(s.serverLoopCtx)
	go s.startActiveCheck(s.serverLoopCtx)
S
sunby 已提交
262
	go s.startFlushLoop(s.serverLoopCtx)
263 264 265
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
266
	defer logutil.LogPanic()
267
	defer s.serverLoopWg.Done()
G
groot 已提交
268
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
269 270
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
	log.Debug("DataCoord stats stream",
271
		zap.String("channelName", Params.StatisticsChannelName),
272
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
273 274 275 276 277
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
278
			log.Debug("stats channel shutdown")
279 280 281
			return
		default:
		}
282
		msgPack := statsStream.Consume()
S
sunby 已提交
283
		if msgPack == nil {
S
sunby 已提交
284
			log.Debug("receive nil stats msg, shutdown stats channel")
S
sunby 已提交
285
			return
S
sunby 已提交
286
		}
287
		for _, msg := range msgPack.Msgs {
288
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
289 290
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
291
				continue
S
sunby 已提交
292
			}
293
			log.Debug("Receive DataNode segment statistics update")
294 295
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
S
sunby 已提交
296
				s.meta.SetCurrentRows(stat.GetSegmentID(), stat.GetNumRows())
297
			}
298 299 300 301
		}
	}
}

S
sunby 已提交
302
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
303
	defer logutil.LogPanic()
304
	defer s.serverLoopWg.Done()
S
sunby 已提交
305 306 307 308 309 310
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
311 312 313
		Params.DataCoordSubscriptionName)
	log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s",
		Params.TimeTickChannelName, Params.DataCoordSubscriptionName))
S
sunby 已提交
314 315
	ttMsgStream.Start()
	defer ttMsgStream.Close()
316 317 318
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
319
			log.Debug("data node tt loop shutdown")
320 321 322
			return
		default:
		}
S
sunby 已提交
323
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
324
		if msgPack == nil {
S
sunby 已提交
325
			log.Debug("receive nil tt msg, shutdown tt channel")
S
sunby 已提交
326
			return
S
sunby 已提交
327
		}
328
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
329
			if msg.Type() != commonpb.MsgType_DataNodeTt {
S
sunby 已提交
330
				log.Warn("Receive unexpected msg type from tt channel",
S
sunby 已提交
331
					zap.Stringer("msgType", msg.Type()))
332 333
				continue
			}
S
sunby 已提交
334 335 336 337
			ttMsg := msg.(*msgstream.DataNodeTtMsg)

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
338
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
339
			if err != nil {
S
sunby 已提交
340
				log.Warn("get flushable segments failed", zap.Error(err))
341 342
				continue
			}
343

S
sunby 已提交
344 345 346 347
			if len(segments) == 0 {
				continue
			}
			log.Debug("Flush segments", zap.Int64s("segmentIDs", segments))
348
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
349
			for _, id := range segments {
S
sunby 已提交
350 351
				sInfo := s.meta.GetSegment(id)
				if sInfo == nil {
S
sunby 已提交
352
					log.Error("get segment from meta error", zap.Int64("id", id),
353
						zap.Error(err))
S
sunby 已提交
354
					continue
355
				}
S
sunby 已提交
356
				segmentInfos = append(segmentInfos, sInfo.SegmentInfo)
357
			}
S
sunby 已提交
358
			if len(segmentInfos) > 0 {
S
sunby 已提交
359
				s.cluster.Flush(segmentInfos)
S
sunby 已提交
360
			}
361
			s.segmentManager.ExpireAllocations(ch, ts)
362 363 364 365 366
		}
	}
}

func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
367
	defer logutil.LogPanic()
368 369 370 371 372 373
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
S
sunby 已提交
374
		case event := <-s.eventCh:
S
sunby 已提交
375
			info := &datapb.DataNodeInfo{
376 377 378
				Address:  event.Session.Address,
				Version:  event.Session.ServerID,
				Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
379
			}
S
sunby 已提交
380
			node := NewNodeInfo(ctx, info)
381 382
			switch event.EventType {
			case sessionutil.SessionAddEvent:
S
sunby 已提交
383
				log.Info("Received datanode register",
S
sunby 已提交
384 385 386
					zap.String("address", info.Address),
					zap.Int64("serverID", info.Version))
				s.cluster.Register(node)
387
			case sessionutil.SessionDelEvent:
S
sunby 已提交
388
				log.Info("Received datanode unregister",
S
sunby 已提交
389 390 391
					zap.String("address", info.Address),
					zap.Int64("serverID", info.Version))
				s.cluster.UnRegister(node)
392 393 394 395 396 397 398
			default:
				log.Warn("receive unknown service event type",
					zap.Any("type", event.EventType))
			}
		}
	}
}
S
sunby 已提交
399

400
func (s *Server) startActiveCheck(ctx context.Context) {
S
sunby 已提交
401
	defer logutil.LogPanic()
402 403 404 405 406 407 408
	defer s.serverLoopWg.Done()

	for {
		select {
		case _, ok := <-s.activeCh:
			if ok {
				continue
409
			}
410
			go func() { s.Stop() }()
S
sunby 已提交
411
			log.Debug("disconnect with etcd and shutdown data coordinator")
412 413 414 415
			return
		case <-ctx.Done():
			log.Debug("connection check shutdown")
			return
416 417 418 419
		}
	}
}

S
sunby 已提交
420 421 422 423 424 425 426 427 428 429 430 431 432
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()
	ctx2, cancel := context.WithCancel(ctx)
	defer cancel()
	// send `Flushing` segments
	go s.handleFlushingSegments(ctx2)
	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case segmentID := <-s.flushCh:
S
sunby 已提交
433 434
			segment := s.meta.GetSegment(segmentID)
			if segment == nil {
435 436 437 438 439 440 441
				log.Warn("failed to get flused segment", zap.Int64("id", segmentID))
				continue
			}
			req := &datapb.SegmentFlushCompletedMsg{
				Base: &commonpb.MsgBase{
					MsgType: commonpb.MsgType_SegmentFlushDone,
				},
S
sunby 已提交
442
				Segment: segment.SegmentInfo,
443 444 445 446
			}
			resp, err := s.rootCoordClient.SegmentFlushCompleted(ctx, req)
			if err = VerifyResponse(resp, err); err != nil {
				log.Warn("failed to call SegmentFlushComplete", zap.Int64("segmentID", segmentID), zap.Error(err))
S
sunby 已提交
447 448 449
				continue
			}
			// set segment to SegmentState_Flushed
S
sunby 已提交
450
			if err = s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
S
sunby 已提交
451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469
				log.Error("flush segment complete failed", zap.Error(err))
				continue
			}
			log.Debug("flush segment complete", zap.Int64("id", segmentID))
		}
	}
}

// handleFlushingSegments sends the ID of every segment returned by
// meta.GetFlushingSegments to flushCh, stopping early when ctx is done.
func (s *Server) handleFlushingSegments(ctx context.Context) {
	for _, flushing := range s.meta.GetFlushingSegments() {
		select {
		case s.flushCh <- flushing.ID:
		case <-ctx.Done():
			return
		}
	}
}

470
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
471
	var err error
G
godchen 已提交
472
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
473 474
		return err
	}
475
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
476 477
		return err
	}
478
	return s.rootCoordClient.Start()
S
sunby 已提交
479
}
480

481 482 483 484
// Stop do the Server finalize processes
// it checks the server status is healthy, if not, just quit
// if Server is healthy, set server state to stopped, release etcd session,
//	stop message stream client and stop server loops
S
sunby 已提交
485
func (s *Server) Stop() error {
486
	if !atomic.CompareAndSwapInt64(&s.isServing, ServerStateHealthy, ServerStateStopped) {
S
sunby 已提交
487 488
		return nil
	}
489
	log.Debug("DataCoord server shutdown")
S
sunby 已提交
490
	s.cluster.Close()
S
sunby 已提交
491
	s.stopServerLoop()
S
sunby 已提交
492 493 494
	return nil
}

S
sunby 已提交
495 496
// CleanMeta only for test
func (s *Server) CleanMeta() error {
497
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
498
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
499 500
}

S
sunby 已提交
501 502 503 504 505
// stopServerLoop cancels the server loop context and then blocks until every
// loop registered on serverLoopWg has called Done.
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

506 507 508 509 510 511 512 513 514 515 516 517 518 519
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
520

521 522
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
523
		Base: &commonpb.MsgBase{
524
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
525 526 527 528 529 530 531 532
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
533
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
534 535
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
536 537
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
538
			SourceID:  Params.NodeID,
539
		},
S
sunby 已提交
540 541 542 543 544
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
545 546
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
547 548
		return err
	}
S
sunby 已提交
549 550 551 552 553
	collInfo := &datapb.CollectionInfo{
		ID:         resp.CollectionID,
		Schema:     resp.Schema,
		Partitions: presp.PartitionIDs,
	}
S
sunby 已提交
554 555
	s.meta.AddCollection(collInfo)
	return nil
S
sunby 已提交
556 557
}

558
func (s *Server) prepareBinlog(req *datapb.SaveBinlogPathsRequest) (map[string]string, error) {
S
sunby 已提交
559
	meta := make(map[string]string)
560

561 562 563 564 565 566 567 568
	for _, fieldBlp := range req.Field2BinlogPaths {
		fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)
		if err != nil {
			return nil, err
		}
		for k, v := range fieldMeta {
			meta[k] = v
		}
569
	}
570

S
sunby 已提交
571
	return meta, nil
572
}