未验证 提交 0300b682 编写于 作者: S sunby 提交者: GitHub

Remove deprecated interfaces in data coordinator (#5929)

* Remove derecated interfaces in data coordinator
Signed-off-by: Nsunby <bingyi.sun@zilliz.com>

* Remove RegisterNode in data coordinator
Signed-off-by: Nsunby <bingyi.sun@zilliz.com>
上级 92e429d8
......@@ -17,13 +17,10 @@ type DataCoord interface {
Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error)
RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error)
ShowSegments(ctx context.Context, req *datapb.ShowSegmentsRequest) (*datapb.ShowSegmentsResponse, error)
GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error)
GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error)
GetInsertChannels(ctx context.Context, req *datapb.GetInsertChannelsRequest) (*internalpb.StringList, error)
GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error)
GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error)
GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error)
......@@ -45,20 +42,6 @@ type MsgBase struct {
}
```
* *RegisterNode*
```go
type RegisterNodeRequest struct {
Base *commonpb.MsgBase
Address *commonpb.Address
}
type RegisterNodeResponse struct {
InitParams *internalpb.InitParams
Status *commonpb.Status
}
```
* *Flush*
```go
......@@ -101,23 +84,6 @@ type AssignSegmentIDResponse struct {
}
```
* *ShowSegments*
```go
type ShowSegmentsRequest struct {
Base *commonpb.MsgBase
CollectionID UniqueID
PartitionID UniqueID
DbID UniqueID
}
type ShowSegmentsResponse struct {
SegmentIDs []UniqueID
Status *commonpb.Status
}
```
* *GetSegmentStates*
......@@ -167,16 +133,6 @@ type GetInsertBinlogPathsResponse struct {
}
```
* *GetInsertChannels*
```go
type GetInsertChannelsRequest struct {
Base *commonpb.MsgBase
DbID UniqueID
CollectionID UniqueID
}
```
* *GetCollectionStatistics*
```go
......
......@@ -171,23 +171,6 @@ func (ds *DataServiceFactory) SaveBinlogPaths(ctx context.Context, req *datapb.S
return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
}
func (ds *DataServiceFactory) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
ret := &datapb.RegisterNodeResponse{Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success}}
ret.InitParams = &internalpb.InitParams{
NodeID: Params.NodeID,
StartParams: []*commonpb.KeyValuePair{
{Key: "DDChannelName", Value: "fake-dd-channel-name"},
{Key: "SegmentStatisticsChannelName", Value: "fake-segment-statistics-channel-name"},
{Key: "TimeTickChannelName", Value: "fake-time-tick-channel-name"},
{Key: "CompleteFlushChannelName", Value: "fake-complete-flush-name"},
},
}
return ret, nil
}
func (mf *MetaFactory) CollectionMetaFactory(collectionID UniqueID, collectionName string) *etcdpb.CollectionMeta {
sch := schemapb.CollectionSchema{
Name: collectionName,
......
......@@ -135,22 +135,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
}, nil
}
func (s *Server) ShowSegments(ctx context.Context, req *datapb.ShowSegmentsRequest) (*datapb.ShowSegmentsResponse, error) {
resp := &datapb.ShowSegmentsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}
if s.isClosed() {
resp.Status.Reason = "server is initializing"
return resp, nil
}
ids := s.meta.GetSegmentsOfPartition(req.CollectionID, req.PartitionID)
resp.Status.ErrorCode = commonpb.ErrorCode_Success
resp.SegmentIDs = ids
return resp, nil
}
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
resp := &datapb.GetSegmentStatesResponse{
Status: &commonpb.Status{
......@@ -221,15 +205,6 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
return resp, nil
}
func (s *Server) GetInsertChannels(ctx context.Context, req *datapb.GetInsertChannelsRequest) (*internalpb.StringList, error) {
return &internalpb.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Values: []string{},
}, nil
}
func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
resp := &datapb.GetCollectionStatisticsResponse{
Status: &commonpb.Status{
......@@ -474,12 +449,3 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf
resp.Status.ErrorCode = commonpb.ErrorCode_Success
return resp, nil
}
func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return &datapb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
......@@ -105,57 +105,6 @@ func TestAssignSegmentID(t *testing.T) {
}
}
func TestShowSegments(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
segments := []struct {
id UniqueID
collectionID UniqueID
partitionID UniqueID
}{
{0, 0, 0},
{1, 0, 0},
{2, 0, 1},
{3, 1, 1},
}
for _, segment := range segments {
err := svr.meta.AddSegment(&datapb.SegmentInfo{
ID: segment.id,
CollectionID: segment.collectionID,
PartitionID: segment.partitionID,
})
assert.Nil(t, err)
}
cases := []struct {
description string
collectionID UniqueID
partitionID UniqueID
expected []UniqueID
}{
{"show segments normally", 0, 0, []UniqueID{0, 1}},
{"show non-existed segments", 1, 2, []UniqueID{}},
}
for _, test := range cases {
t.Run(test.description, func(t *testing.T) {
resp, err := svr.ShowSegments(context.TODO(), &datapb.ShowSegmentsRequest{
Base: &commonpb.MsgBase{
MsgType: 0,
MsgID: 0,
Timestamp: 0,
SourceID: 0,
},
CollectionID: test.collectionID,
PartitionID: test.partitionID,
DbID: 0,
})
assert.Nil(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.ElementsMatch(t, test.expected, resp.SegmentIDs)
})
}
}
func TestFlush(t *testing.T) {
svr := newTestServer(t, nil)
defer closeTestServer(t, svr)
......
......@@ -103,17 +103,6 @@ func (m *mockDataService) GetComponentStates(ctx context.Context) (*internalpb.C
}, nil
}
func (m *mockDataService) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return &datapb.RegisterNodeResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
InitParams: &internalpb.InitParams{
NodeID: int64(1),
},
}, nil
}
func TestRun(t *testing.T) {
ctx := context.Background()
msFactory := msgstream.NewPmsFactory()
......
......@@ -170,13 +170,6 @@ func (c *Client) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
return ret.(*milvuspb.StringResponse), err
}
func (c *Client) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.RegisterNode(ctx, req)
})
return ret.(*datapb.RegisterNodeResponse), err
}
func (c *Client) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.Flush(ctx, req)
......@@ -191,13 +184,6 @@ func (c *Client) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
return ret.(*datapb.AssignSegmentIDResponse), err
}
func (c *Client) ShowSegments(ctx context.Context, req *datapb.ShowSegmentsRequest) (*datapb.ShowSegmentsResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.ShowSegments(ctx, req)
})
return ret.(*datapb.ShowSegmentsResponse), err
}
func (c *Client) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetSegmentStates(ctx, req)
......@@ -212,13 +198,6 @@ func (c *Client) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
return ret.(*datapb.GetInsertBinlogPathsResponse), err
}
func (c *Client) GetInsertChannels(ctx context.Context, req *datapb.GetInsertChannelsRequest) (*internalpb.StringList, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetInsertChannels(ctx, req)
})
return ret.(*internalpb.StringList), err
}
func (c *Client) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
ret, err := c.recall(func() (interface{}, error) {
return c.grpcClient.GetCollectionStatistics(ctx, req)
......
......@@ -122,16 +122,6 @@ func TestRun(t *testing.T) {
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
})
//t.Run("register node", func(t *testing.T) {
// req := &datapb.RegisterNodeRequest{
// Base: &commonpb.MsgBase{},
// Address: &commonpb.Address{},
// }
// rsp, err := dsServer.RegisterNode(ctx, req)
// assert.Nil(t, err)
// assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
//})
t.Run("flush", func(t *testing.T) {
req := &datapb.FlushRequest{}
rsp, err := dsServer.Flush(ctx, req)
......@@ -146,13 +136,6 @@ func TestRun(t *testing.T) {
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("show segments", func(t *testing.T) {
req := &datapb.ShowSegmentsRequest{}
rsp, err := dsServer.ShowSegments(ctx, req)
assert.Nil(t, err)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("get segment states", func(t *testing.T) {
req := &datapb.GetSegmentStatesRequest{}
rsp, err := dsServer.GetSegmentStates(ctx, req)
......@@ -167,13 +150,6 @@ func TestRun(t *testing.T) {
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("get insert channels", func(t *testing.T) {
req := &datapb.GetInsertChannelsRequest{}
rsp, err := dsServer.GetInsertChannels(ctx, req)
assert.Nil(t, err)
assert.Equal(t, rsp.Status.ErrorCode, commonpb.ErrorCode_Success)
})
t.Run("get collection statistics", func(t *testing.T) {
req := &datapb.GetCollectionStatisticsRequest{}
rsp, err := dsServer.GetCollectionStatistics(ctx, req)
......
......@@ -197,10 +197,6 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
return s.dataService.GetSegmentInfo(ctx, req)
}
func (s *Server) RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error) {
return s.dataService.RegisterNode(ctx, req)
}
func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error) {
return s.dataService.Flush(ctx, req)
}
......@@ -209,10 +205,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI
return s.dataService.AssignSegmentID(ctx, req)
}
func (s *Server) ShowSegments(ctx context.Context, req *datapb.ShowSegmentsRequest) (*datapb.ShowSegmentsResponse, error) {
return s.dataService.ShowSegments(ctx, req)
}
func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) {
return s.dataService.GetSegmentStates(ctx, req)
}
......@@ -221,10 +213,6 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert
return s.dataService.GetInsertBinlogPaths(ctx, req)
}
func (s *Server) GetInsertChannels(ctx context.Context, req *datapb.GetInsertChannelsRequest) (*internalpb.StringList, error) {
return s.dataService.GetInsertChannels(ctx, req)
}
func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error) {
return s.dataService.GetCollectionStatistics(ctx, req)
}
......
......@@ -14,17 +14,14 @@ service DataService {
rpc GetTimeTickChannel(internal.GetTimeTickChannelRequest) returns(milvus.StringResponse) {}
rpc GetStatisticsChannel(internal.GetStatisticsChannelRequest) returns(milvus.StringResponse){}
rpc RegisterNode(RegisterNodeRequest) returns (RegisterNodeResponse) {}
rpc Flush(FlushRequest) returns (common.Status) {}
rpc AssignSegmentID(AssignSegmentIDRequest) returns (AssignSegmentIDResponse) {}
rpc ShowSegments(ShowSegmentsRequest) returns (ShowSegmentsResponse) {}
rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
rpc GetSegmentStates(GetSegmentStatesRequest) returns (GetSegmentStatesResponse) {}
rpc GetInsertBinlogPaths(GetInsertBinlogPathsRequest) returns (GetInsertBinlogPathsResponse) {}
rpc GetInsertChannels(GetInsertChannelsRequest) returns (internal.StringList) {}
rpc GetCollectionStatistics(GetCollectionStatisticsRequest) returns (GetCollectionStatisticsResponse) {}
rpc GetPartitionStatistics(GetPartitionStatisticsRequest) returns (GetPartitionStatisticsResponse) {}
......@@ -42,16 +39,6 @@ service DataNode {
rpc FlushSegments(FlushSegmentsRequest) returns(common.Status) {}
}
message RegisterNodeRequest {
common.MsgBase base = 1;
common.Address address = 2;
}
message RegisterNodeResponse {
internal.InitParams init_params = 1;
common.Status status = 2;
}
message FlushRequest {
common.MsgBase base = 1;
int64 dbID = 2;
......@@ -86,18 +73,6 @@ message AssignSegmentIDResponse {
common.Status status = 2;
}
message ShowSegmentsRequest {
common.MsgBase base = 1;
int64 collectionID = 2;
int64 partitionID = 3;
int64 dbID = 4;
}
message ShowSegmentsResponse {
repeated int64 segmentIDs = 1;
common.Status status = 2;
}
message GetSegmentStatesRequest {
common.MsgBase base = 1;
repeated int64 segmentIDs = 2;
......@@ -137,12 +112,6 @@ message GetInsertBinlogPathsResponse {
common.Status status = 3;
}
message GetInsertChannelsRequest {
common.MsgBase base = 1;
int64 dbID = 2;
int64 collectionID = 3;
}
message GetCollectionStatisticsRequest {
common.MsgBase base = 1;
int64 dbID = 2;
......
......@@ -157,11 +157,3 @@ func (data *DataMock) GetSegmentStates(ctx context.Context, req *datapb.GetSegme
return ret, nil
}
func (data *DataMock) GetInsertChannels(ctx context.Context, req *datapb.GetInsertChannelsRequest) (*internalpb.StringList, error) {
return &internalpb.StringList{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Values: []string{"insert-0", "insert-1", "insert-2", "insert-3"},
}, nil
}
......@@ -51,13 +51,10 @@ type DataService interface {
Flush(ctx context.Context, req *datapb.FlushRequest) (*commonpb.Status, error)
RegisterNode(ctx context.Context, req *datapb.RegisterNodeRequest) (*datapb.RegisterNodeResponse, error)
AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error)
ShowSegments(ctx context.Context, req *datapb.ShowSegmentsRequest) (*datapb.ShowSegmentsResponse, error)
GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error)
GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error)
GetSegmentInfoChannel(ctx context.Context) (*milvuspb.StringResponse, error)
GetInsertChannels(ctx context.Context, req *datapb.GetInsertChannelsRequest) (*internalpb.StringList, error)
GetCollectionStatistics(ctx context.Context, req *datapb.GetCollectionStatisticsRequest) (*datapb.GetCollectionStatisticsResponse, error)
GetPartitionStatistics(ctx context.Context, req *datapb.GetPartitionStatisticsRequest) (*datapb.GetPartitionStatisticsResponse, error)
GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册