server.go 23.7 KB
Newer Older
S
sunby 已提交
1 2
package dataservice

S
sunby 已提交
3 4 5
import (
	"context"
	"fmt"
S
sunby 已提交
6 7
	"path"
	"strconv"
S
sunby 已提交
8
	"sync"
S
sunby 已提交
9 10 11
	"sync/atomic"
	"time"

12 13
	grpcdatanodeclient "github.com/zilliztech/milvus-distributed/internal/distributed/datanode/client"

14 15 16 17
	"go.uber.org/zap"

	"github.com/zilliztech/milvus-distributed/internal/log"

S
sunby 已提交
18 19
	"github.com/golang/protobuf/proto"
	"github.com/zilliztech/milvus-distributed/internal/proto/masterpb"
S
sunby 已提交
20 21 22

	"github.com/zilliztech/milvus-distributed/internal/msgstream"

S
sunby 已提交
23 24 25 26 27 28 29 30 31 32 33 34 35
	"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"

	"github.com/zilliztech/milvus-distributed/internal/timesync"

	etcdkv "github.com/zilliztech/milvus-distributed/internal/kv/etcd"
	"go.etcd.io/etcd/clientv3"

	"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
	"github.com/zilliztech/milvus-distributed/internal/proto/datapb"
	"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
	"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
)

S
sunby 已提交
36 37
// role is the component role name this server reports in GetComponentStates.
const role = "dataservice"

S
sunby 已提交
38 39
type DataService interface {
	typeutil.Service
N
neza2017 已提交
40
	typeutil.Component
S
sunby 已提交
41 42 43 44 45 46 47
	RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
	Flush(req *datapb.FlushRequest) (*commonpb.Status, error)

	AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error)
	ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error)
	GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error)
	GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error)
G
godchen 已提交
48 49
	GetSegmentInfoChannel() (*milvuspb.StringResponse, error)
	GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error)
S
sunby 已提交
50 51 52
	GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error)
	GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error)
	GetComponentStates() (*internalpb2.ComponentStates, error)
53
	GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error)
X
XuanYang-cn 已提交
54
	GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error)
S
sunby 已提交
55 56
}

S
sunby 已提交
57 58 59 60 61 62 63 64 65 66
// MasterClient lists the master-service operations this package depends on.
// The server uses it to wait for the master to become healthy, to fetch the
// DD channel name and to load collection/partition metadata at start-up.
type MasterClient interface {
	ShowCollections(in *milvuspb.ShowCollectionRequest) (*milvuspb.ShowCollectionResponse, error)
	DescribeCollection(in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error)
	ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error)
	// GetDdChannel returns the DD channel name as a plain string.
	GetDdChannel() (string, error)
	AllocTimestamp(in *masterpb.TsoRequest) (*masterpb.TsoResponse, error)
	AllocID(in *masterpb.IDRequest) (*masterpb.IDResponse, error)
	GetComponentStates() (*internalpb2.ComponentStates, error)
}

S
sunby 已提交
67 68 69 70 71 72 73
// DataNodeClient lists the operations the data service invokes on a data node.
// NOTE(review): the call sites live outside this file (presumably in the
// data node cluster); confirm there before changing the method set.
type DataNodeClient interface {
	WatchDmChannels(in *datapb.WatchDmChannelRequest) (*commonpb.Status, error)
	GetComponentStates(empty *commonpb.Empty) (*internalpb2.ComponentStates, error)
	FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error)
	Stop() error
}

S
sunby 已提交
74
type (
S
sunby 已提交
75 76 77
	UniqueID  = typeutil.UniqueID
	Timestamp = typeutil.Timestamp
	Server    struct {
N
neza2017 已提交
78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
		ctx               context.Context
		serverLoopCtx     context.Context
		serverLoopCancel  context.CancelFunc
		serverLoopWg      sync.WaitGroup
		state             atomic.Value
		client            *etcdkv.EtcdKV
		meta              *meta
		segAllocator      segmentAllocator
		statsHandler      *statsHandler
		ddHandler         *ddHandler
		allocator         allocator
		cluster           *dataNodeCluster
		msgProducer       *timesync.MsgProducer
		registerFinishCh  chan struct{}
		masterClient      MasterClient
		ttMsgStream       msgstream.MsgStream
		k2sMsgStream      msgstream.MsgStream
		ddChannelName     string
		segmentInfoStream msgstream.MsgStream
97
		insertChannels    []string
G
groot 已提交
98
		msFactory         msgstream.Factory
C
cai.zhang 已提交
99
		ttBarrier         timesync.TimeTickBarrier
S
sunby 已提交
100 101 102
	}
)

G
groot 已提交
103
func CreateServer(ctx context.Context, factory msgstream.Factory) (*Server, error) {
S
sunby 已提交
104
	ch := make(chan struct{})
S
sunby 已提交
105
	s := &Server{
S
sunby 已提交
106
		ctx:              ctx,
S
sunby 已提交
107 108
		registerFinishCh: ch,
		cluster:          newDataNodeCluster(ch),
G
groot 已提交
109
		msFactory:        factory,
S
sunby 已提交
110
	}
111
	s.insertChannels = s.getInsertChannels()
112
	s.UpdateStateCode(internalpb2.StateCode_ABNORMAL)
S
sunby 已提交
113 114 115
	return s, nil
}

116 117 118 119 120 121 122 123 124
// getInsertChannels returns Params.InsertChannelNum channel names, each formed
// by appending the decimal channel index to Params.InsertChannelPrefixName.
func (s *Server) getInsertChannels() []string {
	names := make([]string, Params.InsertChannelNum)
	for idx := range names {
		names[idx] = Params.InsertChannelPrefixName + strconv.FormatInt(int64(idx), 10)
	}
	return names
}

S
sunby 已提交
125 126
func (s *Server) SetMasterClient(masterClient MasterClient) {
	s.masterClient = masterClient
S
sunby 已提交
127 128 129
}

func (s *Server) Init() error {
S
sunby 已提交
130 131 132 133
	return nil
}

func (s *Server) Start() error {
134
	var err error
G
groot 已提交
135 136 137 138 139 140 141 142 143
	m := map[string]interface{}{
		"PulsarAddress":  Params.PulsarAddress,
		"ReceiveBufSize": 1024,
		"PulsarBufSize":  1024}
	err = s.msFactory.SetParams(m)
	if err != nil {
		return err
	}

S
sunby 已提交
144
	s.allocator = newAllocatorImpl(s.masterClient)
145
	if err = s.initMeta(); err != nil {
S
sunby 已提交
146 147 148
		return err
	}
	s.statsHandler = newStatsHandler(s.meta)
Y
yukun 已提交
149
	s.segAllocator = newSegmentAllocator(s.meta, s.allocator)
N
neza2017 已提交
150
	s.ddHandler = newDDHandler(s.meta, s.segAllocator)
151 152
	s.initSegmentInfoChannel()
	if err = s.loadMetaFromMaster(); err != nil {
S
sunby 已提交
153 154
		return err
	}
155
	s.waitDataNodeRegister()
156
	s.cluster.WatchInsertChannels(s.insertChannels)
157 158 159
	if err = s.initMsgProducer(); err != nil {
		return err
	}
S
sunby 已提交
160
	s.startServerLoop()
161
	s.UpdateStateCode(internalpb2.StateCode_HEALTHY)
162
	log.Debug("start success")
S
sunby 已提交
163 164 165
	return nil
}

166 167 168 169
func (s *Server) UpdateStateCode(code internalpb2.StateCode) {
	s.state.Store(code)
}

S
sunby 已提交
170 171 172 173
func (s *Server) checkStateIsHealthy() bool {
	return s.state.Load().(internalpb2.StateCode) == internalpb2.StateCode_HEALTHY
}

S
sunby 已提交
174 175 176 177 178 179 180
func (s *Server) initMeta() error {
	etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}})
	if err != nil {
		return err
	}
	etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
	s.client = etcdKV
N
neza2017 已提交
181
	s.meta, err = newMeta(etcdKV)
S
sunby 已提交
182 183 184 185 186 187
	if err != nil {
		return err
	}
	return nil
}

188
func (s *Server) initSegmentInfoChannel() {
G
groot 已提交
189
	segmentInfoStream, _ := s.msFactory.NewMsgStream(s.ctx)
Z
zhenshan.cao 已提交
190
	segmentInfoStream.AsProducer([]string{Params.SegmentInfoChannelName})
191 192
	s.segmentInfoStream = segmentInfoStream
	s.segmentInfoStream.Start()
S
sunby 已提交
193
}
S
sunby 已提交
194
func (s *Server) initMsgProducer() error {
C
cai.zhang 已提交
195
	var err error
196
	if s.ttMsgStream, err = s.msFactory.NewMsgStream(s.ctx); err != nil {
C
cai.zhang 已提交
197 198 199
		return err
	}
	s.ttMsgStream.AsConsumer([]string{Params.TimeTickChannelName}, Params.DataServiceSubscriptionName)
S
sunby 已提交
200
	s.ttMsgStream.Start()
S
sunby 已提交
201 202
	s.ttBarrier = timesync.NewHardTimeTickBarrier(s.ctx, s.ttMsgStream, s.cluster.GetNodeIDs())
	s.ttBarrier.Start()
G
groot 已提交
203
	if s.k2sMsgStream, err = s.msFactory.NewMsgStream(s.ctx); err != nil {
C
cai.zhang 已提交
204 205 206
		return err
	}
	s.k2sMsgStream.AsProducer(Params.K2SChannelNames)
207
	s.k2sMsgStream.Start()
C
cai.zhang 已提交
208
	dataNodeTTWatcher := newDataNodeTimeTickWatcher(s.meta, s.segAllocator, s.cluster)
209
	k2sMsgWatcher := timesync.NewMsgTimeTickWatcher(s.k2sMsgStream)
C
cai.zhang 已提交
210
	if s.msgProducer, err = timesync.NewTimeSyncMsgProducer(s.ttBarrier, dataNodeTTWatcher, k2sMsgWatcher); err != nil {
S
sunby 已提交
211 212 213 214 215
		return err
	}
	s.msgProducer.Start(s.ctx)
	return nil
}
S
sunby 已提交
216

S
sunby 已提交
217
func (s *Server) loadMetaFromMaster() error {
218
	log.Debug("loading collection meta from master")
S
sunby 已提交
219 220
	var err error
	if err = s.checkMasterIsHealthy(); err != nil {
S
sunby 已提交
221 222
		return err
	}
N
neza2017 已提交
223 224 225 226 227 228 229
	if s.ddChannelName == "" {
		channel, err := s.masterClient.GetDdChannel()
		if err != nil {
			return err
		}
		s.ddChannelName = channel
	}
S
sunby 已提交
230 231 232 233 234
	collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{
		Base: &commonpb.MsgBase{
			MsgType:   commonpb.MsgType_kShowCollections,
			MsgID:     -1, // todo add msg id
			Timestamp: 0,  // todo
S
sunby 已提交
235
			SourceID:  Params.NodeID,
S
sunby 已提交
236 237 238
		},
		DbName: "",
	})
S
sunby 已提交
239
	if err = VerifyResponse(collections, err); err != nil {
S
sunby 已提交
240 241 242 243 244 245 246 247
		return err
	}
	for _, collectionName := range collections.CollectionNames {
		collection, err := s.masterClient.DescribeCollection(&milvuspb.DescribeCollectionRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kDescribeCollection,
				MsgID:     -1, // todo
				Timestamp: 0,  // todo
S
sunby 已提交
248
				SourceID:  Params.NodeID,
S
sunby 已提交
249 250 251 252
			},
			DbName:         "",
			CollectionName: collectionName,
		})
S
sunby 已提交
253 254
		if err = VerifyResponse(collection, err); err != nil {
			log.Error("describe collection error", zap.String("collectionName", collectionName), zap.Error(err))
S
sunby 已提交
255 256 257 258 259 260 261
			continue
		}
		partitions, err := s.masterClient.ShowPartitions(&milvuspb.ShowPartitionRequest{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kShowPartitions,
				MsgID:     -1, // todo
				Timestamp: 0,  // todo
S
sunby 已提交
262
				SourceID:  Params.NodeID,
S
sunby 已提交
263 264 265 266 267
			},
			DbName:         "",
			CollectionName: collectionName,
			CollectionID:   collection.CollectionID,
		})
S
sunby 已提交
268 269
		if err = VerifyResponse(partitions, err); err != nil {
			log.Error("show partitions error", zap.String("collectionName", collectionName), zap.Int64("collectionID", collection.CollectionID), zap.Error(err))
S
sunby 已提交
270 271 272 273 274
			continue
		}
		err = s.meta.AddCollection(&collectionInfo{
			ID:         collection.CollectionID,
			Schema:     collection.Schema,
S
sunby 已提交
275
			Partitions: partitions.PartitionIDs,
S
sunby 已提交
276 277
		})
		if err != nil {
S
sunby 已提交
278
			log.Error("add collection to meta error", zap.Int64("collectionID", collection.CollectionID), zap.Error(err))
S
sunby 已提交
279 280 281
			continue
		}
	}
282
	log.Debug("load collection meta from master complete")
S
sunby 已提交
283
	return nil
S
sunby 已提交
284
}
S
sunby 已提交
285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300

func (s *Server) checkMasterIsHealthy() error {
	ticker := time.NewTicker(300 * time.Millisecond)
	ctx, cancel := context.WithTimeout(s.ctx, 30*time.Second)
	defer func() {
		ticker.Stop()
		cancel()
	}()
	for {
		var resp *internalpb2.ComponentStates
		var err error
		select {
		case <-ctx.Done():
			return fmt.Errorf("master is not healthy")
		case <-ticker.C:
			resp, err = s.masterClient.GetComponentStates()
S
sunby 已提交
301
			if err = VerifyResponse(resp, err); err != nil {
S
sunby 已提交
302 303 304 305 306 307 308 309 310 311
				return err
			}
		}
		if resp.State.StateCode == internalpb2.StateCode_HEALTHY {
			break
		}
	}
	return nil
}

312 313
func (s *Server) startServerLoop() {
	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
S
sunby 已提交
314
	s.serverLoopWg.Add(3)
315 316
	go s.startStatsChannel(s.serverLoopCtx)
	go s.startSegmentFlushChannel(s.serverLoopCtx)
N
neza2017 已提交
317
	go s.startDDChannel(s.serverLoopCtx)
318 319 320 321
}

func (s *Server) startStatsChannel(ctx context.Context) {
	defer s.serverLoopWg.Done()
G
groot 已提交
322
	statsStream, _ := s.msFactory.NewMsgStream(ctx)
Z
zhenshan.cao 已提交
323
	statsStream.AsConsumer([]string{Params.StatisticsChannelName}, Params.DataServiceSubscriptionName)
324 325 326 327 328 329 330 331 332 333
	statsStream.Start()
	defer statsStream.Close()
	for {
		select {
		case <-ctx.Done():
			return
		default:
		}
		msgPack := statsStream.Consume()
		for _, msg := range msgPack.Msgs {
S
sunby 已提交
334 335 336 337
			statistics, ok := msg.(*msgstream.SegmentStatisticsMsg)
			if !ok {
				log.Error("receive unknown type msg from stats channel", zap.Stringer("msgType", msg.Type()))
			}
338 339
			for _, stat := range statistics.SegStats {
				if err := s.statsHandler.HandleSegmentStat(stat); err != nil {
S
sunby 已提交
340
					log.Error("handle segment stat error", zap.Int64("segmentID", stat.SegmentID), zap.Error(err))
341 342 343 344 345 346 347 348 349
					continue
				}
			}
		}
	}
}

func (s *Server) startSegmentFlushChannel(ctx context.Context) {
	defer s.serverLoopWg.Done()
G
groot 已提交
350
	flushStream, _ := s.msFactory.NewMsgStream(ctx)
Z
zhenshan.cao 已提交
351
	flushStream.AsConsumer([]string{Params.SegmentInfoChannelName}, Params.DataServiceSubscriptionName)
352 353 354 355 356
	flushStream.Start()
	defer flushStream.Close()
	for {
		select {
		case <-ctx.Done():
357
			log.Debug("segment flush channel shut down")
358 359 360 361 362 363 364 365 366 367 368 369
			return
		default:
		}
		msgPack := flushStream.Consume()
		for _, msg := range msgPack.Msgs {
			if msg.Type() != commonpb.MsgType_kSegmentFlushDone {
				continue
			}
			realMsg := msg.(*msgstream.FlushCompletedMsg)

			segmentInfo, err := s.meta.GetSegment(realMsg.SegmentID)
			if err != nil {
S
sunby 已提交
370
				log.Error("get segment from meta error", zap.Int64("segmentID", realMsg.SegmentID), zap.Error(err))
371 372 373
				continue
			}
			segmentInfo.FlushedTime = realMsg.BeginTimestamp
S
sunby 已提交
374
			segmentInfo.State = commonpb.SegmentState_SegmentFlushed
375
			if err = s.meta.UpdateSegment(segmentInfo); err != nil {
376
				log.Error("update segment error", zap.Error(err))
377 378 379 380 381 382
				continue
			}
		}
	}
}

N
neza2017 已提交
383 384
func (s *Server) startDDChannel(ctx context.Context) {
	defer s.serverLoopWg.Done()
G
groot 已提交
385
	ddStream, _ := s.msFactory.NewMsgStream(ctx)
Z
zhenshan.cao 已提交
386
	ddStream.AsConsumer([]string{s.ddChannelName}, Params.DataServiceSubscriptionName)
N
neza2017 已提交
387 388 389 390 391
	ddStream.Start()
	defer ddStream.Close()
	for {
		select {
		case <-ctx.Done():
392
			log.Debug("dd channel shut down")
N
neza2017 已提交
393 394 395 396 397 398
			return
		default:
		}
		msgPack := ddStream.Consume()
		for _, msg := range msgPack.Msgs {
			if err := s.ddHandler.HandleDDMsg(msg); err != nil {
399
				log.Error("handle dd msg error", zap.Error(err))
N
neza2017 已提交
400 401 402 403 404 405
				continue
			}
		}
	}
}

406
// waitDataNodeRegister blocks until a value is received from (or the channel
// is closed on) registerFinishCh, the channel shared with the data node
// cluster in CreateServer.
// NOTE(review): the wait has no timeout and does not observe s.ctx.
func (s *Server) waitDataNodeRegister() {
	log.Debug("waiting data node to register")
	<-s.registerFinishCh
	log.Debug("all data nodes register")
}
S
sunby 已提交
411 412

func (s *Server) Stop() error {
S
sunby 已提交
413
	s.cluster.ShutDownClients()
S
sunby 已提交
414
	s.ttBarrier.Close()
S
sunby 已提交
415
	s.ttMsgStream.Close()
416
	s.k2sMsgStream.Close()
S
sunby 已提交
417
	s.msgProducer.Close()
N
neza2017 已提交
418
	s.segmentInfoStream.Close()
S
sunby 已提交
419
	s.stopServerLoop()
S
sunby 已提交
420 421 422
	return nil
}

S
sunby 已提交
423 424 425 426 427
// stopServerLoop cancels the server loop context and waits for all loop
// goroutines to finish. It only waits, without cancelling, when the loops
// were never started (serverLoopCancel is nil); the wait then returns
// immediately because nothing was added to the wait group.
func (s *Server) stopServerLoop() {
	if s.serverLoopCancel != nil {
		s.serverLoopCancel()
	}
	s.serverLoopWg.Wait()
}

S
sunby 已提交
428
func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) {
S
sunby 已提交
429 430 431 432
	resp := &internalpb2.ComponentStates{
		State: &internalpb2.ComponentInfo{
			NodeID:    Params.NodeID,
			Role:      role,
S
sunby 已提交
433
			StateCode: s.state.Load().(internalpb2.StateCode),
S
sunby 已提交
434 435 436 437 438 439 440 441 442 443 444 445 446
		},
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
	dataNodeStates, err := s.cluster.GetDataNodeStates()
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	resp.SubcomponentStates = dataNodeStates
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
	return resp, nil
S
sunby 已提交
447 448
}

G
godchen 已提交
449 450 451 452 453 454 455
func (s *Server) GetTimeTickChannel() (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Value: Params.TimeTickChannelName,
	}, nil
S
sunby 已提交
456 457
}

G
godchen 已提交
458 459 460 461 462 463 464
func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Value: Params.StatisticsChannelName,
	}, nil
S
sunby 已提交
465 466 467
}

func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
S
sunby 已提交
468
	ret := &datapb.RegisterNodeResponse{
S
sunby 已提交
469
		Status: &commonpb.Status{
S
sunby 已提交
470
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
S
sunby 已提交
471
		},
S
sunby 已提交
472
	}
473
	log.Info("DataService: RegisterNode:", zap.String("IP", req.Address.Ip), zap.Int64("Port", req.Address.Port))
S
sunby 已提交
474 475 476 477
	node, err := s.newDataNode(req.Address.Ip, req.Address.Port, req.Base.SourceID)
	if err != nil {
		return nil, err
	}
478

S
sunby 已提交
479
	s.cluster.Register(node)
480

N
neza2017 已提交
481
	if s.ddChannelName == "" {
N
neza2017 已提交
482
		resp, err := s.masterClient.GetDdChannel()
S
sunby 已提交
483
		if err = VerifyResponse(resp, err); err != nil {
S
sunby 已提交
484 485 486
			ret.Status.Reason = err.Error()
			return ret, err
		}
N
neza2017 已提交
487
		s.ddChannelName = resp
S
sunby 已提交
488 489 490 491 492 493 494 495
	}
	ret.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
	ret.InitParams = &internalpb2.InitParams{
		NodeID: Params.NodeID,
		StartParams: []*commonpb.KeyValuePair{
			{Key: "DDChannelName", Value: s.ddChannelName},
			{Key: "SegmentStatisticsChannelName", Value: Params.StatisticsChannelName},
			{Key: "TimeTickChannelName", Value: Params.TimeTickChannelName},
N
neza2017 已提交
496
			{Key: "CompleteFlushChannelName", Value: Params.SegmentInfoChannelName},
S
sunby 已提交
497 498 499
		},
	}
	return ret, nil
S
sunby 已提交
500 501
}

S
sunby 已提交
502
func (s *Server) newDataNode(ip string, port int64, id UniqueID) (*dataNode, error) {
503
	client := grpcdatanodeclient.NewClient(fmt.Sprintf("%s:%d", ip, port))
S
sunby 已提交
504 505 506
	if err := client.Init(); err != nil {
		return nil, err
	}
507

S
sunby 已提交
508 509 510 511 512 513 514 515 516 517 518 519 520 521
	if err := client.Start(); err != nil {
		return nil, err
	}
	return &dataNode{
		id: id,
		address: struct {
			ip   string
			port int64
		}{ip: ip, port: port},
		client:     client,
		channelNum: 0,
	}, nil
}

S
sunby 已提交
522
func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) {
S
sunby 已提交
523 524 525 526 527 528
	if !s.checkStateIsHealthy() {
		return &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			Reason:    "server is initializing",
		}, nil
	}
N
neza2017 已提交
529
	s.segAllocator.SealAllSegments(req.CollectionID)
S
sunby 已提交
530 531 532
	return &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_SUCCESS,
	}, nil
S
sunby 已提交
533 534 535 536 537 538 539 540 541
}

func (s *Server) AssignSegmentID(req *datapb.AssignSegIDRequest) (*datapb.AssignSegIDResponse, error) {
	resp := &datapb.AssignSegIDResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		SegIDAssignments: make([]*datapb.SegIDAssignment, 0),
	}
S
sunby 已提交
542 543 544 545 546
	if !s.checkStateIsHealthy() {
		resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
		resp.Status.Reason = "server is initializing"
		return resp, nil
	}
S
sunby 已提交
547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590
	for _, r := range req.SegIDRequests {
		result := &datapb.SegIDAssignment{
			Status: &commonpb.Status{
				ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
			},
		}
		segmentID, retCount, expireTs, err := s.segAllocator.AllocSegment(r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
		if err != nil {
			if _, ok := err.(errRemainInSufficient); !ok {
				result.Status.Reason = fmt.Sprintf("allocation of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
					r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
				resp.SegIDAssignments = append(resp.SegIDAssignments, result)
				continue
			}

			if err = s.openNewSegment(r.CollectionID, r.PartitionID, r.ChannelName); err != nil {
				result.Status.Reason = fmt.Sprintf("open new segment of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
					r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
				resp.SegIDAssignments = append(resp.SegIDAssignments, result)
				continue
			}

			segmentID, retCount, expireTs, err = s.segAllocator.AllocSegment(r.CollectionID, r.PartitionID, r.ChannelName, int(r.Count))
			if err != nil {
				result.Status.Reason = fmt.Sprintf("retry allocation of Collection %d, Partition %d, Channel %s, Count %d error:  %s",
					r.CollectionID, r.PartitionID, r.ChannelName, r.Count, err.Error())
				resp.SegIDAssignments = append(resp.SegIDAssignments, result)
				continue
			}
		}

		result.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
		result.CollectionID = r.CollectionID
		result.SegID = segmentID
		result.PartitionID = r.PartitionID
		result.Count = uint32(retCount)
		result.ExpireTime = expireTs
		result.ChannelName = r.ChannelName
		resp.SegIDAssignments = append(resp.SegIDAssignments, result)
	}
	return resp, nil
}

func (s *Server) openNewSegment(collectionID UniqueID, partitionID UniqueID, channelName string) error {
N
neza2017 已提交
591 592 593 594
	id, err := s.allocator.allocID()
	if err != nil {
		return err
	}
S
sunby 已提交
595
	segmentInfo, err := BuildSegment(collectionID, partitionID, id, channelName)
S
sunby 已提交
596 597 598 599 600 601
	if err != nil {
		return err
	}
	if err = s.meta.AddSegment(segmentInfo); err != nil {
		return err
	}
S
sunby 已提交
602
	if err = s.segAllocator.OpenSegment(segmentInfo); err != nil {
S
sunby 已提交
603 604
		return err
	}
605
	infoMsg := &msgstream.SegmentInfoMsg{
S
sunby 已提交
606 607 608
		BaseMsg: msgstream.BaseMsg{
			HashValues: []uint32{0},
		},
609 610 611 612
		SegmentMsg: datapb.SegmentMsg{
			Base: &commonpb.MsgBase{
				MsgType:   commonpb.MsgType_kSegmentInfo,
				MsgID:     0,
N
neza2017 已提交
613 614
				Timestamp: 0,
				SourceID:  Params.NodeID,
615 616 617 618
			},
			Segment: segmentInfo,
		},
	}
G
groot 已提交
619
	msgPack := &msgstream.MsgPack{
620 621
		Msgs: []msgstream.TsMsg{infoMsg},
	}
X
XuanYang-cn 已提交
622
	if err = s.segmentInfoStream.Produce(s.ctx, msgPack); err != nil {
623 624
		return err
	}
S
sunby 已提交
625 626 627 628
	return nil
}

func (s *Server) ShowSegments(req *datapb.ShowSegmentRequest) (*datapb.ShowSegmentResponse, error) {
S
sunby 已提交
629 630 631 632 633
	resp := &datapb.ShowSegmentResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
S
sunby 已提交
634
	if !s.checkStateIsHealthy() {
S
sunby 已提交
635 636
		resp.Status.Reason = "server is initializing"
		return resp, nil
S
sunby 已提交
637
	}
638
	ids := s.meta.GetSegmentsOfPartition(req.CollectionID, req.PartitionID)
S
sunby 已提交
639 640 641
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
	resp.SegmentIDs = ids
	return resp, nil
S
sunby 已提交
642 643 644 645 646 647 648 649
}

func (s *Server) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) {
	resp := &datapb.SegmentStatesResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
S
sunby 已提交
650 651 652 653
	if !s.checkStateIsHealthy() {
		resp.Status.Reason = "server is initializing"
		return resp, nil
	}
S
sunby 已提交
654

Z
zhenshan.cao 已提交
655 656 657 658 659 660 661 662 663 664 665 666 667 668 669
	for _, segmentID := range req.SegmentIDs {
		state := &datapb.SegmentStateInfo{
			Status:    &commonpb.Status{},
			SegmentID: segmentID,
		}
		segmentInfo, err := s.meta.GetSegment(segmentID)
		if err != nil {
			state.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
			state.Status.Reason = "get segment states error: " + err.Error()
		} else {
			state.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
			state.State = segmentInfo.State
			state.CreateTime = segmentInfo.OpenTime
			state.SealedTime = segmentInfo.SealedTime
			state.FlushedTime = segmentInfo.FlushedTime
X
XuanYang-cn 已提交
670 671
			state.StartPosition = segmentInfo.StartPosition
			state.EndPosition = segmentInfo.EndPosition
Z
zhenshan.cao 已提交
672 673
		}
		resp.States = append(resp.States, state)
S
sunby 已提交
674
	}
S
sunby 已提交
675
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
Z
zhenshan.cao 已提交
676

S
sunby 已提交
677 678 679 680
	return resp, nil
}

func (s *Server) GetInsertBinlogPaths(req *datapb.InsertBinlogPathRequest) (*datapb.InsertBinlogPathsResponse, error) {
S
sunby 已提交
681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699
	resp := &datapb.InsertBinlogPathsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
	p := path.Join(Params.SegmentFlushMetaPath, strconv.FormatInt(req.SegmentID, 10))
	value, err := s.client.Load(p)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	flushMeta := &datapb.SegmentFlushMeta{}
	err = proto.UnmarshalText(value, flushMeta)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	fields := make([]UniqueID, len(flushMeta.Fields))
	paths := make([]*internalpb2.StringList, len(flushMeta.Fields))
X
Xiangyu Wang 已提交
700 701 702
	for i, field := range flushMeta.Fields {
		fields[i] = field.FieldID
		paths[i] = &internalpb2.StringList{Values: field.BinlogPaths}
S
sunby 已提交
703
	}
S
sunby 已提交
704
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
S
sunby 已提交
705 706 707
	resp.FieldIDs = fields
	resp.Paths = paths
	return resp, nil
S
sunby 已提交
708 709
}

G
godchen 已提交
710 711 712 713 714 715 716
func (s *Server) GetInsertChannels(req *datapb.InsertChannelRequest) (*internalpb2.StringList, error) {
	return &internalpb2.StringList{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Values: s.insertChannels,
	}, nil
S
sunby 已提交
717 718 719
}

func (s *Server) GetCollectionStatistics(req *datapb.CollectionStatsRequest) (*datapb.CollectionStatsResponse, error) {
720 721 722 723 724 725 726 727 728 729 730
	resp := &datapb.CollectionStatsResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
	nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
731
	resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)})
732
	return resp, nil
S
sunby 已提交
733 734 735 736 737
}

func (s *Server) GetPartitionStatistics(req *datapb.PartitionStatsRequest) (*datapb.PartitionStatsResponse, error) {
	// todo implement
	return nil, nil
S
sunby 已提交
738
}
N
neza2017 已提交
739

G
godchen 已提交
740 741 742 743 744 745 746
func (s *Server) GetSegmentInfoChannel() (*milvuspb.StringResponse, error) {
	return &milvuspb.StringResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_SUCCESS,
		},
		Value: Params.SegmentInfoChannelName,
	}, nil
N
neza2017 已提交
747
}
748 749 750 751 752 753 754

func (s *Server) GetCount(req *datapb.CollectionCountRequest) (*datapb.CollectionCountResponse, error) {
	resp := &datapb.CollectionCountResponse{
		Status: &commonpb.Status{
			ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
		},
	}
X
XuanYang-cn 已提交
755 756 757 758
	if !s.checkStateIsHealthy() {
		resp.Status.Reason = "data service is not healthy"
		return resp, nil
	}
759 760 761 762 763 764 765 766 767
	nums, err := s.meta.GetNumRowsOfCollection(req.CollectionID)
	if err != nil {
		resp.Status.Reason = err.Error()
		return resp, nil
	}
	resp.Count = nums
	resp.Status.ErrorCode = commonpb.ErrorCode_SUCCESS
	return resp, nil
}
X
XuanYang-cn 已提交
768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791

// GetSegmentInfo returns the meta record of every requested segment, in
// request order. The call is all-or-nothing: the first failed lookup aborts
// with an UNEXPECTED_ERROR status and no infos. An unhealthy server is
// reported the same way. The Go error is always nil.
func (s *Server) GetSegmentInfo(req *datapb.SegmentInfoRequest) (*datapb.SegmentInfoResponse, error) {
	status := &commonpb.Status{
		ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR,
	}
	resp := &datapb.SegmentInfoResponse{Status: status}

	if !s.checkStateIsHealthy() {
		status.Reason = "data service is not healthy"
		return resp, nil
	}

	collected := make([]*datapb.SegmentInfo, 0, len(req.SegmentIDs))
	for _, segID := range req.SegmentIDs {
		info, err := s.meta.GetSegment(segID)
		if err != nil {
			status.Reason = err.Error()
			return resp, nil
		}
		collected = append(collected, info)
	}
	resp.Infos = collected
	status.ErrorCode = commonpb.ErrorCode_SUCCESS
	return resp, nil
}