diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 500e0456f541eea4ebe6891c4347fb4403f851c8..2aece3101fc10dd682dfde79ad90e51cb220bd05 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -16,6 +16,7 @@ import ( "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" "github.com/zilliztech/milvus-distributed/internal/proto/masterpb" "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" + "github.com/zilliztech/milvus-distributed/internal/util/typeutil" ) const ( @@ -38,7 +39,6 @@ type ( FlushSegments(in *datapb.FlushSegRequest) (*commonpb.Status, error) SetMasterServiceInterface(ms MasterServiceInterface) error - SetDataServiceInterface(ds DataServiceInterface) error } @@ -55,7 +55,6 @@ type ( } DataNode struct { - // GOOSE TODO: complete interface with component ctx context.Context NodeID UniqueID Role string @@ -80,8 +79,8 @@ func NewDataNode(ctx context.Context) *DataNode { Params.Init() node := &DataNode{ ctx: ctx, - NodeID: Params.NodeID, // GOOSE TODO - Role: "DataNode", // GOOSE TODO + NodeID: Params.NodeID, // GOOSE TODO + Role: typeutil.DataNodeRole, // GOOSE TODO State: internalpb2.StateCode_INITIALIZING, dataSyncService: nil, metaService: nil, @@ -118,11 +117,10 @@ func (node *DataNode) Init() error { resp, err := node.dataService.RegisterNode(req) if err != nil { - return errors.Errorf("Init failed: %v", err) + return errors.Errorf("Register node failed: %v", err) } for _, kv := range resp.InitParams.StartParams { - log.Println(kv) switch kv.Key { case "DDChannelName": Params.DDChannelNames = []string{kv.Value} @@ -150,7 +148,7 @@ func (node *DataNode) Init() error { node.metaService = newMetaService(node.ctx, replica, node.masterService) node.replica = replica - // Opentracing + // --- Opentracing --- cfg := &config.Configuration{ ServiceName: "data_node", Sampler: &config.SamplerConfig{ @@ -167,7 +165,6 @@ func (node *DataNode) Init() error { } node.tracer = tracer node.closer = closer - opentracing.SetGlobalTracer(node.tracer) return nil diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index 182b7ec7e7d879639946f728249f76384f769ac9..8a9561fae1469ec1de22196e40292c582804086d 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -27,6 +27,7 @@ func newMetaService(ctx context.Context, replica collectionReplica, m MasterServ } func (mService *metaService) init() { + log.Println("Initing meta ...") err := mService.loadCollections() if err != nil { log.Fatal("metaService init failed:", err) diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 658cd2bf1df22ab8f2c2cdfba08401c2c69c7853..b0903bd48567f214040d89a11b45e0c7f8081a59 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -7,7 +7,6 @@ import ( "sync" dn "github.com/zilliztech/milvus-distributed/internal/datanode" - "github.com/zilliztech/milvus-distributed/internal/errors" "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" "github.com/zilliztech/milvus-distributed/internal/proto/datapb" "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" @@ -67,11 +66,6 @@ func (s *Server) SetDataServiceInterface(ds dn.DataServiceInterface) error { } func (s *Server) Init() error { - err := s.core.Init() - if err != nil { - return errors.Errorf("Init failed: %v", err) - } - return s.core.Init() } diff --git a/internal/queryservice/queryservice.go b/internal/queryservice/queryservice.go index 942407fcbdc0f0f88e379169d5c116ffa940bbed..25cb8a3864748e38fafda31e0ffb7c664e06c5bd 100644 --- a/internal/queryservice/queryservice.go +++ b/internal/queryservice/queryservice.go @@ -77,6 +77,26 @@ func (qs *QueryService) Stop() error { return nil } +//func (qs *QueryService) SetDataService(d querynode.DataServiceInterface) error { +// for _, v := range qs.queryNodeClient { +// err := v.SetDataService(d) +// if err != nil { +// return err +// } +// } +// return nil +//} +// +//func (qs *QueryService) SetIndexService(i querynode.IndexServiceInterface) error { +// for _, v := range qs.queryNodeClient { +// err := v.SetIndexService(i) +// if err != nil { +// return err +// } +// } +// return nil +//} + func (qs *QueryService) GetComponentStates() (*internalpb2.ComponentStates, error) { serviceComponentInfo := &internalpb2.ComponentInfo{ NodeID: Params.QueryServiceID, @@ -114,7 +134,6 @@ func (qs *QueryService) GetStatisticsChannel() (string, error) { // TODO:: do addWatchDmChannel to query node after registerNode func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb.RegisterNodeResponse, error) { fmt.Println("register query node =", req.Address) - // TODO:: add mutex allocatedID := qs.numRegisterNode qs.numRegisterNode++ @@ -139,7 +158,6 @@ func (qs *QueryService) RegisterNode(req *querypb.RegisterNodeRequest) (*querypb } qs.queryNodes = append(qs.queryNodes, node) - // TODO:: watch dm channels return &querypb.RegisterNodeResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_SUCCESS, @@ -271,7 +289,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm segmentIDs := showSegmentResponse.SegmentIDs segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) channel2id := make(map[string]int) - //id2channels := make(map[int][]string) + id2channels := make(map[int][]string) id2segs := make(map[int][]UniqueID) offset := 0 @@ -288,16 +306,13 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm for i, str := range state.StartPositions { flatChannelName += str.ChannelName channelNames = append(channelNames, str.ChannelName) - if i+1 < len(state.StartPositions) { + if i < len(state.StartPositions) { flatChannelName += "/" } } - if flatChannelName == "" { - log.Fatal("segmentState's channel name is empty") - } if _, ok := channel2id[flatChannelName]; !ok { channel2id[flatChannelName] = offset - //id2channels[offset] = channelNames + id2channels[offset] = channelNames id2segs[offset] = make([]UniqueID, 0) id2segs[offset] = append(id2segs[offset], segmentID) offset++ @@ -314,7 +329,7 @@ func (qs *QueryService) LoadPartitions(req *querypb.LoadPartitionRequest) (*comm if segmentStates[v].State == datapb.SegmentState_SegmentFlushed { selectedSegs = append(selectedSegs, v) } else { - if i > 0 && segmentStates[selectedSegs[i-1]].State != datapb.SegmentState_SegmentFlushed { + if i > 0 && segmentStates[v-1].State != datapb.SegmentState_SegmentFlushed { break } selectedSegs = append(selectedSegs, v) diff --git a/internal/queryservice/queryservice_test.go b/internal/queryservice/queryservice_test.go index d6319cf5c51dfac75e057f5197cfc28245d6bcf2..49038ff7ab5fbaeabb4903dd582631475da28b88 100644 --- a/internal/queryservice/queryservice_test.go +++ b/internal/queryservice/queryservice_test.go @@ -2,148 +2,11 @@ package queryservice import ( "context" - "strconv" "testing" "github.com/stretchr/testify/assert" - - "github.com/zilliztech/milvus-distributed/internal/errors" - "github.com/zilliztech/milvus-distributed/internal/proto/commonpb" - "github.com/zilliztech/milvus-distributed/internal/proto/datapb" - "github.com/zilliztech/milvus-distributed/internal/proto/internalpb2" - "github.com/zilliztech/milvus-distributed/internal/proto/milvuspb" - "github.com/zilliztech/milvus-distributed/internal/proto/querypb" ) -type masterMock struct { - collectionIDs []UniqueID - col2partition map[UniqueID][]UniqueID - partition2segment map[UniqueID][]UniqueID -} - -func newMasterMock() *masterMock { - collectionIDs := make([]UniqueID, 0) - collectionIDs = append(collectionIDs, 1) - - col2partition := make(map[UniqueID][]UniqueID) - partitionIDs := make([]UniqueID, 0) - partitionIDs = append(partitionIDs, 1) - col2partition[1] = partitionIDs - - partition2segment := make(map[UniqueID][]UniqueID) - segmentIDs := make([]UniqueID, 0) - segmentIDs = append(segmentIDs, 1) - segmentIDs = append(segmentIDs, 2) - segmentIDs = append(segmentIDs, 3) - segmentIDs = append(segmentIDs, 4) - segmentIDs = append(segmentIDs, 5) - segmentIDs = append(segmentIDs, 6) - partition2segment[1] = segmentIDs - - return &masterMock{ - collectionIDs: collectionIDs, - col2partition: col2partition, - partition2segment: partition2segment, - } -} - -func (master *masterMock) ShowPartitions(in *milvuspb.ShowPartitionRequest) (*milvuspb.ShowPartitionResponse, error) { - collectionID := in.CollectionID - partitionIDs := make([]UniqueID, 0) - for _, id := range master.collectionIDs { - if id == collectionID { - partitions := master.col2partition[collectionID] - partitionIDs = append(partitionIDs, partitions...) - } - } - response := &milvuspb.ShowPartitionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - PartitionIDs: partitionIDs, - } - - return response, nil -} - -func (master *masterMock) ShowSegments(in *milvuspb.ShowSegmentRequest) (*milvuspb.ShowSegmentResponse, error) { - collectionID := in.CollectionID - partitionID := in.PartitionID - - for _, id := range master.collectionIDs { - if id == collectionID { - partitions := master.col2partition[collectionID] - for _, partition := range partitions { - if partition == partitionID { - return &milvuspb.ShowSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - SegmentIDs: master.partition2segment[partition], - }, nil - } - } - } - } - - return nil, errors.New("collection id or partition id not found") -} - -type dataMock struct { - segmentIDs []UniqueID - segmentStates map[UniqueID]*datapb.SegmentStatesResponse -} - -func newDataMock() *dataMock { - positions1 := make([]*internalpb2.MsgPosition, 0) - positions2 := make([]*internalpb2.MsgPosition, 0) - positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(1, 10)}) - positions1 = append(positions1, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(2, 10)}) - positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(3, 10)}) - positions2 = append(positions2, &internalpb2.MsgPosition{ChannelName: "insertChannel-" + strconv.FormatInt(4, 10)}) - - segmentIDs := make([]UniqueID, 0) - segmentIDs = append(segmentIDs, 1) - segmentIDs = append(segmentIDs, 2) - segmentIDs = append(segmentIDs, 3) - segmentIDs = append(segmentIDs, 4) - segmentIDs = append(segmentIDs, 5) - segmentIDs = append(segmentIDs, 6) - - fillStates := func(time uint64, position []*internalpb2.MsgPosition, state datapb.SegmentState) *datapb.SegmentStatesResponse { - return &datapb.SegmentStatesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_SUCCESS, - }, - State: state, - CreateTime: time, - StartPositions: position, - } - } - segmentStates := make(map[UniqueID]*datapb.SegmentStatesResponse) - segmentStates[1] = fillStates(1, positions1, datapb.SegmentState_SegmentFlushed) - segmentStates[2] = fillStates(2, positions2, datapb.SegmentState_SegmentFlushed) - segmentStates[3] = fillStates(3, positions1, datapb.SegmentState_SegmentFlushed) - segmentStates[4] = fillStates(4, positions2, datapb.SegmentState_SegmentFlushed) - segmentStates[5] = fillStates(5, positions1, datapb.SegmentState_SegmentGrowing) - segmentStates[6] = fillStates(6, positions2, datapb.SegmentState_SegmentGrowing) - - return &dataMock{ - segmentIDs: segmentIDs, - segmentStates: segmentStates, - } -} - -func (data *dataMock) GetSegmentStates(req *datapb.SegmentStatesRequest) (*datapb.SegmentStatesResponse, error) { - segmentID := req.SegmentID - for _, id := range data.segmentIDs { - if segmentID == id { - return data.segmentStates[id], nil - } - } - return nil, errors.New("segment id not found") -} - func TestQueryService_Init(t *testing.T) { service, err := NewQueryService(context.Background()) assert.Nil(t, err) @@ -171,35 +34,3 @@ func TestQueryService_Init(t *testing.T) { service.Stop() } - -func TestQueryService_load(t *testing.T) { - service, err := NewQueryService(context.Background()) - assert.Nil(t, err) - service.Init() - service.Start() - service.SetMasterService(newMasterMock()) - service.SetDataService(newDataMock()) - registerNodeRequest := &querypb.RegisterNodeRequest{ - Address: &commonpb.Address{}, - } - service.RegisterNode(registerNodeRequest) - - t.Run("Test LoadSegment", func(t *testing.T) { - loadCollectionRequest := &querypb.LoadCollectionRequest{ - CollectionID: 1, - } - response, err := service.LoadCollection(loadCollectionRequest) - assert.Nil(t, err) - assert.Equal(t, response.ErrorCode, commonpb.ErrorCode_SUCCESS) - }) - - t.Run("Test LoadPartition", func(t *testing.T) { - loadPartitionRequest := &querypb.LoadPartitionRequest{ - CollectionID: 1, - PartitionIDs: []UniqueID{1}, - } - response, err := service.LoadPartitions(loadPartitionRequest) - assert.Nil(t, err) - assert.Equal(t, response.ErrorCode, commonpb.ErrorCode_SUCCESS) - }) -}