未验证 提交 58addbf9 编写于 作者: X XuanYang-cn 提交者: GitHub

Add GetShardLeaders rpc in querycoord proto (#16299)

See also: #16298
Signed-off-by: Nyangxuan <xuan.yang@zilliz.com>
上级 e20385c8
......@@ -360,6 +360,10 @@ func (m *MockQueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetric
return nil, nil
}
func (m *MockQueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
return nil, nil
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockDataCoord struct {
MockBase
......
......@@ -300,3 +300,17 @@ func (c *Client) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}
return ret.(*milvuspb.GetMetricsResponse), err
}
// GetShardLeaders gets the shard leaders of a certain collection.
func (c *Client) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
}
return client.(querypb.QueryCoordClient).GetShardLeaders(ctx, req)
})
if err != nil || ret == nil {
return nil, err
}
return ret.(*querypb.GetShardLeadersResponse), err
}
......@@ -107,6 +107,9 @@ func Test_NewClient(t *testing.T) {
r16, err := client.LoadBalance(ctx, nil)
retCheck(retNotNil, r16, err)
r17, err := client.GetShardLeaders(ctx, nil)
retCheck(retNotNil, r17, err)
}
client.grpcClient = &mock.ClientBase{
......
......@@ -389,3 +389,8 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
return s.queryCoord.GetMetrics(ctx, req)
}
// GetShardLeaders returns the shard leaders of a certain collection.
func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
return s.queryCoord.GetShardLeaders(ctx, req)
}
......@@ -34,20 +34,21 @@ import (
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockQueryCoord struct {
states *internalpb.ComponentStates
status *commonpb.Status
err error
initErr error
startErr error
stopErr error
regErr error
strResp *milvuspb.StringResponse
showcolResp *querypb.ShowCollectionsResponse
showpartResp *querypb.ShowPartitionsResponse
partResp *querypb.GetPartitionStatesResponse
channelResp *querypb.CreateQueryChannelResponse
infoResp *querypb.GetSegmentInfoResponse
metricResp *milvuspb.GetMetricsResponse
states *internalpb.ComponentStates
status *commonpb.Status
err error
initErr error
startErr error
stopErr error
regErr error
strResp *milvuspb.StringResponse
showcolResp *querypb.ShowCollectionsResponse
showpartResp *querypb.ShowPartitionsResponse
partResp *querypb.GetPartitionStatesResponse
channelResp *querypb.CreateQueryChannelResponse
infoResp *querypb.GetSegmentInfoResponse
metricResp *milvuspb.GetMetricsResponse
shardLeadersResp *querypb.GetShardLeadersResponse
}
func (m *MockQueryCoord) Init() error {
......@@ -142,6 +143,10 @@ func (m *MockQueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetric
return m.metricResp, m.err
}
func (m *MockQueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
return m.shardLeadersResp, m.err
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
type MockRootCoord struct {
types.RootCoord
......
......@@ -30,6 +30,9 @@ service QueryCoord {
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+23+--+Multiple+memory+replication+design
rpc GetShardLeaders(GetShardLeadersRequest) returns (GetShardLeadersResponse) {}
}
service QueryNode {
......@@ -147,6 +150,22 @@ message GetSegmentInfoResponse {
repeated SegmentInfo infos = 2;
}
message GetShardLeadersRequest {
common.MsgBase base = 1;
int64 collectionID = 2;
}
message GetShardLeadersResponse {
common.Status status= 1 ;
repeated ShardLeaderList list = 2;
}
message ShardLeaderList {
string channel_name = 1;
int64 nodeID = 2;
string address = 3; // 127.0.0.1:9000
}
//-----------------query node grpc request and response proto----------------
message AddQueryChannelRequest {
common.MsgBase base = 1;
......@@ -169,7 +188,7 @@ message RemoveQueryChannelRequest {
message LoadMetaInfo {
LoadType load_type = 1;
int64 collectionID = 2;
repeated int64 partitionIDs = 3;
repeated int64 partitionIDs = 3;
}
message WatchDmChannelsRequest {
......
......@@ -332,6 +332,24 @@ func (coord *QueryCoordMock) GetMetrics(ctx context.Context, req *milvuspb.GetMe
}, nil
}
func (coord *QueryCoordMock) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
if !coord.healthy() {
return &querypb.GetShardLeadersResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "unhealthy",
},
}, nil
}
return &querypb.GetShardLeadersResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "not implemented",
},
}, nil
}
func NewQueryCoordMock(opts ...QueryCoordMockOption) *QueryCoordMock {
coord := &QueryCoordMock{
nodeID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
......
......@@ -1015,3 +1015,14 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
return getMetricsResponse, nil
}
// GetShardLeaders gets shard leaders of a certain collection
func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
// TODO: to impl
return &querypb.GetShardLeadersResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "Not implemented",
},
}, nil
}
......@@ -1168,6 +1168,8 @@ type QueryCoord interface {
LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error)
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error)
}
// QueryCoordComponent is used by grpc server of QueryCoord
......
......@@ -27,6 +27,9 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
)
// Check if QueryCoordClient implements proto.QueryCoordClient
var _ querypb.QueryCoordClient = &QueryCoordClient{}
type QueryCoordClient struct {
Err error
}
......@@ -86,3 +89,7 @@ func (m *QueryCoordClient) LoadBalance(ctx context.Context, in *querypb.LoadBala
func (m *QueryCoordClient) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) {
return &milvuspb.GetMetricsResponse{}, m.Err
}
func (m *QueryCoordClient) GetShardLeaders(ctx context.Context, in *querypb.GetShardLeadersRequest, opts ...grpc.CallOption) (*querypb.GetShardLeadersResponse, error) {
return &querypb.GetShardLeadersResponse{}, m.Err
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册