From 82d6fb18b89146129bdfad56d7b091c3c3188204 Mon Sep 17 00:00:00 2001 From: sunby Date: Sat, 23 Jan 2021 20:22:59 +0800 Subject: [PATCH] Add init params returned to data node Signed-off-by: sunby --- internal/dataservice/meta_test.go | 61 ++++++++++++ internal/dataservice/param.go | 1 + internal/dataservice/server.go | 98 +++++++++++++------ internal/dataservice/stats_handler.go | 25 +---- .../distributed/dataservice/grpc_service.go | 33 ++++++- 5 files changed, 161 insertions(+), 57 deletions(-) diff --git a/internal/dataservice/meta_test.go b/internal/dataservice/meta_test.go index d306b06f6..2474763ae 100644 --- a/internal/dataservice/meta_test.go +++ b/internal/dataservice/meta_test.go @@ -39,3 +39,64 @@ func TestCollection(t *testing.T) { _, err = meta.GetCollection(id) assert.NotNil(t, err) } + +func TestSegment(t *testing.T) { + mockAllocator := newMockAllocator() + meta, err := newMemoryMeta(mockAllocator) + assert.Nil(t, err) + id, err := mockAllocator.allocID() + assert.Nil(t, err) + segmentInfo, err := meta.BuildSegment(id, 100, []string{"c1", "c2"}) + assert.Nil(t, err) + err = meta.AddSegment(segmentInfo) + assert.Nil(t, err) + info, err := meta.GetSegment(segmentInfo.SegmentID) + assert.Nil(t, err) + assert.EqualValues(t, segmentInfo, info) + ids := meta.GetSegmentsByCollectionID(id) + assert.EqualValues(t, 1, len(ids)) + assert.EqualValues(t, segmentInfo.SegmentID, ids[0]) + ids = meta.GetSegmentsByCollectionAndPartitionID(id, 100) + assert.EqualValues(t, 1, len(ids)) + assert.EqualValues(t, segmentInfo.SegmentID, ids[0]) + err = meta.SealSegment(segmentInfo.SegmentID) + assert.Nil(t, err) + err = meta.FlushSegment(segmentInfo.SegmentID) + assert.Nil(t, err) + info, err = meta.GetSegment(segmentInfo.SegmentID) + assert.Nil(t, err) + assert.NotZero(t, info.SealedTime) + assert.NotZero(t, info.FlushedTime) +} + +func TestPartition(t *testing.T) { + mockAllocator := newMockAllocator() + meta, err := newMemoryMeta(mockAllocator) + assert.Nil(t, err) + testSchema := newTestSchema() + id, err := mockAllocator.allocID() + assert.Nil(t, err) + + err = meta.AddPartition(id, 10) + assert.NotNil(t, err) + err = meta.AddCollection(&collectionInfo{ + ID: id, + Schema: testSchema, + Partitions: []UniqueID{}, + }) + assert.Nil(t, err) + err = meta.AddPartition(id, 10) + assert.Nil(t, err) + err = meta.AddPartition(id, 10) + assert.NotNil(t, err) + collection, err := meta.GetCollection(id) + assert.Nil(t, err) + assert.EqualValues(t, 10, collection.Partitions[0]) + err = meta.DropPartition(id, 10) + assert.Nil(t, err) + collection, err = meta.GetCollection(id) + assert.Nil(t, err) + assert.EqualValues(t, 0, len(collection.Partitions)) + err = meta.DropPartition(id, 10) + assert.NotNil(t, err) +} diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index b697406c7..5d96614c4 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -29,6 +29,7 @@ type ParamTable struct { StatisticsChannelName string TimeTickChannelName string DataNodeNum int + SegmentChannelName string // todo init } var Params ParamTable diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index b12f3112c..1fbac260e 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "log" - "time" + "sync" "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/msgstream/pulsarms" @@ -49,6 +49,9 @@ type ( Timestamp = typeutil.Timestamp Server struct { ctx context.Context + serverLoopCtx context.Context + serverLoopCancel context.CancelFunc + serverLoopWg sync.WaitGroup state internalpb2.StateCode client *etcdkv.EtcdKV meta *meta @@ -61,10 +64,11 @@ type ( registerFinishCh chan struct{} masterClient *masterservice.GrpcClient ttMsgStream msgstream.MsgStream + ddChannelName string } ) -func CreateServer(ctx context.Context) (*Server, error) { +func CreateServer(ctx context.Context, client *masterservice.GrpcClient) (*Server, error) { ch := make(chan struct{}) return &Server{ ctx: ctx, @@ -72,6 +76,7 @@ func CreateServer(ctx context.Context) (*Server, error) { insertChannelMgr: newInsertChannelManager(), registerFinishCh: ch, cluster: newDataNodeCluster(ch), + masterClient: client, }, nil } @@ -81,9 +86,6 @@ func (s *Server) Init() error { } func (s *Server) Start() error { - if err := s.connectMaster(); err != nil { - return err - } s.allocator = newAllocatorImpl(s.masterClient) if err := s.initMeta(); err != nil { return err @@ -95,34 +97,20 @@ func (s *Server) Start() error { } s.segAllocator = segAllocator s.waitDataNodeRegister() + if err = s.loadMetaFromMaster(); err != nil { return err } if err = s.initMsgProducer(); err != nil { return err } + + s.startServerLoop() s.state = internalpb2.StateCode_HEALTHY log.Println("start success") return nil } -func (s *Server) connectMaster() error { - log.Println("connecting to master") - master, err := masterservice.NewGrpcClient(Params.MasterAddress, 30*time.Second) - if err != nil { - return err - } - if err = master.Init(nil); err != nil { - return err - } - if err = master.Start(); err != nil { - return err - } - s.masterClient = master - log.Println("connect to master success") - return nil -} - func (s *Server) initMeta() error { etcdClient, err := clientv3.New(clientv3.Config{Endpoints: []string{Params.EtcdAddress}}) if err != nil { @@ -144,7 +132,6 @@ func (s *Server) waitDataNodeRegister() { } func (s *Server) initMsgProducer() error { - // todo ttstream and peerids s.ttMsgStream = pulsarms.NewPulsarTtMsgStream(s.ctx, 1024) s.ttMsgStream.Start() timeTickBarrier := timesync.NewHardTimeTickBarrier(s.ttMsgStream, s.cluster.GetNodeIDs()) @@ -157,6 +144,37 @@ func (s *Server) initMsgProducer() error { s.msgProducer.Start(s.ctx) return nil } + +func (s *Server) startServerLoop() { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopWg.Add(1) + go s.startStatsChannel(s.serverLoopCtx) +} + +func (s *Server) startStatsChannel(ctx context.Context) { + defer s.serverLoopWg.Done() + statsStream := pulsarms.NewPulsarMsgStream(ctx, 1024) + statsStream.Start() + defer statsStream.Close() + for { + select { + case <-ctx.Done(): + return + default: + } + msgPack := statsStream.Consume() + for _, msg := range msgPack.Msgs { + statistics := msg.(*msgstream.SegmentStatisticsMsg) + for _, stat := range statistics.SegStats { + if err := s.statsHandler.HandleSegmentStat(stat); err != nil { + log.Println(err.Error()) + continue + } + } + } + } +} + func (s *Server) loadMetaFromMaster() error { log.Println("loading collection meta from master") collections, err := s.masterClient.ShowCollections(&milvuspb.ShowCollectionRequest{ @@ -218,9 +236,15 @@ func (s *Server) loadMetaFromMaster() error { func (s *Server) Stop() error { s.ttMsgStream.Close() s.msgProducer.Close() + s.stopServerLoop() return nil } +func (s *Server) stopServerLoop() { + s.serverLoopCancel() + s.serverLoopWg.Wait() +} + func (s *Server) GetComponentStates() (*internalpb2.ComponentStates, error) { resp := &internalpb2.ComponentStates{ State: &internalpb2.ComponentInfo{ @@ -261,13 +285,31 @@ func (s *Server) GetStatisticsChannel() (*milvuspb.StringResponse, error) { } func (s *Server) RegisterNode(req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) { - s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID) - // add init params - return &datapb.RegisterNodeResponse{ + ret := &datapb.RegisterNodeResponse{ Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, + ErrorCode: commonpb.ErrorCode_UNEXPECTED_ERROR, }, - }, nil + } + s.cluster.Register(req.Address.Ip, req.Address.Port, req.Base.SourceID) + if len(s.ddChannelName) == 0 { + resp, err := s.masterClient.GetDdChannel(nil) + if err != nil { + ret.Status.Reason = err.Error() + return ret, err + } + s.ddChannelName = resp.Value + } + ret.Status.ErrorCode = commonpb.ErrorCode_SUCCESS + ret.InitParams = &internalpb2.InitParams{ + NodeID: Params.NodeID, + StartParams: []*commonpb.KeyValuePair{ + {Key: "DDChannelName", Value: s.ddChannelName}, + {Key: "SegmentStatisticsChannelName", Value: Params.StatisticsChannelName}, + {Key: "TimeTickChannelName", Value: Params.TimeTickChannelName}, + {Key: "CompleteFlushChannelName", Value: Params.SegmentChannelName}, + }, + } + return ret, nil } func (s *Server) Flush(req *datapb.FlushRequest) (*commonpb.Status, error) { diff --git a/internal/dataservice/stats_handler.go b/internal/dataservice/stats_handler.go index 93bebd791..387422f07 100644 --- a/internal/dataservice/stats_handler.go +++ b/internal/dataservice/stats_handler.go @@ -1,8 +1,6 @@ package dataservice import ( - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/msgstream" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" ) @@ -16,28 +14,7 @@ func newStatsHandler(meta *meta) *statsHandler { } } -func (handler *statsHandler) HandleQueryNodeStats(msgPack *msgstream.MsgPack) error { - for _, msg := range msgPack.Msgs { - statsMsg, ok := msg.(*msgstream.QueryNodeStatsMsg) - if !ok { - return errors.Errorf("Type of message is not QueryNodeSegStatsMsg") - } - - for _, segStat := range statsMsg.GetSegStats() { - if err := handler.handleSegmentStat(segStat); err != nil { - return err - } - } - } - - return nil -} - -func (handler *statsHandler) handleSegmentStat(segStats *internalpb2.SegmentStats) error { - if !segStats.GetRecentlyModified() { - return nil - } - +func (handler *statsHandler) HandleSegmentStat(segStats *internalpb2.SegmentStatisticsUpdates) error { segMeta, err := handler.meta.GetSegment(segStats.SegmentID) if err != nil { return err diff --git a/internal/distributed/dataservice/grpc_service.go b/internal/distributed/dataservice/grpc_service.go index 10e287cb6..c8d331b58 100644 --- a/internal/distributed/dataservice/grpc_service.go +++ b/internal/distributed/dataservice/grpc_service.go @@ -4,6 +4,9 @@ import ( "context" "log" "net" + "time" + + "github.com/zilliztech/milvus-distributed/internal/distributed/masterservice" "google.golang.org/grpc" @@ -17,17 +20,21 @@ import ( ) type Service struct { - server *dataservice.Server - ctx context.Context - cancel context.CancelFunc - grpcServer *grpc.Server + server *dataservice.Server + ctx context.Context + cancel context.CancelFunc + grpcServer *grpc.Server + masterClient *masterservice.GrpcClient } func NewGrpcService() { s := &Service{} var err error s.ctx, s.cancel = context.WithCancel(context.Background()) - s.server, err = dataservice.CreateServer(s.ctx) + if err = s.connectMaster(); err != nil { + log.Fatal("connect to master" + err.Error()) + } + s.server, err = dataservice.CreateServer(s.ctx, s.masterClient) if err != nil { log.Fatalf("create server error: %s", err.Error()) return @@ -45,6 +52,22 @@ func NewGrpcService() { } } +func (s *Service) connectMaster() error { + log.Println("connecting to master") + master, err := masterservice.NewGrpcClient("localhost:10101", 30*time.Second) // todo address + if err != nil { + return err + } + if err = master.Init(nil); err != nil { + return err + } + if err = master.Start(); err != nil { + return err + } + s.masterClient = master + log.Println("connect to master success") + return nil +} func (s *Service) Init() error { return s.server.Init() } -- GitLab