未验证 提交 a3630e17 编写于 作者: C congqixia 提交者: GitHub

Refine grpc Status handling: retry legacy only for Unimplemented (#25041)

Signed-off-by: NCongqi Xia <congqi.xia@zilliz.com>
上级 72c5e2a4
...@@ -28,6 +28,7 @@ import ( ...@@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil" "github.com/milvus-io/milvus/pkg/util/typeutil"
"github.com/samber/lo" "github.com/samber/lo"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc/codes"
) )
type TargetScope = int32 type TargetScope = int32
...@@ -208,7 +209,7 @@ func (mgr *TargetManager) PullNextTarget(broker Broker, collectionID int64, chos ...@@ -208,7 +209,7 @@ func (mgr *TargetManager) PullNextTarget(broker Broker, collectionID int64, chos
vChannelInfos, segmentInfos, err := broker.GetRecoveryInfoV2(context.TODO(), collectionID) vChannelInfos, segmentInfos, err := broker.GetRecoveryInfoV2(context.TODO(), collectionID)
if err != nil { if err != nil {
// if meet rpc error, for compatibility with previous versions, try pull next target v1 // if meet rpc error, for compatibility with previous versions, try pull next target v1
if funcutil.IsGrpcErr(err) { if funcutil.IsGrpcErr(err, codes.Unimplemented) {
target, err = mgr.PullNextTargetV1(broker, collectionID, chosenPartitionIDs...) target, err = mgr.PullNextTargetV1(broker, collectionID, chosenPartitionIDs...)
return err return err
} }
......
...@@ -219,7 +219,7 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() { ...@@ -219,7 +219,7 @@ func (suite *TargetManagerSuite) TestUpdateNextTarget() {
suite.broker.ExpectedCalls = nil suite.broker.ExpectedCalls = nil
// test getRecoveryInfoV2 failed , then back to getRecoveryInfo succeed // test getRecoveryInfoV2 failed , then back to getRecoveryInfo succeed
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nil, nil, status.Errorf(codes.NotFound, "fake not found")) suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, collectionID).Return(nil, nil, status.Errorf(codes.Unimplemented, "fake not found"))
suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{1}, nil) suite.broker.EXPECT().GetPartitions(mock.Anything, mock.Anything).Return([]int64{1}, nil)
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, collectionID, int64(1)).Return(nextTargetChannels, nextTargetBinlogs, nil) suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, collectionID, int64(1)).Return(nextTargetChannels, nextTargetBinlogs, nil)
err := suite.mgr.UpdateCollectionNextTargetWithPartitions(collectionID, int64(1)) err := suite.mgr.UpdateCollectionNextTargetWithPartitions(collectionID, int64(1))
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"go.uber.org/zap" "go.uber.org/zap"
"google.golang.org/grpc/codes"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/internalpb"
...@@ -120,7 +121,7 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e ...@@ -120,7 +121,7 @@ func (w *remoteWorker) Delete(ctx context.Context, req *querypb.DeleteRequest) e
func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) { func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRequest) (*internalpb.SearchResults, error) {
ret, err := w.client.SearchSegments(ctx, req) ret, err := w.client.SearchSegments(ctx, req)
if err != nil && funcutil.IsGrpcErr(err) { if err != nil && funcutil.IsGrpcErr(err, codes.Unimplemented) {
// for compatible with rolling upgrade from version before v2.2.9 // for compatible with rolling upgrade from version before v2.2.9
return w.client.Search(ctx, req) return w.client.Search(ctx, req)
} }
...@@ -130,7 +131,7 @@ func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRe ...@@ -130,7 +131,7 @@ func (w *remoteWorker) SearchSegments(ctx context.Context, req *querypb.SearchRe
func (w *remoteWorker) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) { func (w *remoteWorker) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
ret, err := w.client.QuerySegments(ctx, req) ret, err := w.client.QuerySegments(ctx, req)
if err != nil && funcutil.IsGrpcErr(err) { if err != nil && funcutil.IsGrpcErr(err, codes.Unimplemented) {
// for compatible with rolling upgrade from version before v2.2.9 // for compatible with rolling upgrade from version before v2.2.9
return w.client.Query(ctx, req) return w.client.Query(ctx, req)
} }
......
...@@ -238,7 +238,7 @@ func (s *RemoteWorkerSuite) TestSearch() { ...@@ -238,7 +238,7 @@ func (s *RemoteWorkerSuite) TestSearch() {
var result *internalpb.SearchResults var result *internalpb.SearchResults
var err error var err error
grpcErr := status.Error(codes.NotFound, "method not implemented") grpcErr := status.Error(codes.Unimplemented, "method not implemented")
s.mockClient.EXPECT().SearchSegments(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")). s.mockClient.EXPECT().SearchSegments(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")).
Return(result, grpcErr) Return(result, grpcErr)
s.mockClient.EXPECT().Search(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")). s.mockClient.EXPECT().Search(mock.Anything, mock.AnythingOfType("*querypb.SearchRequest")).
...@@ -317,7 +317,7 @@ func (s *RemoteWorkerSuite) TestQuery() { ...@@ -317,7 +317,7 @@ func (s *RemoteWorkerSuite) TestQuery() {
var result *internalpb.RetrieveResults var result *internalpb.RetrieveResults
var err error var err error
grpcErr := status.Error(codes.NotFound, "method not implemented") grpcErr := status.Error(codes.Unimplemented, "method not implemented")
s.mockClient.EXPECT().QuerySegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")). s.mockClient.EXPECT().QuerySegments(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")).
Return(result, grpcErr) Return(result, grpcErr)
s.mockClient.EXPECT().Query(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")). s.mockClient.EXPECT().Query(mock.Anything, mock.AnythingOfType("*querypb.QueryRequest")).
......
...@@ -32,6 +32,8 @@ import ( ...@@ -32,6 +32,8 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/util/typeutil"
"google.golang.org/grpc/codes"
grpcStatus "google.golang.org/grpc/status" grpcStatus "google.golang.org/grpc/status"
) )
...@@ -279,14 +281,15 @@ func ReadBinary(endian binary.ByteOrder, bs []byte, receiver interface{}) error ...@@ -279,14 +281,15 @@ func ReadBinary(endian binary.ByteOrder, bs []byte, receiver interface{}) error
} }
// IsGrpcErr checks whether err is instance of grpc status error. // IsGrpcErr checks whether err is instance of grpc status error.
func IsGrpcErr(err error) bool { func IsGrpcErr(err error, targets ...codes.Code) bool {
set := typeutil.NewSet[codes.Code](targets...)
for { for {
if err == nil { if err == nil {
return false return false
} }
_, ok := grpcStatus.FromError(err) s, ok := grpcStatus.FromError(err)
if ok { if ok {
return true return set.Len() == 0 || set.Contain(s.Code())
} }
err = errors.Unwrap(err) err = errors.Unwrap(err)
} }
......
...@@ -349,6 +349,18 @@ func TestIsGrpcErr(t *testing.T) { ...@@ -349,6 +349,18 @@ func TestIsGrpcErr(t *testing.T) {
errWrap := fmt.Errorf("wrap grpc error %w", err) errWrap := fmt.Errorf("wrap grpc error %w", err)
assert.True(t, IsGrpcErr(errWrap)) assert.True(t, IsGrpcErr(errWrap))
}) })
t.Run("codes_match", func(t *testing.T) {
err := grpcStatus.Error(grpcCodes.Unavailable, "test")
errWrap := fmt.Errorf("wrap grpc error %w", err)
assert.True(t, IsGrpcErr(errWrap, grpcCodes.Unimplemented, grpcCodes.Unavailable))
})
t.Run("codes_not_match", func(t *testing.T) {
err := grpcStatus.Error(grpcCodes.Unavailable, "test")
errWrap := fmt.Errorf("wrap grpc error %w", err)
assert.False(t, IsGrpcErr(errWrap, grpcCodes.Unimplemented))
})
} }
func TestIsEmptyString(t *testing.T) { func TestIsEmptyString(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册