server.go 16.4 KB
Newer Older
S
sunby 已提交
1
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
S
sunby 已提交
2 3 4 5 6 7 8
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
9

10
package datacoord
S
sunby 已提交
11

S
sunby 已提交
12 13 14
import (
	"context"
	"fmt"
S
sunby 已提交
15
	"math/rand"
S
sunby 已提交
16
	"sync"
S
sunby 已提交
17 18 19
	"sync/atomic"
	"time"

S
sunby 已提交
20
	datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
21
	rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
S
sunby 已提交
22
	"github.com/milvus-io/milvus/internal/logutil"
23 24
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"
S
sunby 已提交
25

X
Xiangyu Wang 已提交
26 27 28 29 30
	etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
	"github.com/milvus-io/milvus/internal/log"
	"github.com/milvus-io/milvus/internal/msgstream"
	"github.com/milvus-io/milvus/internal/types"
	"github.com/milvus-io/milvus/internal/util/retry"
G
godchen 已提交
31
	"github.com/milvus-io/milvus/internal/util/sessionutil"
X
Xiangyu Wang 已提交
32
	"github.com/milvus-io/milvus/internal/util/typeutil"
N
neza2017 已提交
33

X
Xiangyu Wang 已提交
34 35 36 37
	"github.com/milvus-io/milvus/internal/proto/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/internal/proto/milvuspb"
S
sunby 已提交
38 39
)

S
sunby 已提交
40 41 42 43 44
// Tuning constants for the connections the data coordinator opens at startup.
//
//   - rootCoordClientTimout: a 20 second duration. Presumably the timeout for
//     root coordinator client calls; it is not referenced in the visible part
//     of this file — TODO confirm where it is consumed.
//   - connEtcdMaxRetryTime: the attempt count handed to retry.Do in initMeta.
//   - connEtcdRetryInterval: a 200 millisecond duration. Presumably the pause
//     between etcd connection attempts; initMeta does not pass it to retry.Do
//     — TODO confirm whether it is used elsewhere.
const (
	rootCoordClientTimout = 20 * time.Second
	connEtcdMaxRetryTime  = 100000
	connEtcdRetryInterval = 200 * time.Millisecond
)
S
sunby 已提交
45

S
sunby 已提交
46
type (
S
sunby 已提交
47 48
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
S
sunby 已提交
49
)
S
sunby 已提交
50

G
godchen 已提交
51 52
// dataNodeCreatorFunc builds a DataNode client for the node at addr.
// Server.dataClientCreator has this type; CreateServer sets it to
// defaultDataNodeCreatorFunc.
type dataNodeCreatorFunc func(ctx context.Context, addr string) (types.DataNode, error)

// rootCoordCreatorFunc builds a RootCoord client from the etcd meta root path
// and endpoints. Server.rootCoordClientCreator has this type; CreateServer
// sets it to defaultRootCoordCreatorFunc.
type rootCoordCreatorFunc func(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error)
S
sunby 已提交
53

N
neza2017 已提交
54
type Server struct {
S
sunby 已提交
55 56 57 58
	ctx              context.Context
	serverLoopCtx    context.Context
	serverLoopCancel context.CancelFunc
	serverLoopWg     sync.WaitGroup
S
sunby 已提交
59
	isServing        int64
60 61 62 63

	kvClient          *etcdkv.EtcdKV
	meta              *meta
	segmentInfoStream msgstream.MsgStream
64
	segmentManager    Manager
S
sunby 已提交
65
	allocator         allocator
66
	cluster           *cluster
67
	rootCoordClient   types.RootCoord
S
sunby 已提交
68
	ddChannelName     string
69

S
sunby 已提交
70
	flushCh        chan UniqueID
71 72 73 74 75
	flushMsgStream msgstream.MsgStream
	msFactory      msgstream.Factory

	session  *sessionutil.Session
	activeCh <-chan bool
S
sunby 已提交
76
	eventCh  <-chan *sessionutil.SessionEvent
77

S
sunby 已提交
78 79
	dataClientCreator      dataNodeCreatorFunc
	rootCoordClientCreator rootCoordCreatorFunc
N
neza2017 已提交
80
}
S
sunby 已提交
81

G
groot 已提交
82
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
S
sunby 已提交
83
	rand.Seed(time.Now().UnixNano())
S
sunby 已提交
84
	s := &Server{
S
sunby 已提交
85 86 87 88 89
		ctx:                    ctx,
		msFactory:              factory,
		flushCh:                make(chan UniqueID, 1024),
		dataClientCreator:      defaultDataNodeCreatorFunc,
		rootCoordClientCreator: defaultRootCoordCreatorFunc,
S
sunby 已提交
90 91 92 93
	}
	return s, nil
}

G
godchen 已提交
94 95
func defaultDataNodeCreatorFunc(ctx context.Context, addr string) (types.DataNode, error) {
	return datanodeclient.NewClient(ctx, addr)
S
sunby 已提交
96 97
}

G
godchen 已提交
98 99
func defaultRootCoordCreatorFunc(ctx context.Context, metaRootPath string, etcdEndpoints []string) (types.RootCoord, error) {
	return rootcoordclient.NewClient(ctx, metaRootPath, etcdEndpoints)
S
sunby 已提交
100 101
}

102 103
// Register register data service at etcd
func (s *Server) Register() error {
104
	s.session = sessionutil.NewSession(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints)
105
	s.activeCh = s.session.Init(typeutil.DataCoordRole, Params.IP, true)
106 107 108 109 110
	Params.NodeID = s.session.ServerID
	return nil
}

func (s *Server) Init() error {
S
sunby 已提交
111
	atomic.StoreInt64(&s.isServing, 1)
S
sunby 已提交
112 113 114 115
	return nil
}

func (s *Server) Start() error {
116
	var err error
S
sunby 已提交
117 118 119 120 121 122 123 124
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}
125
	if err = s.initRootCoordClient(); err != nil {
S
sunby 已提交
126 127
		return err
	}
128

S
sunby 已提交
129 130 131
	if err = s.initMeta(); err != nil {
		return err
	}
132

S
sunby 已提交
133 134 135
	if err = s.initCluster(); err != nil {
		return err
	}
136

S
sunby 已提交
137 138 139
	if err = s.initSegmentInfoChannel(); err != nil {
		return err
	}
140

S
sunby 已提交
141
	s.allocator = newRootCoordAllocator(s.ctx, s.rootCoordClient)
142

143
	s.startSegmentManager()
S
sunby 已提交
144 145 146
	if err = s.initFlushMsgStream(); err != nil {
		return err
	}
147

S
sunby 已提交
148 149 150
	if err = s.initServiceDiscovery(); err != nil {
		return err
	}
151

S
sunby 已提交
152
	s.startServerLoop()
153

S
sunby 已提交
154
	atomic.StoreInt64(&s.isServing, 2)
S
sunby 已提交
155
	log.Debug("DataCoordinator startup success")
S
sunby 已提交
156
	return nil
157 158 159 160
}

func (s *Server) initCluster() error {
	dManager, err := newClusterNodeManager(s.kvClient)
G
groot 已提交
161 162 163
	if err != nil {
		return err
	}
S
sunby 已提交
164
	sManager := newClusterSessionManager(s.ctx, s.dataClientCreator)
165
	s.cluster = newCluster(s.ctx, dManager, sManager, s)
166 167
	return nil
}
G
groot 已提交
168

169 170 171
func (s *Server) initServiceDiscovery() error {
	sessions, rev, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
172
		log.Debug("DataCoord initMeta failed", zap.Error(err))
G
godchen 已提交
173 174
		return err
	}
175 176 177 178 179 180 181 182 183 184
	log.Debug("registered sessions", zap.Any("sessions", sessions))

	datanodes := make([]*datapb.DataNodeInfo, 0, len(sessions))
	for _, session := range sessions {
		datanodes = append(datanodes, &datapb.DataNodeInfo{
			Address:  session.Address,
			Version:  session.ServerID,
			Channels: []*datapb.ChannelStatus{},
		})
	}
G
godchen 已提交
185

186
	if err := s.cluster.startup(datanodes); err != nil {
187
		log.Debug("DataCoord loadMetaFromRootCoord failed", zap.Error(err))
S
sunby 已提交
188 189
		return err
	}
190

S
sunby 已提交
191
	s.eventCh = s.session.WatchServices(typeutil.DataNodeRole, rev)
S
sunby 已提交
192 193 194
	return nil
}

195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
// loadDataNodes returns one DataNodeInfo (address, server ID as version, no
// channels) per datanode session currently registered. When the session is
// missing or the lookup fails it logs a warning and returns an empty, non-nil
// slice.
func (s *Server) loadDataNodes() []*datapb.DataNodeInfo {
	if s.session == nil {
		log.Warn("load data nodes but session is nil")
		return []*datapb.DataNodeInfo{}
	}

	registered, _, err := s.session.GetSessions(typeutil.DataNodeRole)
	if err != nil {
		log.Warn("load data nodes faild", zap.Error(err))
		return []*datapb.DataNodeInfo{}
	}

	nodes := make([]*datapb.DataNodeInfo, 0, len(registered))
	for _, sess := range registered {
		info := &datapb.DataNodeInfo{
			Address:  sess.Address,
			Version:  sess.ServerID,
			Channels: []*datapb.ChannelStatus{},
		}
		nodes = append(nodes, info)
	}
	return nodes
}

216
func (s *Server) startSegmentManager() {
217
	helper := createNewSegmentHelper(s.segmentInfoStream)
218
	s.segmentManager = newSegmentManager(s.meta, s.allocator, withAllocHelper(helper))
219 220
}

221 222 223 224 225 226 227
func (s *Server) initSegmentInfoChannel() error {
	var err error
	s.segmentInfoStream, err = s.msFactory.NewMsgStream(s.ctx)
	if err != nil {
		return err
	}
	s.segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
228
	log.Debug("DataCoord AsProducer: " + Params.SegmentInfoChannelName)
229 230
	s.segmentInfoStream.Start()
	return nil
231 232
}

S
sunby 已提交
233
func (s *Server) initMeta() error {
234
	connectEtcdFn := func() error {
235
		etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
236 237 238
		if err != nil {
			return err
		}
239 240
		s.kvClient = etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
		s.meta, err = newMeta(s.kvClient)
241 242 243 244
		if err != nil {
			return err
		}
		return nil
S
sunby 已提交
245
	}
G
godchen 已提交
246
	return retry.Do(s.ctx, connectEtcdFn, retry.Attempts(connEtcdMaxRetryTime))
S
sunby 已提交
247 248
}

249
func (s *Server) initFlushMsgStream() error {
C
cai.zhang 已提交
250
	var err error
251 252 253 254 255 256
	// segment flush stream
	s.flushMsgStream, err = s.msFactory.NewMsgStream(s.ctx)
	if err != nil {
		return err
	}
	s.flushMsgStream.AsProducer([]string{Params.SegmentInfoChannelName})
257
	log.Debug("DataCoord AsProducer:" + Params.SegmentInfoChannelName)
258
	s.flushMsgStream.Start()
S
sunby 已提交
259 260
	return nil
}
S
sunby 已提交
261

262 263
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
S
sunby 已提交
264
	s.serverLoopWg.Add(5)
265
	go s.startStatsChannel(s.serverLoopCtx)
S
sunby 已提交
266
	go s.startDataNodeTtLoop(s.serverLoopCtx)
267 268
	go s.startWatchService(s.serverLoopCtx)
	go s.startActiveCheck(s.serverLoopCtx)
S
sunby 已提交
269
	go s.startFlushLoop(s.serverLoopCtx)
270 271 272
}

func (s *Server) startStatsChannel(ctx context.Context) {
S
sunby 已提交
273
	defer logutil.LogPanic()
274
	defer s.serverLoopWg.Done()
G
groot 已提交
275
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
276 277
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataCoordSubscriptionName)
	log.Debug("DataCoord stats stream",
278
		zap.String("channelName", Params.StatisticsChannelName),
279
		zap.String("descriptionName", Params.DataCoordSubscriptionName))
280 281 282 283 284
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
285
			log.Debug("stats channel shutdown")
286 287 288
			return
		default:
		}
289
		msgPack := statsStream.Consume()
S
sunby 已提交
290
		if msgPack == nil {
S
sunby 已提交
291
			return
S
sunby 已提交
292
		}
293
		for _, msg := range msgPack.Msgs {
294
			if msg.Type() != commonpb.MsgType_SegmentStatistics {
S
sunby 已提交
295 296
				log.Warn("receive unknown msg from segment statistics channel",
					zap.Stringer("msgType", msg.Type()))
297
				continue
S
sunby 已提交
298
			}
299
			log.Debug("Receive DataNode segment statistics update")
300 301
			ssMsg := msg.(*msgstream.SegmentStatisticsMsg)
			for _, stat := range ssMsg.SegStats {
302
				s.segmentManager.UpdateSegmentStats(stat)
303
			}
304 305 306 307
		}
	}
}

S
sunby 已提交
308
func (s *Server) startDataNodeTtLoop(ctx context.Context) {
S
sunby 已提交
309
	defer logutil.LogPanic()
310
	defer s.serverLoopWg.Done()
S
sunby 已提交
311 312 313 314 315 316
	ttMsgStream, err := s.msFactory.NewMsgStream(ctx)
	if err != nil {
		log.Error("new msg stream failed", zap.Error(err))
		return
	}
	ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName},
317 318 319
		Params.DataCoordSubscriptionName)
	log.Debug(fmt.Sprintf("DataCoord AsConsumer:%s:%s",
		Params.TimeTickChannelName, Params.DataCoordSubscriptionName))
S
sunby 已提交
320 321
	ttMsgStream.Start()
	defer ttMsgStream.Close()
322 323 324
	for {
		select {
		case <-ctx.Done():
S
sunby 已提交
325
			log.Debug("data node tt loop shutdown")
326 327 328
			return
		default:
		}
S
sunby 已提交
329
		msgPack := ttMsgStream.Consume()
S
sunby 已提交
330
		if msgPack == nil {
S
sunby 已提交
331
			return
S
sunby 已提交
332
		}
333
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
334
			if msg.Type() != commonpb.MsgType_DataNodeTt {
S
sunby 已提交
335
				log.Warn("Receive unexpected msg type from tt channel",
S
sunby 已提交
336
					zap.Stringer("msgType", msg.Type()))
337 338
				continue
			}
S
sunby 已提交
339 340 341 342
			ttMsg := msg.(*msgstream.DataNodeTtMsg)

			ch := ttMsg.ChannelName
			ts := ttMsg.Timestamp
343 344
			// log.Debug("Receive datanode timetick msg", zap.String("channel", ch),
			// zap.Any("ts", ts))
345
			segments, err := s.segmentManager.GetFlushableSegments(ctx, ch, ts)
346
			if err != nil {
S
sunby 已提交
347
				log.Warn("get flushable segments failed", zap.Error(err))
348 349
				continue
			}
350

S
sunby 已提交
351 352 353 354
			if len(segments) == 0 {
				continue
			}
			log.Debug("Flush segments", zap.Int64s("segmentIDs", segments))
355
			segmentInfos := make([]*datapb.SegmentInfo, 0, len(segments))
S
sunby 已提交
356 357
			for _, id := range segments {
				sInfo, err := s.meta.GetSegment(id)
358
				if err != nil {
S
sunby 已提交
359
					log.Error("get segment from meta error", zap.Int64("id", id),
360
						zap.Error(err))
S
sunby 已提交
361
					continue
362
				}
363 364
				segmentInfos = append(segmentInfos, sInfo)
			}
S
sunby 已提交
365 366 367
			if len(segmentInfos) > 0 {
				s.cluster.flush(segmentInfos)
			}
368
			s.segmentManager.ExpireAllocations(ch, ts)
369 370 371 372 373
		}
	}
}

func (s *Server) startWatchService(ctx context.Context) {
S
sunby 已提交
374
	defer logutil.LogPanic()
375 376 377 378 379 380
	defer s.serverLoopWg.Done()
	for {
		select {
		case <-ctx.Done():
			log.Debug("watch service shutdown")
			return
S
sunby 已提交
381
		case event := <-s.eventCh:
382 383 384 385
			datanode := &datapb.DataNodeInfo{
				Address:  event.Session.Address,
				Version:  event.Session.ServerID,
				Channels: []*datapb.ChannelStatus{},
S
sunby 已提交
386
			}
387 388
			switch event.EventType {
			case sessionutil.SessionAddEvent:
S
sunby 已提交
389 390 391
				log.Info("Received datanode register",
					zap.String("address", datanode.Address),
					zap.Int64("serverID", datanode.Version))
392 393
				//s.cluster.register(datanode)
				s.cluster.refresh(s.loadDataNodes())
394
			case sessionutil.SessionDelEvent:
S
sunby 已提交
395 396 397
				log.Info("Received datanode unregister",
					zap.String("address", datanode.Address),
					zap.Int64("serverID", datanode.Version))
398 399
				//s.cluster.unregister(datanode)
				s.cluster.refresh(s.loadDataNodes())
400 401 402 403 404 405 406
			default:
				log.Warn("receive unknown service event type",
					zap.Any("type", event.EventType))
			}
		}
	}
}
S
sunby 已提交
407

408
func (s *Server) startActiveCheck(ctx context.Context) {
S
sunby 已提交
409
	defer logutil.LogPanic()
410 411 412 413 414 415 416
	defer s.serverLoopWg.Done()

	for {
		select {
		case _, ok := <-s.activeCh:
			if ok {
				continue
417
			}
418
			s.Stop()
S
sunby 已提交
419
			log.Debug("disconnect with etcd and shutdown data coordinator")
420 421 422 423
			return
		case <-ctx.Done():
			log.Debug("connection check shutdown")
			return
424 425 426 427
		}
	}
}

S
sunby 已提交
428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473
// startFlushLoop drains s.flushCh. For each segment ID it publishes a flush
// message on s.flushMsgStream and, if that succeeds, records the flush in
// meta; a failure in either step is logged and the loop moves on to the next
// ID. Before entering the loop it starts a goroutine that re-queues the
// segments returned by meta.GetFlushingSegments. The loop exits when ctx is
// cancelled and calls serverLoopWg.Done on exit.
func (s *Server) startFlushLoop(ctx context.Context) {
	defer logutil.LogPanic()
	defer s.serverLoopWg.Done()

	// The re-queue goroutine gets its own context so it is cancelled as soon
	// as this loop returns.
	requeueCtx, stopRequeue := context.WithCancel(ctx)
	defer stopRequeue()
	go s.handleFlushingSegments(requeueCtx)

	for {
		select {
		case <-ctx.Done():
			log.Debug("flush loop shutdown")
			return
		case id := <-s.flushCh:
			// Step 1: write the flush message into the segmentInfo/flush stream.
			pack := composeSegmentFlushMsgPack(id)
			if err := s.flushMsgStream.Produce(&pack); err != nil {
				log.Error("produce flush msg failed",
					zap.Int64("segmentID", id),
					zap.Error(err))
				continue
			}
			log.Debug("send segment flush msg", zap.Int64("id", id))

			// Step 2: record the flush in meta.
			if err := s.meta.FlushSegment(id); err != nil {
				log.Error("flush segment complete failed", zap.Error(err))
				continue
			}
			log.Debug("flush segment complete", zap.Int64("id", id))
		}
	}
}

// handleFlushingSegments pushes the ID of every segment returned by
// meta.GetFlushingSegments onto s.flushCh, stopping early if ctx is cancelled
// while a send is pending.
func (s *Server) handleFlushingSegments(ctx context.Context) {
	for _, seg := range s.meta.GetFlushingSegments() {
		id := seg.ID
		select {
		case s.flushCh <- id:
		case <-ctx.Done():
			return
		}
	}
}

474
func (s *Server) initRootCoordClient() error {
S
sunby 已提交
475
	var err error
G
godchen 已提交
476
	if s.rootCoordClient, err = s.rootCoordClientCreator(s.ctx, Params.MetaRootPath, Params.EtcdEndpoints); err != nil {
S
sunby 已提交
477 478
		return err
	}
479
	if err = s.rootCoordClient.Init(); err != nil {
S
sunby 已提交
480 481
		return err
	}
482
	return s.rootCoordClient.Start()
S
sunby 已提交
483
}
484

S
sunby 已提交
485
func (s *Server) Stop() error {
S
sunby 已提交
486
	if !atomic.CompareAndSwapInt64(&s.isServing, 2, 0) {
S
sunby 已提交
487 488
		return nil
	}
489
	log.Debug("DataCoord server shutdown")
S
sunby 已提交
490 491 492 493 494
	atomic.StoreInt64(&s.isServing, 0)
	s.cluster.releaseSessions()
	s.segmentInfoStream.Close()
	s.flushMsgStream.Close()
	s.stopServerLoop()
S
sunby 已提交
495 496 497
	return nil
}

S
sunby 已提交
498 499
// CleanMeta only for test
func (s *Server) CleanMeta() error {
500
	log.Debug("clean meta", zap.Any("kv", s.kvClient))
501
	return s.kvClient.RemoveWithPrefix("")
S
sunby 已提交
502 503
}

S
sunby 已提交
504 505 506 507 508
// stopServerLoop cancels the context shared by the background loops started
// in startServerLoop and blocks until every one of them has called
// serverLoopWg.Done.
func (s *Server) stopServerLoop() {
	s.serverLoopCancel()
	s.serverLoopWg.Wait()
}

509 510 511 512 513 514 515 516 517 518 519 520 521 522
//func (s *Server) validateAllocRequest(collID UniqueID, partID UniqueID, channelName string) error {
//	if !s.meta.HasCollection(collID) {
//		return fmt.Errorf("can not find collection %d", collID)
//	}
//	if !s.meta.HasPartition(collID, partID) {
//		return fmt.Errorf("can not find partition %d", partID)
//	}
//	for _, name := range s.insertChannels {
//		if name == channelName {
//			return nil
//		}
//	}
//	return fmt.Errorf("can not find channel %s", channelName)
//}
S
sunby 已提交
523

524 525
func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID int64) error {
	resp, err := s.rootCoordClient.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
S
sunby 已提交
526
		Base: &commonpb.MsgBase{
527
			MsgType:  commonpb.MsgType_DescribeCollection,
S
sunby 已提交
528 529 530 531 532 533 534 535
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	})
	if err = VerifyResponse(resp, err); err != nil {
		return err
	}
536
	presp, err := s.rootCoordClient.ShowPartitions(ctx, &milvuspb.ShowPartitionsRequest{
S
sunby 已提交
537 538
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_ShowPartitions,
S
sunby 已提交
539 540
			MsgID:     0,
			Timestamp: 0,
S
sunby 已提交
541
			SourceID:  Params.NodeID,
542
		},
S
sunby 已提交
543 544 545 546 547
		DbName:         "",
		CollectionName: resp.Schema.Name,
		CollectionID:   resp.CollectionID,
	})
	if err = VerifyResponse(presp, err); err != nil {
548 549
		log.Error("show partitions error", zap.String("collectionName", resp.Schema.Name),
			zap.Int64("collectionID", resp.CollectionID), zap.Error(err))
550 551
		return err
	}
S
sunby 已提交
552 553 554 555 556 557
	collInfo := &datapb.CollectionInfo{
		ID:         resp.CollectionID,
		Schema:     resp.Schema,
		Partitions: presp.PartitionIDs,
	}
	return s.meta.AddCollection(collInfo)
S
sunby 已提交
558 559
}

560
func (s *Server) prepareBinlog(req *datapb.SaveBinlogPathsRequest) (map[string]string, error) {
S
sunby 已提交
561
	meta := make(map[string]string)
562

563 564 565 566 567 568 569 570
	for _, fieldBlp := range req.Field2BinlogPaths {
		fieldMeta, err := s.prepareField2PathMeta(req.SegmentID, fieldBlp)
		if err != nil {
			return nil, err
		}
		for k, v := range fieldMeta {
			meta[k] = v
		}
571
	}
572

S
sunby 已提交
573
	return meta, nil
574
}
575 576 577 578 579 580 581 582

func composeSegmentFlushMsgPack(segmentID UniqueID) msgstream.MsgPack {
	msgPack := msgstream.MsgPack{
		Msgs: make([]msgstream.TsMsg, 0, 1),
	}
	completeFlushMsg := internalpb.SegmentFlushCompletedMsg{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_SegmentFlushDone,
S
sunby 已提交
583 584
			MsgID:     0,
			Timestamp: 0,
585 586 587 588 589 590 591 592 593 594 595 596 597 598
			SourceID:  Params.NodeID,
		},
		SegmentID: segmentID,
	}
	var msg msgstream.TsMsg = &msgstream.FlushCompletedMsg{
		BaseMsg: msgstream.BaseMsg{
			HashValues: []uint32{0},
		},
		SegmentFlushCompletedMsg: completeFlushMsg,
	}

	msgPack.Msgs = append(msgPack.Msgs, msg)
	return msgPack
}