未验证 提交 028c5cb8 编写于 作者: L Letian Jiang 提交者: GitHub

Modify grpc interface for replica Search/Query in QueryNode (#16326)

Signed-off-by: NLetian Jiang <letian.jiang@zilliz.com>
上级 7f7379d5
......@@ -20,6 +20,8 @@ import (
"context"
"fmt"
"google.golang.org/grpc"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
......@@ -28,7 +30,6 @@ import (
"github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
"google.golang.org/grpc"
)
var ClientParams paramtable.GrpcClientConfig
......@@ -242,7 +243,7 @@ func (c *Client) ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmen
}
// Search performs replica search tasks in QueryNode.
func (c *Client) Search(ctx context.Context, req *querypb.SearchRequest) (*milvuspb.SearchResults, error) {
func (c *Client) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
......@@ -252,11 +253,11 @@ func (c *Client) Search(ctx context.Context, req *querypb.SearchRequest) (*milvu
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.SearchResults), err
return ret.(*internalpb.SearchResults), err
}
// Query performs replica query tasks in QueryNode.
func (c *Client) Query(ctx context.Context, req *querypb.QueryRequest) (*milvuspb.QueryResults, error) {
func (c *Client) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
ret, err := c.grpcClient.ReCall(ctx, func(client interface{}) (interface{}, error) {
if !funcutil.CheckCtxValid(ctx) {
return nil, ctx.Err()
......@@ -266,7 +267,7 @@ func (c *Client) Query(ctx context.Context, req *querypb.QueryRequest) (*milvusp
if err != nil || ret == nil {
return nil, err
}
return ret.(*milvuspb.QueryResults), err
return ret.(*internalpb.RetrieveResults), err
}
// GetSegmentInfo gets the information of the specified segments in QueryNode.
......
......@@ -26,6 +26,11 @@ import (
"time"
ot "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/mq/msgstream"
"github.com/milvus-io/milvus/internal/proto/commonpb"
......@@ -40,10 +45,6 @@ import (
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
var Params paramtable.GrpcServerConfig
......@@ -305,12 +306,12 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
}
// Search performs search of streaming/historical replica on QueryNode.
func (s *Server) Search(ctx context.Context, req *querypb.SearchRequest) (*milvuspb.SearchResults, error) {
func (s *Server) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
return s.querynode.Search(ctx, req)
}
// Query performs query of streaming/historical replica on QueryNode.
func (s *Server) Query(ctx context.Context, req *querypb.QueryRequest) (*milvuspb.QueryResults, error) {
func (s *Server) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
return s.querynode.Query(ctx, req)
}
......
......@@ -23,12 +23,13 @@ import (
"github.com/milvus-io/milvus/internal/types"
"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/stretchr/testify/assert"
clientv3 "go.etcd.io/etcd/client/v3"
)
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
......@@ -43,8 +44,8 @@ type MockQueryNode struct {
strResp *milvuspb.StringResponse
infoResp *querypb.GetSegmentInfoResponse
metricResp *milvuspb.GetMetricsResponse
searchResp *milvuspb.SearchResults
queryResp *milvuspb.QueryResults
searchResp *internalpb.SearchResults
queryResp *internalpb.RetrieveResults
}
func (m *MockQueryNode) Init() error {
......@@ -115,11 +116,11 @@ func (m *MockQueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetrics
return m.metricResp, m.err
}
func (m *MockQueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*milvuspb.SearchResults, error) {
func (m *MockQueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
return m.searchResp, m.err
}
func (m *MockQueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*milvuspb.QueryResults, error) {
func (m *MockQueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
return m.queryResp, m.err
}
......
......@@ -51,8 +51,8 @@ service QueryNode {
rpc ReleaseSegments(ReleaseSegmentsRequest) returns (common.Status) {}
rpc GetSegmentInfo(GetSegmentInfoRequest) returns (GetSegmentInfoResponse) {}
rpc Search(SearchRequest) returns (milvus.SearchResults) {}
rpc Query(QueryRequest) returns (milvus.QueryResults) {}
rpc Search(SearchRequest) returns (internal.SearchResults) {}
rpc Query(QueryRequest) returns (internal.RetrieveResults) {}
// https://wiki.lfaidata.foundation/display/MIL/MEP+8+--+Add+metrics+for+proxy
rpc GetMetrics(milvus.GetMetricsRequest) returns (milvus.GetMetricsResponse) {}
......@@ -269,13 +269,13 @@ message ReleaseSegmentsRequest {
}
message SearchRequest {
milvus.SearchRequest req = 1;
internal.SearchRequest req = 1;
string dml_channel = 2;
repeated int64 segmentIDs = 3;
}
message QueryRequest {
milvus.QueryRequest req = 1;
internal.RetrieveRequest req = 1;
string dml_channel = 2;
repeated int64 segmentIDs = 3;
}
......
......@@ -158,10 +158,10 @@ func (client *queryNodeClientMock) GetMetrics(ctx context.Context, req *milvuspb
return client.grpcClient.GetMetrics(ctx, req)
}
func (client *queryNodeClientMock) Search(ctx context.Context, req *querypb.SearchRequest) (*milvuspb.SearchResults, error) {
func (client *queryNodeClientMock) Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
return client.grpcClient.Search(ctx, req)
}
func (client *queryNodeClientMock) Query(ctx context.Context, req *querypb.QueryRequest) (*milvuspb.QueryResults, error) {
func (client *queryNodeClientMock) Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
return client.grpcClient.Query(ctx, req)
}
......@@ -556,12 +556,12 @@ func (node *QueryNode) isHealthy() bool {
}
// Search performs replica search tasks.
func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (*milvuspb.SearchResults, error) {
func (node *QueryNode) Search(ctx context.Context, req *queryPb.SearchRequest) (*internalpb.SearchResults, error) {
return nil, errors.New("not implemented")
}
// Query performs replica query tasks.
func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*milvuspb.QueryResults, error) {
func (node *QueryNode) Query(ctx context.Context, req *queryPb.QueryRequest) (*internalpb.RetrieveResults, error) {
return nil, errors.New("not implemented")
}
......
......@@ -19,6 +19,8 @@ package types
import (
"context"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
......@@ -29,7 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
clientv3 "go.etcd.io/etcd/client/v3"
)
// TimeTickProvider is the interface all services implement
......@@ -1132,8 +1133,8 @@ type QueryNode interface {
ReleaseSegments(ctx context.Context, req *querypb.ReleaseSegmentsRequest) (*commonpb.Status, error)
GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error)
Search(ctx context.Context, req *querypb.SearchRequest) (*milvuspb.SearchResults, error)
Query(ctx context.Context, req *querypb.QueryRequest) (*milvuspb.QueryResults, error)
Search(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error)
Query(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error)
// GetMetrics gets the metrics about QueryNode.
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
......
......@@ -78,13 +78,14 @@ func (m *QueryNodeClient) GetSegmentInfo(ctx context.Context, in *querypb.GetSeg
return &querypb.GetSegmentInfoResponse{}, m.Err
}
func (m *QueryNodeClient) Search(ctx context.Context, in *querypb.SearchRequest, opts ...grpc.CallOption) (*milvuspb.SearchResults, error) {
return &milvuspb.SearchResults{}, m.Err
func (m *QueryNodeClient) Search(ctx context.Context, in *querypb.SearchRequest, opts ...grpc.CallOption) (*internalpb.SearchResults, error) {
return &internalpb.SearchResults{}, m.Err
}
func (m *QueryNodeClient) Query(ctx context.Context, in *querypb.QueryRequest, opts ...grpc.CallOption) (*milvuspb.QueryResults, error) {
return &milvuspb.QueryResults{}, m.Err
func (m *QueryNodeClient) Query(ctx context.Context, in *querypb.QueryRequest, opts ...grpc.CallOption) (*internalpb.RetrieveResults, error) {
return &internalpb.RetrieveResults{}, m.Err
}
func (m *QueryNodeClient) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest, opts ...grpc.CallOption) (*milvuspb.GetMetricsResponse, error) {
return &milvuspb.GetMetricsResponse{}, m.Err
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册