From 947976dbecfaf86b4a9538399f3b3b7999d27b4f Mon Sep 17 00:00:00 2001 From: xige-16 Date: Wed, 27 Jan 2021 11:34:16 +0800 Subject: [PATCH] =?UTF-8?q?Add=20test=20for=20query=20service=E2=80=99s=20?= =?UTF-8?q?load=20function?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: xige-16 --- cmd/datanode/main.go | 16 +- internal/datanode/meta_service.go | 1 + internal/dataservice/param.go | 2 +- internal/dataservice/server.go | 6 +- internal/distributed/datanode/service.go | 4 +- internal/queryservice/queryservice.go | 33 ++-- internal/queryservice/queryservice_test.go | 169 +++++++++++++++++++++ 7 files changed, 195 insertions(+), 36 deletions(-) diff --git a/cmd/datanode/main.go b/cmd/datanode/main.go index 74c7e3f75..a71c54f51 100644 --- a/cmd/datanode/main.go +++ b/cmd/datanode/main.go @@ -23,7 +23,6 @@ const interval = 200 func main() { ctx, cancel := context.WithCancel(context.Background()) - defer cancel() svr, err := dnc.New(ctx) if err != nil { @@ -114,6 +113,11 @@ func main() { panic(err) } + if err := svr.Start(); err != nil { + panic(err) + } + log.Println("Data node successfully started ...") + sc := make(chan os.Signal, 1) signal.Notify(sc, syscall.SIGHUP, @@ -127,15 +131,13 @@ func main() { cancel() }() - if err := svr.Start(); err != nil { - panic(err) - } - log.Println("Data node successfully started ...") - <-ctx.Done() log.Println("Got signal to exit signal:", sig.String()) - svr.Stop() + if err := svr.Stop(); err != nil { + panic(err) + } + switch sig { case syscall.SIGTERM: exit(0) diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index c259af622..182b7ec7e 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -67,6 +67,7 @@ func (mService *metaService) getCollectionNames() ([]string, error) { } func (mService *metaService) createCollection(name string) error { + log.Println("Describing collections") req := &milvuspb.DescribeCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_kDescribeCollection, diff --git a/internal/dataservice/param.go b/internal/dataservice/param.go index 9a574924a..345595498 100644 --- a/internal/dataservice/param.go +++ b/internal/dataservice/param.go @@ -182,7 +182,7 @@ func (p *ParamTable) initSegmentInfoChannelName() { func (p *ParamTable) initDataServiceSubscriptionName() { var err error - p.DataServiceSubscriptionName, err = p.Load("msgChannel.chanNamePrefix.dataServiceSubNamePrefix") + p.DataServiceSubscriptionName, err = p.Load("msgChannel.subNamePrefix.dataServiceSubNamePrefix") if err != nil { panic(err) } diff --git a/internal/dataservice/server.go b/internal/dataservice/server.go index 4844b8411..056aaf773 100644 --- a/internal/dataservice/server.go +++ b/internal/dataservice/server.go @@ -122,14 +122,14 @@ func (s *Server) Start() error { } s.ddHandler = newDDHandler(s.meta, s.segAllocator) s.initSegmentInfoChannel() - if err = s.initMsgProducer(); err != nil { - return err - } if err = s.loadMetaFromMaster(); err != nil { return err } s.startServerLoop() s.waitDataNodeRegister() + if err = s.initMsgProducer(); err != nil { + return err + } s.state.Store(internalpb2.StateCode_HEALTHY) log.Println("start success") return nil diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 73dcd0755..658cd2bf1 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -27,7 +27,9 @@ type Server struct { } func New(ctx context.Context) (*Server, error) { - var s = &Server{} + var s = &Server{ + ctx: ctx, + } s.core = dn.NewDataNode(s.ctx) s.grpcServer = grpc.NewServer() diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 25cb8a386..942407fcb 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -77,26 +77,6 @@ func (qs *QueryService) Stop() error { return nil } -//func (qs *QueryService) SetDataService(d querynode.DataServiceInterface) error { -// for _, v := range qs.queryNodeClient { -// err := v.SetDataService(d) -// if err != nil { -// return err -// } -// } -// return nil -//} -// -//func (qs *QueryService) SetIndexService(i querynode.IndexServiceInterface) error { -// for _, v := range qs.queryNodeClient { -// err := v.SetIndexService(i) -// if err != nil { -// return err -// } -// } -// return nil -//} - func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) { serviceComponentInfo := &internalpb2.ComponentInfo{ NodeID: Params.QueryServiceID, @@ -134,6 +114,7 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { // TODO:: do addWatchDmChannel to query node after registerNode func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { fmt.Println("register query node =", req.Address) + // TODO:: add mutex allocatedID := qs.numRegisterNode qs.numRegisterNode++ @@ -158,6 +139,7 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb } qs.queryNodes = append(qs.queryNodes, node) + // TODO:: watch dm channels return &querypb.RegisterNodeResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -289,7 +271,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm segmentIDs := showSegmentResponse.SegmentIDs segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) channel2id := make(map[string]int) - id2channels := make(map[int][]string) + //id2channels := make(map[int][]string) id2segs := make(map[int][]UniqueID) offset := 0 @@ -306,13 +288,16 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm for i, str := range state.StartPositions { flatChannelName += str.ChannelName channelNames = append(channelNames, str.ChannelName) - if i < len(state.StartPositions) { + if i+1 < len(state.StartPositions) { flatChannelName += "/" } } + if flatChannelName == "" { + log.Fatal("segmentState's channel name is empty") + } if _, ok := channel2id[flatChannelName]; !ok { channel2id[flatChannelName] = offset - id2channels[offset] = channelNames + //id2channels[offset] = channelNames id2segs[offset] = make([]UniqueID, 0) id2segs[offset] = append(id2segs[offset], segmentID) offset++ @@ -329,7 +314,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm if segmentStates[v].State == datapb.SegmentState_SegmentFlushed { selectedSegs = append(selectedSegs, v) } else { - if i > 0 && segmentStates[v-1].State != datapb.SegmentState_SegmentFlushed { + if i > 0 && segmentStates[selectedSegs[i-1]].State != datapb.SegmentState_SegmentFlushed { break } selectedSegs = append(selectedSegs, v) diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index 49038ff7a..d6319cf5c 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -2,11 +2,148 @@ package queryservice import ( "context" + "strconv" "testing" "github.com/stretchr/testify/assert" + + "github.com/zilliztech/milvus-distributed/internal/errors" + "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" + "github.com/zilliztech/milvus-distributed/internal/proto/datapb" + "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" + "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) +type masterMock struct { + collectionIDs []UniqueID + col2partition map[UniqueID][]UniqueID + partition2segment map[UniqueID][]UniqueID +} + +func newMasterMock() *masterMock { + collectionIDs := make([]UniqueID, 0) + collectionIDs = append(collectionIDs, 1) + + col2partition := make(map[UniqueID][]UniqueID) + partitionIDs := make([]UniqueID, 0) + partitionIDs = append(partitionIDs, 1) + col2partition[1] = partitionIDs + + partition2segment := make(map[UniqueID][]UniqueID) + segmentIDs := make([]UniqueID, 0) + segmentIDs = append(segmentIDs, 1) + segmentIDs = append(segmentIDs, 2) + segmentIDs = append(segmentIDs, 3) + segmentIDs = append(segmentIDs, 4) + segmentIDs = append(segmentIDs, 5) + segmentIDs = append(segmentIDs, 6) + partition2segment[1] = segmentIDs + + return &masterMock{ + collectionIDs: collectionIDs, + col2partition: col2partition, + partition2segment: partition2segment, + } +} + +func (master *masterMock) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { + collectionID := in.CollectionID + partitionIDs := make([]UniqueID, 0) + for _, id := range master.collectionIDs { + if id == collectionID { + partitions := master.col2partition[collectionID] + partitionIDs = append(partitionIDs, partitions...) + } + } + response := &milvuspb.ShowPartitionResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + PartitionIDs: partitionIDs, + } + + return response, nil +} + +func (master *masterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { + collectionID := in.CollectionID + partitionID := in.PartitionID + + for _, id := range master.collectionIDs { + if id == collectionID { + partitions := master.col2partition[collectionID] + for _, partition := range partitions { + if partition == partitionID { + return &milvuspb.ShowSegmentResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + SegmentIDs: master.partition2segment[partition], + }, nil + } + } + } + } + + return nil, errors.New("collection id or partition id not found") +} + +type dataMock struct { + segmentIDs []UniqueID + segmentStates map[UniqueID]*datapb.SegmentStatesResponse +} + +func newDataMock() *dataMock { + positions1 := make([]*internalpb2.MsgPosition, 0) + positions2 := make([]*internalpb2.MsgPosition, 0) + positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(1, 10)}) + positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(2, 10)}) + positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(3, 10)}) + positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(4, 10)}) + + segmentIDs := make([]UniqueID, 0) + segmentIDs = append(segmentIDs, 1) + segmentIDs = append(segmentIDs, 2) + segmentIDs = append(segmentIDs, 3) + segmentIDs = append(segmentIDs, 4) + segmentIDs = append(segmentIDs, 5) + segmentIDs = append(segmentIDs, 6) + + fillStates := func(time uint64, position []*internalpb2.MsgPosition, state datapb.SegmentState) *datapb.SegmentStatesResponse { + return &datapb.SegmentStatesResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_SUCCESS, + }, + State: state, + CreateTime: time, + StartPositions: position, + } + } + segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) + segmentStates[1] = fillStates(1, positions1, datapb.SegmentState_SegmentFlushed) + segmentStates[2] = fillStates(2, positions2, datapb.SegmentState_SegmentFlushed) + segmentStates[3] = fillStates(3, positions1, datapb.SegmentState_SegmentFlushed) + segmentStates[4] = fillStates(4, positions2, datapb.SegmentState_SegmentFlushed) + segmentStates[5] = fillStates(5, positions1, datapb.SegmentState_SegmentGrowing) + segmentStates[6] = fillStates(6, positions2, datapb.SegmentState_SegmentGrowing) + + return &dataMock{ + segmentIDs: segmentIDs, + segmentStates: segmentStates, + } +} + +func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { + segmentID := req.SegmentID + for _, id := range data.segmentIDs { + if segmentID == id { + return data.segmentStates[id], nil + } + } + return nil, errors.New("segment id not found") +} + func TestQueryService_Init(t *testing.T) { service, err := NewQueryService(context.Background()) assert.Nil(t, err) @@ -34,3 +171,35 @@ func TestQueryService_Init(t *testing.T) { service.Stop() } + +func TestQueryService_load(t *testing.T) { + service, err := NewQueryService(context.Background()) + assert.Nil(t, err) + service.Init() + service.Start() + service.SetMasterService(newMasterMock()) + service.SetDataService(newDataMock()) + registerNodeRequest := &querypb.RegisterNodeRequest{ + Address: &commonpb.Address{}, + } + service.RegisterNode(registerNodeRequest) + + t.Run("Test LoadSegment", func(t *testing.T) { + loadCollectionRequest := &querypb.LoadCollectionRequest{ + CollectionID: 1, + } + response, err := service.LoadCollection(loadCollectionRequest) + assert.Nil(t, err) + assert.Equal(t, response.ErrorCode, commonpb.ErrorCode_SUCCESS) + }) + + t.Run("Test LoadPartition", func(t *testing.T) { + loadPartitionRequest := &querypb.LoadPartitionRequest{ + CollectionID: 1, + PartitionIDs: []UniqueID{1}, + } + response, err := service.LoadPartitions(loadPartitionRequest) + assert.Nil(t, err) + assert.Equal(t, response.ErrorCode, commonpb.ErrorCode_SUCCESS) + }) +} -- GitLab