server.go 24.4 KB
Newer Older
S
sunby 已提交
1 2
package dataservice

S
sunby 已提交
3 4 5
import (
	"context"
	"fmt"
S
sunby 已提交
6 7
	"path"
	"strconv"
S
sunby 已提交
8
	"sync"
S
sunby 已提交
9 10 11
	"sync/atomic"
	"time"

12 13
	grpcdatanodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/datanode/client"

14 15 16 17
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/log"

S
sunby 已提交
18 19
	"github.com/golang/protobuf/proto"
	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
S
sunby 已提交
20 21 22

	"github.com/zilliztech/milvus-distributed/internal/msgstream"

S
sunby 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"

	"github.com/zilliztech/milvus-distributed/internal/timesync"

	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
	"go.etcd.io/etcd/clientv3"

	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)

S
sunby 已提交
36 37
// role is the component role name this service reports in GetComponentStates.
const role = "dataservice"

S
sunby 已提交
38 39
type DataService interface {
	typeutil.Service
N
neza2017 已提交
40
	typeutil.Component
S
sunby 已提交
41 42 43 44 45 46 47
	RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
	Flush(req *datapb.FlushRequest) (*commonpb.Status, error)

	AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error)
	ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error)
	GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
	GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
G
godchen 已提交
48 49
	GetSegmentInfoChannel() (*milvuspb.StringResponse, error)
	GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
S
sunby 已提交
50 51 52
	GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
	GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
	GetComponentStates() (*internalpb2.ComponentStates, error)
53
	GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error)
X
XuanYang-cn 已提交
54
	GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
S
sunby 已提交
55 56
}

S
sunby 已提交
57 58 59 60 61 62 63 64 65 66
// MasterClient is the subset of the master service that the data service
// calls: collection/partition lookup, the DD channel name, timestamp and ID
// allocation, and a component state query used as a health probe.
type MasterClient interface {
	ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
	DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
	ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
	GetDdChannel() (string, error)
	AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error)
	AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error)
	GetComponentStates() (*internalpb2.ComponentStates, error)
}

S
sunby 已提交
67 68 69 70 71 72 73
// DataNodeClient is the set of data node calls the data service depends on:
// channel watching, component state query, segment flushing and shutdown.
type DataNodeClient interface {
	WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
	GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error)
	FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
	Stop() error
}

S
sunby 已提交
74
type (
S
sunby 已提交
75 76 77
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
	Server    struct {
N
neza2017 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
		ctx               context.Context
		serverLoopCtx     context.Context
		serverLoopCancel  context.CancelFunc
		serverLoopWg      sync.WaitGroup
		state             atomic.Value
		client            *etcdkv.EtcdKV
		meta              *meta
		segAllocator      segmentAllocator
		statsHandler      *statsHandler
		ddHandler         *ddHandler
		allocator         allocator
		cluster           *dataNodeCluster
		msgProducer       *timesync.MsgProducer
		registerFinishCh  chan struct{}
		masterClient      MasterClient
		ttMsgStream       msgstream.MsgStream
		k2sMsgStream      msgstream.MsgStream
		ddChannelName     string
		segmentInfoStream msgstream.MsgStream
97
		insertChannels    []string
G
groot 已提交
98
		msFactory         msgstream.Factory
C
cai.zhang 已提交
99
		ttBarrier         timesync.TimeTickBarrier
S
sunby 已提交
100 101 102
	}
)

G
groot 已提交
103
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
S
sunby 已提交
104
	ch := make(chan struct{})
S
sunby 已提交
105
	s := &Server{
S
sunby 已提交
106
		ctx:              ctx,
S
sunby 已提交
107 108
		registerFinishCh: ch,
		cluster:          newDataNodeCluster(ch),
G
groot 已提交
109
		msFactory:        factory,
S
sunby 已提交
110
	}
111
	s.insertChannels = s.getInsertChannels()
112
	s.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
S
sunby 已提交
113 114 115
	return s, nil
}

116 117 118 119 120 121 122 123 124
// getInsertChannels returns Params.InsertChannelNum channel names, each formed
// by appending the decimal index (starting at 0) to
// Params.InsertChannelPrefixName.
func (s *Server) getInsertChannels() []string {
	names := make([]string, Params.InsertChannelNum)
	for idx := range names {
		names[idx] = Params.InsertChannelPrefixName + strconv.FormatInt(int64(idx), 10)
	}
	return names
}

S
sunby 已提交
125 126
func (s *Server) SetMasterClient(masterClient MasterClient) {
	s.masterClient = masterClient
S
sunby 已提交
127 128 129
}

func (s *Server) Init() error {
S
sunby 已提交
130 131 132 133
	return nil
}

func (s *Server) Start() error {
134
	var err error
G
groot 已提交
135 136 137 138 139 140 141 142 143
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}

S
sunby 已提交
144
	s.allocator = newAllocatorImpl(s.masterClient)
145
	if err = s.initMeta(); err != nil {
S
sunby 已提交
146 147 148
		return err
	}
	s.statsHandler = newStatsHandler(s.meta)
Y
yukun 已提交
149
	s.segAllocator = newSegmentAllocator(s.meta, s.allocator)
N
neza2017 已提交
150
	s.ddHandler = newDDHandler(s.meta, s.segAllocator)
151 152
	s.initSegmentInfoChannel()
	if err = s.loadMetaFromMaster(); err != nil {
S
sunby 已提交
153 154
		return err
	}
155
	s.waitDataNodeRegister()
156
	s.cluster.WatchInsertChannels(s.insertChannels)
157 158 159
	if err = s.initMsgProducer(); err != nil {
		return err
	}
S
sunby 已提交
160
	s.startServerLoop()
161
	s.UpdateStateCode(internalpb2.StateCode_HEALTHY)
162
	log.Debug("start success")
S
sunby 已提交
163 164 165
	return nil
}

166 167 168 169
func (s *Server) UpdateStateCode(code internalpb2.StateCode) {
	s.state.Store(code)
}

S
sunby 已提交
170 171 172 173
func (s *Server) checkStateIsHealthy() bool {
	return s.state.Load().(internalpb2.StateCode) == internalpb2.StateCode_HEALTHY
}

S
sunby 已提交
174 175 176 177 178 179 180
func (s *Server) initMeta() error {
	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
	if err != nil {
		return err
	}
	etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
	s.client = etcdKV
N
neza2017 已提交
181
	s.meta, err = newMeta(etcdKV)
S
sunby 已提交
182 183 184 185 186 187
	if err != nil {
		return err
	}
	return nil
}

188
func (s *Server) initSegmentInfoChannel() {
G
groot 已提交
189
	segmentInfoStream, _ := s.msFactory.NewMsgStream(s.ctx)
Z
zhenshan.cao 已提交
190
	segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
191 192
	s.segmentInfoStream = segmentInfoStream
	s.segmentInfoStream.Start()
S
sunby 已提交
193
}
S
sunby 已提交
194
func (s *Server) initMsgProducer() error {
C
cai.zhang 已提交
195
	var err error
196
	if s.ttMsgStream, err = s.msFactory.NewMsgStream(s.ctx); err != nil {
C
cai.zhang 已提交
197 198 199
		return err
	}
	s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
S
sunby 已提交
200
	s.ttMsgStream.Start()
S
sunby 已提交
201 202
	s.ttBarrier = timesync.NewHardTimeTickBarrier(s.ctx, s.ttMsgStream, s.cluster.GetNodeIDs())
	s.ttBarrier.Start()
G
groot 已提交
203
	if s.k2sMsgStream, err = s.msFactory.NewMsgStream(s.ctx); err != nil {
C
cai.zhang 已提交
204 205 206
		return err
	}
	s.k2sMsgStream.AsProducer(Params.K2SChannelNames)
207
	s.k2sMsgStream.Start()
C
cai.zhang 已提交
208
	dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
209
	k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
C
cai.zhang 已提交
210
	if s.msgProducer, err = timesync.NewTimeSyncMsgProducer(s.ttBarrier, dataNodeTTWatcher, k2sMsgWatcher); err != nil {
S
sunby 已提交
211 212 213 214 215
		return err
	}
	s.msgProducer.Start(s.ctx)
	return nil
}
S
sunby 已提交
216

S
sunby 已提交
217
func (s *Server) loadMetaFromMaster() error {
218
	log.Debug("loading collection meta from master")
S
sunby 已提交
219 220
	var err error
	if err = s.checkMasterIsHealthy(); err != nil {
S
sunby 已提交
221 222
		return err
	}
N
neza2017 已提交
223 224 225 226 227 228 229
	if s.ddChannelName == "" {
		channel, err := s.masterClient.GetDdChannel()
		if err != nil {
			return err
		}
		s.ddChannelName = channel
	}
S
sunby 已提交
230 231 232 233 234
	collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kShowCollections,
			MsgID:     -1, // todo add msg id
			Timestamp: 0,  // todo
S
sunby 已提交
235
			SourceID:  Params.NodeID,
S
sunby 已提交
236 237 238
		},
		DbName: "",
	})
S
sunby 已提交
239
	if err = VerifyResponse(collections, err); err != nil {
S
sunby 已提交
240 241 242 243 244 245 246 247
		return err
	}
	for _, collectionName := range collections.CollectionNames {
		collection, err := s.masterClient.DescribeCollection(&milvuspb.DescribeCollectionRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kDescribeCollection,
				MsgID:     -1, // todo
				Timestamp: 0,  // todo
S
sunby 已提交
248
				SourceID:  Params.NodeID,
S
sunby 已提交
249 250 251 252
			},
			DbName:         "",
			CollectionName: collectionName,
		})
S
sunby 已提交
253 254
		if err = VerifyResponse(collection, err); err != nil {
			log.Error("describe collection error", zap.String("collectionName", collectionName), zap.Error(err))
S
sunby 已提交
255 256 257 258 259 260 261
			continue
		}
		partitions, err := s.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kShowPartitions,
				MsgID:     -1, // todo
				Timestamp: 0,  // todo
S
sunby 已提交
262
				SourceID:  Params.NodeID,
S
sunby 已提交
263 264 265 266 267
			},
			DbName:         "",
			CollectionName: collectionName,
			CollectionID:   collection.CollectionID,
		})
S
sunby 已提交
268 269
		if err = VerifyResponse(partitions, err); err != nil {
			log.Error("show partitions error", zap.String("collectionName", collectionName), zap.Int64("collectionID", collection.CollectionID), zap.Error(err))
S
sunby 已提交
270 271 272 273 274
			continue
		}
		err = s.meta.AddCollection(&collectionInfo{
			ID:         collection.CollectionID,
			Schema:     collection.Schema,
S
sunby 已提交
275
			Partitions: partitions.PartitionIDs,
S
sunby 已提交
276 277
		})
		if err != nil {
S
sunby 已提交
278
			log.Error("add collection to meta error", zap.Int64("collectionID", collection.CollectionID), zap.Error(err))
S
sunby 已提交
279 280 281
			continue
		}
	}
282
	log.Debug("load collection meta from master complete")
S
sunby 已提交
283
	return nil
S
sunby 已提交
284
}
S
sunby 已提交
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300

func (s *Server) checkMasterIsHealthy() error {
	ticker := time.NewTicker(300 * time.Millisecond)
	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
	defer func() {
		ticker.Stop()
		cancel()
	}()
	for {
		var resp *internalpb2.ComponentStates
		var err error
		select {
		case <-ctx.Done():
			return fmt.Errorf("master is not healthy")
		case <-ticker.C:
			resp, err = s.masterClient.GetComponentStates()
S
sunby 已提交
301
			if err = VerifyResponse(resp, err); err != nil {
S
sunby 已提交
302 303 304 305 306 307 308 309 310 311
				return err
			}
		}
		if resp.State.StateCode == internalpb2.StateCode_HEALTHY {
			break
		}
	}
	return nil
}

312 313
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
S
sunby 已提交
314
	s.serverLoopWg.Add(2)
315 316 317 318 319 320
	go s.startStatsChannel(s.serverLoopCtx)
	go s.startSegmentFlushChannel(s.serverLoopCtx)
}

func (s *Server) startStatsChannel(ctx context.Context) {
	defer s.serverLoopWg.Done()
G
groot 已提交
321
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
Z
zhenshan.cao 已提交
322
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
323 324 325 326 327 328 329 330
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
331
		msgPack, _ := statsStream.Consume()
332
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
333 334 335 336
			statistics, ok := msg.(*msgstream.SegmentStatisticsMsg)
			if !ok {
				log.Error("receive unknown type msg from stats channel", zap.Stringer("msgType", msg.Type()))
			}
337 338
			for _, stat := range statistics.SegStats {
				if err := s.statsHandler.HandleSegmentStat(stat); err != nil {
S
sunby 已提交
339
					log.Error("handle segment stat error", zap.Int64("segmentID", stat.SegmentID), zap.Error(err))
340 341 342 343 344 345 346 347 348
					continue
				}
			}
		}
	}
}

func (s *Server) startSegmentFlushChannel(ctx context.Context) {
	defer s.serverLoopWg.Done()
G
groot 已提交
349
	flushStream, _ := s.msFactory.NewMsgStream(ctx)
Z
zhenshan.cao 已提交
350
	flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
351 352 353 354 355
	flushStream.Start()
	defer flushStream.Close()
	for {
		select {
		case <-ctx.Done():
356
			log.Debug("segment flush channel shut down")
357 358 359
			return
		default:
		}
360
		msgPack, _ := flushStream.Consume()
361 362 363 364 365 366 367 368
		for _, msg := range msgPack.Msgs {
			if msg.Type() != commonpb.MsgType_kSegmentFlushDone {
				continue
			}
			realMsg := msg.(*msgstream.FlushCompletedMsg)

			segmentInfo, err := s.meta.GetSegment(realMsg.SegmentID)
			if err != nil {
S
sunby 已提交
369
				log.Error("get segment from meta error", zap.Int64("segmentID", realMsg.SegmentID), zap.Error(err))
370 371 372
				continue
			}
			segmentInfo.FlushedTime = realMsg.BeginTimestamp
S
sunby 已提交
373
			segmentInfo.State = commonpb.SegmentState_SegmentFlushed
374
			if err = s.meta.UpdateSegment(segmentInfo); err != nil {
375
				log.Error("update segment error", zap.Error(err))
376 377 378 379 380 381
				continue
			}
		}
	}
}

N
neza2017 已提交
382 383
func (s *Server) startDDChannel(ctx context.Context) {
	defer s.serverLoopWg.Done()
G
groot 已提交
384
	ddStream, _ := s.msFactory.NewMsgStream(ctx)
Z
zhenshan.cao 已提交
385
	ddStream.AsConsumer([]string{s.ddChannelName}, Params.DataServiceSubscriptionName)
N
neza2017 已提交
386 387 388 389 390
	ddStream.Start()
	defer ddStream.Close()
	for {
		select {
		case <-ctx.Done():
391
			log.Debug("dd channel shut down")
N
neza2017 已提交
392 393 394
			return
		default:
		}
395
		msgPack, _ := ddStream.Consume()
N
neza2017 已提交
396 397
		for _, msg := range msgPack.Msgs {
			if err := s.ddHandler.HandleDDMsg(msg); err != nil {
398
				log.Error("handle dd msg error", zap.Error(err))
N
neza2017 已提交
399 400 401 402 403 404
				continue
			}
		}
	}
}

405
// waitDataNodeRegister blocks until a value arrives on, or someone closes,
// registerFinishCh. The channel is shared with the data node cluster (see
// CreateServer), which presumably signals it once all nodes have registered.
func (s *Server) waitDataNodeRegister() {
	log.Debug("waiting data node to register")
	<-s.registerFinishCh
	log.Debug("all data nodes register")
}
S
sunby 已提交
410 411

func (s *Server) Stop() error {
S
sunby 已提交
412
	s.cluster.ShutDownClients()
S
sunby 已提交
413
	s.ttBarrier.Close()
S
sunby 已提交
414
	s.ttMsgStream.Close()
415
	s.k2sMsgStream.Close()
S
sunby 已提交
416
	s.msgProducer.Close()
N
neza2017 已提交
417
	s.segmentInfoStream.Close()
S
sunby 已提交
418
	s.stopServerLoop()
S
sunby 已提交
419 420 421
	return nil
}

S
sunby 已提交
422 423 424 425 426
// stopServerLoop cancels the background loop context and waits for the loop
// goroutines to exit. If the loops were never started there is nothing to
// cancel and the wait returns immediately.
func (s *Server) stopServerLoop() {
	if s.serverLoopCancel != nil {
		s.serverLoopCancel()
	}
	s.serverLoopWg.Wait()
}

S
sunby 已提交
427
func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
S
sunby 已提交
428 429 430 431
	resp := &internalpb2.ComponentStates{
		State: &internalpb2.ComponentInfo{
			NodeID:    Params.NodeID,
			Role:      role,
S
sunby 已提交
432
			StateCode: s.state.Load().(internalpb2.StateCode),
S
sunby 已提交
433 434 435 436 437 438 439 440 441 442 443 444 445
		},
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
	dataNodeStates, err := s.cluster.GetDataNodeStates()
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	resp.SubcomponentStates = dataNodeStates
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
	return resp, nil
S
sunby 已提交
446 447
}

G
godchen 已提交
448 449 450 451 452 453 454
func (s *Server) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Value: Params.TimeTickChannelName,
	}, nil
S
sunby 已提交
455 456
}

G
godchen 已提交
457 458 459 460 461 462 463
func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Value: Params.StatisticsChannelName,
	}, nil
S
sunby 已提交
464 465 466
}

func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
S
sunby 已提交
467
	ret := &datapb.RegisterNodeResponse{
S
sunby 已提交
468
		Status: &commonpb.Status{
S
sunby 已提交
469
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
S
sunby 已提交
470
		},
S
sunby 已提交
471
	}
472
	log.Info("DataService: RegisterNode:", zap.String("IP", req.Address.Ip), zap.Int64("Port", req.Address.Port))
S
sunby 已提交
473 474 475 476
	node, err := s.newDataNode(req.Address.Ip, req.Address.Port, req.Base.SourceID)
	if err != nil {
		return nil, err
	}
477

S
sunby 已提交
478
	s.cluster.Register(node)
479

N
neza2017 已提交
480
	if s.ddChannelName == "" {
N
neza2017 已提交
481
		resp, err := s.masterClient.GetDdChannel()
S
sunby 已提交
482
		if err = VerifyResponse(resp, err); err != nil {
S
sunby 已提交
483 484 485
			ret.Status.Reason = err.Error()
			return ret, err
		}
N
neza2017 已提交
486
		s.ddChannelName = resp
S
sunby 已提交
487 488 489 490 491 492 493 494
	}
	ret.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
	ret.InitParams = &internalpb2.InitParams{
		NodeID: Params.NodeID,
		StartParams: []*commonpb.KeyValuePair{
			{Key: "DDChannelName", Value: s.ddChannelName},
			{Key: "SegmentStatisticsChannelName", Value: Params.StatisticsChannelName},
			{Key: "TimeTickChannelName", Value: Params.TimeTickChannelName},
N
neza2017 已提交
495
			{Key: "CompleteFlushChannelName", Value: Params.SegmentInfoChannelName},
S
sunby 已提交
496 497 498
		},
	}
	return ret, nil
S
sunby 已提交
499 500
}

S
sunby 已提交
501
func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, error) {
502
	client := grpcdatanodeclient.NewClient(fmt.Sprintf("%s:%d", ip, port))
S
sunby 已提交
503 504 505
	if err := client.Init(); err != nil {
		return nil, err
	}
506

S
sunby 已提交
507 508 509 510 511 512 513 514 515 516 517 518 519 520
	if err := client.Start(); err != nil {
		return nil, err
	}
	return &dataNode{
		id: id,
		address: struct {
			ip   string
			port int64
		}{ip: ip, port: port},
		client:     client,
		channelNum: 0,
	}, nil
}

S
sunby 已提交
521
func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) {
S
sunby 已提交
522 523 524 525 526 527
	if !s.checkStateIsHealthy() {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    "server is initializing",
		}, nil
	}
N
neza2017 已提交
528
	s.segAllocator.SealAllSegments(req.CollectionID)
S
sunby 已提交
529 530 531
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}, nil
S
sunby 已提交
532 533 534 535 536 537 538 539 540
}

func (s *Server) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
	resp := &datapb.AssignSegIDResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		SegIDAssignments: make([]*datapb.SegIDAssignment, 0),
	}
S
sunby 已提交
541 542 543 544 545
	if !s.checkStateIsHealthy() {
		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
		resp.Status.Reason = "server is initializing"
		return resp, nil
	}
S
sunby 已提交
546
	for _, r := range req.SegIDRequests {
S
sunby 已提交
547 548 549 550 551 552
		if !s.meta.HasCollection(r.CollectionID) {
			if err := s.loadCollectionFromMaster(r.CollectionID); err != nil {
				log.Error("load collection from master error", zap.Int64("collectionID", r.CollectionID), zap.Error(err))
				continue
			}
		}
S
sunby 已提交
553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594
		result := &datapb.SegIDAssignment{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			},
		}
		segmentID, retCount, expireTs, err := s.segAllocator.AllocSegment(r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
		if err != nil {
			if _, ok := err.(errRemainInSufficient); !ok {
				result.Status.Reason = fmt.Sprintf("allocation of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
					r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
				resp.SegIDAssignments = append(resp.SegIDAssignments, result)
				continue
			}

			if err = s.openNewSegment(r.CollectionID, r.PartitionID, r.ChannelName); err != nil {
				result.Status.Reason = fmt.Sprintf("open new segment of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
					r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
				resp.SegIDAssignments = append(resp.SegIDAssignments, result)
				continue
			}

			segmentID, retCount, expireTs, err = s.segAllocator.AllocSegment(r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
			if err != nil {
				result.Status.Reason = fmt.Sprintf("retry allocation of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
					r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
				resp.SegIDAssignments = append(resp.SegIDAssignments, result)
				continue
			}
		}

		result.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
		result.CollectionID = r.CollectionID
		result.SegID = segmentID
		result.PartitionID = r.PartitionID
		result.Count = uint32(retCount)
		result.ExpireTime = expireTs
		result.ChannelName = r.ChannelName
		resp.SegIDAssignments = append(resp.SegIDAssignments, result)
	}
	return resp, nil
}

S
sunby 已提交
595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613
// loadCollectionFromMaster asks the master to describe collectionID and adds
// the returned ID and schema to s.meta.
//
// NOTE(review): unlike loadMetaFromMaster, no partition IDs are fetched here,
// so the stored collectionInfo has an empty Partitions list; confirm that this
// is intended.
func (s *Server) loadCollectionFromMaster(collectionID int64) error {
	descReq := &milvuspb.DescribeCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:  commonpb.MsgType_kDescribeCollection,
			SourceID: Params.NodeID,
		},
		DbName:       "",
		CollectionID: collectionID,
	}
	desc, err := s.masterClient.DescribeCollection(descReq)
	if err = VerifyResponse(desc, err); err != nil {
		return err
	}
	return s.meta.AddCollection(&collectionInfo{
		ID:     desc.CollectionID,
		Schema: desc.Schema,
	})
}

S
sunby 已提交
614
func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, channelName string) error {
N
neza2017 已提交
615 616 617 618
	id, err := s.allocator.allocID()
	if err != nil {
		return err
	}
S
sunby 已提交
619
	segmentInfo, err := BuildSegment(collectionID, partitionID, id, channelName)
S
sunby 已提交
620 621 622 623 624 625
	if err != nil {
		return err
	}
	if err = s.meta.AddSegment(segmentInfo); err != nil {
		return err
	}
S
sunby 已提交
626
	if err = s.segAllocator.OpenSegment(segmentInfo); err != nil {
S
sunby 已提交
627 628
		return err
	}
629
	infoMsg := &msgstream.SegmentInfoMsg{
S
sunby 已提交
630 631 632
		BaseMsg: msgstream.BaseMsg{
			HashValues: []uint32{0},
		},
633 634 635 636
		SegmentMsg: datapb.SegmentMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kSegmentInfo,
				MsgID:     0,
N
neza2017 已提交
637 638
				Timestamp: 0,
				SourceID:  Params.NodeID,
639 640 641 642
			},
			Segment: segmentInfo,
		},
	}
G
groot 已提交
643
	msgPack := &msgstream.MsgPack{
644 645
		Msgs: []msgstream.TsMsg{infoMsg},
	}
X
XuanYang-cn 已提交
646
	if err = s.segmentInfoStream.Produce(s.ctx, msgPack); err != nil {
647 648
		return err
	}
S
sunby 已提交
649 650 651 652
	return nil
}

func (s *Server) ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
S
sunby 已提交
653 654 655 656 657
	resp := &datapb.ShowSegmentResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
S
sunby 已提交
658
	if !s.checkStateIsHealthy() {
S
sunby 已提交
659 660
		resp.Status.Reason = "server is initializing"
		return resp, nil
S
sunby 已提交
661
	}
662
	ids := s.meta.GetSegmentsOfPartition(req.CollectionID, req.PartitionID)
S
sunby 已提交
663 664 665
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
	resp.SegmentIDs = ids
	return resp, nil
S
sunby 已提交
666 667 668 669 670 671 672 673
}

func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
	resp := &datapb.SegmentStatesResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
S
sunby 已提交
674 675 676 677
	if !s.checkStateIsHealthy() {
		resp.Status.Reason = "server is initializing"
		return resp, nil
	}
S
sunby 已提交
678

Z
zhenshan.cao 已提交
679 680 681 682 683 684 685 686 687 688 689 690 691 692 693
	for _, segmentID := range req.SegmentIDs {
		state := &datapb.SegmentStateInfo{
			Status:    &commonpb.Status{},
			SegmentID: segmentID,
		}
		segmentInfo, err := s.meta.GetSegment(segmentID)
		if err != nil {
			state.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
			state.Status.Reason = "get segment states error: " + err.Error()
		} else {
			state.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
			state.State = segmentInfo.State
			state.CreateTime = segmentInfo.OpenTime
			state.SealedTime = segmentInfo.SealedTime
			state.FlushedTime = segmentInfo.FlushedTime
X
XuanYang-cn 已提交
694 695
			state.StartPosition = segmentInfo.StartPosition
			state.EndPosition = segmentInfo.EndPosition
Z
zhenshan.cao 已提交
696 697
		}
		resp.States = append(resp.States, state)
S
sunby 已提交
698
	}
S
sunby 已提交
699
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
Z
zhenshan.cao 已提交
700

S
sunby 已提交
701 702 703 704
	return resp, nil
}

func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
S
sunby 已提交
705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723
	resp := &datapb.InsertBinlogPathsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
	p := path.Join(Params.SegmentFlushMetaPath, strconv.FormatInt(req.SegmentID, 10))
	value, err := s.client.Load(p)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	flushMeta := &datapb.SegmentFlushMeta{}
	err = proto.UnmarshalText(value, flushMeta)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	fields := make([]UniqueID, len(flushMeta.Fields))
	paths := make([]*internalpb2.StringList, len(flushMeta.Fields))
X
Xiangyu Wang 已提交
724 725 726
	for i, field := range flushMeta.Fields {
		fields[i] = field.FieldID
		paths[i] = &internalpb2.StringList{Values: field.BinlogPaths}
S
sunby 已提交
727
	}
S
sunby 已提交
728
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
S
sunby 已提交
729 730 731
	resp.FieldIDs = fields
	resp.Paths = paths
	return resp, nil
S
sunby 已提交
732 733
}

G
godchen 已提交
734 735 736 737 738 739 740
func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
	return &internalpb2.StringList{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Values: s.insertChannels,
	}, nil
S
sunby 已提交
741 742 743
}

func (s *Server) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
744 745 746 747 748 749 750 751 752 753 754
	resp := &datapb.CollectionStatsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
	nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
755
	resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
756
	return resp, nil
S
sunby 已提交
757 758 759 760 761
}

func (s *Server) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) {
	// todo implement
	return nil, nil
S
sunby 已提交
762
}
N
neza2017 已提交
763

G
godchen 已提交
764 765 766 767 768 769 770
func (s *Server) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Value: Params.SegmentInfoChannelName,
	}, nil
N
neza2017 已提交
771
}
772 773 774 775 776 777 778

func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
	resp := &datapb.CollectionCountResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
X
XuanYang-cn 已提交
779 780 781 782
	if !s.checkStateIsHealthy() {
		resp.Status.Reason = "data service is not healthy"
		return resp, nil
	}
783 784 785 786 787 788 789 790 791
	nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	resp.Count = nums
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
	return resp, nil
}
X
XuanYang-cn 已提交
792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815

// GetSegmentInfo returns the stored SegmentInfo of every requested segment ID,
// in request order. The lookup is all-or-nothing: the first segment that
// cannot be found fails the whole call with that error as the reason and no
// infos. An unhealthy server is reported the same way. The returned Go error
// is always nil.
func (s *Server) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
	}
	resp := &datapb.SegmentInfoResponse{Status: status}
	if !s.checkStateIsHealthy() {
		status.Reason = "data service is not healthy"
		return resp, nil
	}

	found := make([]*datapb.SegmentInfo, 0, len(req.SegmentIDs))
	for _, segID := range req.SegmentIDs {
		info, err := s.meta.GetSegment(segID)
		if err != nil {
			status.Reason = err.Error()
			return resp, nil
		}
		found = append(found, info)
	}
	status.ErrorCode = commonpb.ErrorCode_SUCCESS
	resp.Infos = found
	return resp, nil
}