From 75737c65ac0115041ee1531d1555f6837df63f0b Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 31 Mar 2023 10:54:29 +0800 Subject: [PATCH] Refine error handle of QueryCoord (#23068) Signed-off-by: yah01 --- internal/querycoordv2/errors.go | 2 - .../querycoordv2/meta/resource_manager.go | 2 +- .../meta/resource_manager_test.go | 2 +- internal/querycoordv2/server.go | 12 +- internal/querycoordv2/server_test.go | 4 +- internal/querycoordv2/services.go | 144 +++++++++--------- internal/querycoordv2/services_test.go | 55 ++++--- internal/util/merr/utils.go | 11 ++ 8 files changed, 120 insertions(+), 112 deletions(-) diff --git a/internal/querycoordv2/errors.go b/internal/querycoordv2/errors.go index 78a44f06a..30f42592a 100644 --- a/internal/querycoordv2/errors.go +++ b/internal/querycoordv2/errors.go @@ -24,8 +24,6 @@ import ( ) var ( - ErrNotHealthy = errors.New("NotHealthy") - // Node Availability ErrLackSegment = errors.New("LackSegment") ErrNodeOffline = errors.New("NodeOffline") diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index c122f0277..25b11d6a4 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -135,7 +135,7 @@ func (rm *ResourceManager) AddResourceGroup(rgName string) error { } if rm.groups[rgName] != nil { - return ErrRGAlreadyExist + return nil } if len(rm.groups) >= 1024 { diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index d0b8702c7..dda7671d2 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -66,7 +66,7 @@ func (suite *ResourceManagerSuite) TestManipulateResourceGroup() { // test add duplicate rg err = suite.manager.AddResourceGroup("rg1") - suite.ErrorIs(err, ErrRGAlreadyExist) + suite.NoError(err) // test delete rg err = suite.manager.RemoveResourceGroup("rg1") suite.NoError(err) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index bd0ba7758..bb2d8db37 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -21,7 +21,6 @@ import ( "fmt" "os" "sync" - "sync/atomic" "syscall" "time" @@ -55,6 +54,7 @@ import ( "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/atomic" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -68,7 +68,7 @@ type Server struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - status atomic.Value + status atomic.Int32 etcdCli *clientv3.Client address string session *sessionutil.Session @@ -471,7 +471,11 @@ func (s *Server) Stop() error { // UpdateStateCode updates the status of the coord, including healthy, unhealthy func (s *Server) UpdateStateCode(code commonpb.StateCode) { - s.status.Store(code) + s.status.Store(int32(code)) +} + +func (s *Server) State() commonpb.StateCode { + return commonpb.StateCode(s.status.Load()) } func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) { @@ -482,7 +486,7 @@ func (s *Server) GetComponentStates(ctx context.Context) (*milvuspb.ComponentSta serviceComponentInfo := &milvuspb.ComponentInfo{ // NodeID: Params.QueryCoordID, // will race with QueryCoord.Register() NodeID: nodeID, - StateCode: s.status.Load().(commonpb.StateCode), + StateCode: s.State(), } return &milvuspb.ComponentStates{ diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 36185671c..69f3c88bc 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -232,13 +232,13 @@ func (suite *ServerSuite) TestDisableActiveStandby() { suite.server, err = newQueryCoord() suite.NoError(err) - suite.Equal(commonpb.StateCode_Initializing, suite.server.status.Load().(commonpb.StateCode)) + suite.Equal(commonpb.StateCode_Initializing, suite.server.State()) suite.hackServer() err = suite.server.Start() suite.NoError(err) err = suite.server.Register() suite.NoError(err) - suite.Equal(commonpb.StateCode_Healthy, suite.server.status.Load().(commonpb.StateCode)) + suite.Equal(commonpb.StateCode_Healthy, suite.server.State()) states, err := suite.server.GetComponentStates(context.Background()) suite.NoError(err) diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index d012dc2f5..8aa054e42 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -61,11 +61,10 @@ var ( func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) { log.Ctx(ctx).Info("show collections request received", zap.Int64s("collections", req.GetCollectionIDs())) - if s.status.Load() != commonpb.StateCode_Healthy { - msg := "failed to show collections" - log.Warn(msg, zap.Error(ErrNotHealthy)) + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to show collections", zap.Error(err)) return &querypb.ShowCollectionsResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), + Status: merr.Status(err), }, nil } defer meta.GlobalFailedLoadCache.TryExpire() @@ -134,11 +133,10 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions log.Info("show partitions request received", zap.Int64s("partitions", req.GetPartitionIDs())) - if s.status.Load() != commonpb.StateCode_Healthy { - msg := "failed to show partitions" - log.Warn(msg, zap.Error(ErrNotHealthy)) + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to show partitions", zap.Error(err)) return &querypb.ShowPartitionsResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), + Status: merr.Status(err), }, nil } defer meta.GlobalFailedLoadCache.TryExpire() @@ -193,11 +191,11 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection ) metrics.QueryCoordLoadCount.WithLabelValues(metrics.TotalLabel).Inc() - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to load collection" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc() - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), nil + return merr.Status(err), nil } // If refresh mode is ON. @@ -244,11 +242,11 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl metrics.QueryCoordReleaseCount.WithLabelValues(metrics.TotalLabel).Inc() tr := timerecord.NewTimeRecorder("release-collection") - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to release collection" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc() - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), nil + return merr.Status(err), nil } releaseJob := job.NewReleaseCollectionJob(ctx, @@ -288,11 +286,11 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions zap.Int64s("partitions", req.GetPartitionIDs())) metrics.QueryCoordLoadCount.WithLabelValues(metrics.TotalLabel).Inc() - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to load partitions" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) metrics.QueryCoordLoadCount.WithLabelValues(metrics.FailLabel).Inc() - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), nil + return merr.Status(err), nil } // If refresh mode is ON. @@ -356,11 +354,11 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart log.Info("release partitions", zap.Int64s("partitions", req.GetPartitionIDs())) metrics.QueryCoordReleaseCount.WithLabelValues(metrics.TotalLabel).Inc() - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to release partitions" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc() - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), nil + return merr.Status(err), nil } if len(req.GetPartitionIDs()) == 0 { @@ -402,11 +400,11 @@ func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartiti log.Info("get partition states", zap.Int64s("partitions", req.GetPartitionIDs())) - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to get partition states" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) return &querypb.GetPartitionStatesResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), + Status: merr.Status(err), }, nil } @@ -470,11 +468,11 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo log.Info("get segment info", zap.Int64s("segments", req.GetSegmentIDs())) - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to get segment info" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) return &querypb.GetSegmentInfoResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), + Status: merr.Status(err), }, nil } @@ -512,9 +510,9 @@ func (s *Server) SyncNewCreatedPartition(ctx context.Context, req *querypb.SyncN log.Info("received sync new created partition request") failedMsg := "failed to sync new created partition" - if s.status.Load() != commonpb.StateCode_Healthy { - log.Warn(failedMsg, zap.Error(ErrNotHealthy)) - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, failedMsg, ErrNotHealthy), nil + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn(failedMsg, zap.Error(err)) + return merr.Status(err), nil } syncJob := job.NewSyncNewCreatedPartitionJob(ctx, req, s.meta, s.cluster) @@ -539,11 +537,11 @@ func (s *Server) refreshCollection(ctx context.Context, collID int64) (*commonpb log := log.Ctx(ctx).With( zap.Int64("collectionID", collID), ) - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to refresh collection" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc() - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), nil + return merr.Status(err), nil } // Check that collection is fully loaded. @@ -593,11 +591,11 @@ func (s *Server) refreshPartitions(ctx context.Context, collID int64, partIDs [] zap.Int64("collectionID", collID), zap.Int64s("partitionIDs", partIDs), ) - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to refresh partitions" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc() - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), nil + return merr.Status(err), nil } // Check that all partitions are fully loaded. @@ -659,10 +657,10 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques zap.Int64s("dest", req.GetDstNodeIDs()), zap.Int64s("segments", req.GetSealedSegmentIDs())) - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to load balance" - log.Warn(msg, zap.Error(ErrNotHealthy)) - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), nil + log.Warn(msg, zap.Error(err)) + return merr.Status(err), nil } // Verify request @@ -713,11 +711,11 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon log.Info("show configurations request received", zap.String("pattern", req.GetPattern())) - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to show configurations" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) return &internalpb.ShowConfigurationsResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), + Status: merr.Status(err), }, nil } configList := make([]*commonpb.KeyValuePair, 0) @@ -744,11 +742,11 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest log.RatedDebug(60, "get metrics request received", zap.String("metricType", req.GetRequest())) - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to get metrics" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), + Status: merr.Status(err), }, nil } @@ -792,11 +790,11 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque log.Info("get replicas request received", zap.Bool("with-shard-nodes", req.GetWithShardNodes())) - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to get replicas" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) return &milvuspb.GetReplicasResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), + Status: merr.Status(err), }, nil } @@ -833,11 +831,11 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade ) log.Info("get shard leaders request received") - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { msg := "failed to get shard leaders" - log.Warn(msg, zap.Error(ErrNotHealthy)) + log.Warn(msg, zap.Error(err)) return &querypb.GetShardLeadersResponse{ - Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, ErrNotHealthy), + Status: merr.Status(err), }, nil } @@ -947,7 +945,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade } func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { - if s.status.Load() != commonpb.StateCode_Healthy { + if err := merr.CheckHealthy(s.State()); err != nil { reason := errorutil.UnHealthReason("querycoord", paramtable.GetNodeID(), "querycoord is unhealthy") return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: []string{reason}}, nil } @@ -984,15 +982,15 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe ) log.Info("create resource group request received") - if s.status.Load() != commonpb.StateCode_Healthy { - log.Warn(ErrCreateResourceGroupFailed.Error(), zap.Error(ErrNotHealthy)) - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrCreateResourceGroupFailed.Error(), ErrNotHealthy), nil + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to create resource group", zap.Error(err)) + return merr.Status(err), nil } err := s.meta.ResourceManager.AddResourceGroup(req.GetResourceGroup()) if err != nil { - log.Warn(ErrCreateResourceGroupFailed.Error(), zap.Error(err)) - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrCreateResourceGroupFailed.Error(), err), nil + log.Warn("failed to create resource group", zap.Error(err)) + return merr.Status(err), nil } return merr.Status(nil), nil } @@ -1003,9 +1001,9 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour ) log.Info("drop resource group request received") - if s.status.Load() != commonpb.StateCode_Healthy { - log.Warn(ErrDropResourceGroupFailed.Error(), zap.Error(ErrNotHealthy)) - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrDropResourceGroupFailed.Error(), ErrNotHealthy), nil + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to drop resource group", zap.Error(err)) + return merr.Status(err), nil } replicas := s.meta.ReplicaManager.GetByResourceGroup(req.GetResourceGroup()) @@ -1016,8 +1014,8 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour err := s.meta.ResourceManager.RemoveResourceGroup(req.GetResourceGroup()) if err != nil { - log.Warn(ErrDropResourceGroupFailed.Error(), zap.Error(err)) - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrDropResourceGroupFailed.Error(), err), nil + log.Warn("failed to drop resource group", zap.Error(err)) + return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to drop resource group", err), nil } return merr.Status(nil), nil } @@ -1030,9 +1028,9 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq ) log.Info("transfer node between resource group request received") - if s.status.Load() != commonpb.StateCode_Healthy { - log.Warn(ErrTransferNodeFailed.Error(), zap.Error(ErrNotHealthy)) - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferNodeFailed.Error(), ErrNotHealthy), nil + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to transfer node between resource group", zap.Error(err)) + return merr.Status(err), nil } if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetSourceResourceGroup()); !ok { @@ -1085,9 +1083,9 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli ) log.Info("transfer replica request received") - if s.status.Load() != commonpb.StateCode_Healthy { - log.Warn(ErrTransferReplicaFailed.Error(), zap.Error(ErrNotHealthy)) - return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrTransferReplicaFailed.Error(), ErrNotHealthy), nil + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to transfer replica between resource group", zap.Error(err)) + return merr.Status(err), nil } if ok := s.meta.ResourceManager.ContainResourceGroup(req.GetSourceResourceGroup()); !ok { @@ -1157,9 +1155,9 @@ func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou resp := &milvuspb.ListResourceGroupsResponse{ Status: merr.Status(nil), } - if s.status.Load() != commonpb.StateCode_Healthy { - log.Warn(ErrListResourceGroupsFailed.Error(), zap.Error(ErrNotHealthy)) - resp.Status = utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrListResourceGroupsFailed.Error(), ErrNotHealthy) + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn("failed to list resource group", zap.Error(err)) + resp.Status = merr.Status(err) return resp, nil } @@ -1176,9 +1174,9 @@ func (s *Server) DescribeResourceGroup(ctx context.Context, req *querypb.Describ resp := &querypb.DescribeResourceGroupResponse{ Status: merr.Status(nil), } - if s.status.Load() != commonpb.StateCode_Healthy { - log.Warn(ErrDescribeResourceGroupFailed.Error(), zap.Error(ErrNotHealthy)) - resp.Status = utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrDescribeResourceGroupFailed.Error(), ErrNotHealthy) + if err := merr.CheckHealthy(s.State()); err != nil { + log.Warn(ErrDescribeResourceGroupFailed.Error(), zap.Error(err)) + resp.Status = merr.Status(err) return resp, nil } diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index d2f6645e0..24d86c846 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -232,7 +232,7 @@ func (suite *ServiceSuite) TestShowCollections() { server.UpdateStateCode(commonpb.StateCode_Initializing) resp, err = server.ShowCollections(ctx, req) suite.NoError(err) - suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) + suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestShowPartitions() { @@ -303,7 +303,7 @@ func (suite *ServiceSuite) TestShowPartitions() { server.UpdateStateCode(commonpb.StateCode_Initializing) resp, err := server.ShowPartitions(ctx, req) suite.NoError(err) - suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) + suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestLoadCollection() { @@ -340,7 +340,7 @@ func (suite *ServiceSuite) TestLoadCollection() { } resp, err := server.LoadCollection(ctx, req) suite.NoError(err) - suite.Contains(resp.Reason, ErrNotHealthy.Error()) + suite.Equal(resp.GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestResourceGroup() { @@ -357,9 +357,7 @@ func (suite *ServiceSuite) TestResourceGroup() { resp, err = server.CreateResourceGroup(ctx, createRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) - suite.Contains(resp.Reason, ErrCreateResourceGroupFailed.Error()) - suite.Contains(resp.Reason, meta.ErrRGAlreadyExist.Error()) + suite.True(merr.Ok(resp)) listRG := &milvuspb.ListResourceGroupsRequest{} resp1, err := server.ListResourceGroups(ctx, listRG) @@ -434,7 +432,7 @@ func (suite *ServiceSuite) TestResourceGroupFailed() { suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.GetStatus().GetErrorCode()) // server unhealthy - server.status.Store(commonpb.StateCode_Abnormal) + server.UpdateStateCode(commonpb.StateCode_Abnormal) createRG := &milvuspb.CreateResourceGroupRequest{ ResourceGroup: "rg1", @@ -442,30 +440,30 @@ func (suite *ServiceSuite) TestResourceGroupFailed() { resp1, err := server.CreateResourceGroup(ctx, createRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp1.ErrorCode) + suite.ErrorIs(merr.Error(resp1), merr.ErrServiceNotReady) listRG := &milvuspb.ListResourceGroupsRequest{} resp2, err := server.ListResourceGroups(ctx, listRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp2.GetStatus().GetErrorCode()) + suite.ErrorIs(merr.Error(resp2.GetStatus()), merr.ErrServiceNotReady) describeRG = &querypb.DescribeResourceGroupRequest{ ResourceGroup: "rg1", } resp3, err := server.DescribeResourceGroup(ctx, describeRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp3.GetStatus().GetErrorCode()) + suite.ErrorIs(merr.Error(resp3.GetStatus()), merr.ErrServiceNotReady) dropRG := &milvuspb.DropResourceGroupRequest{ ResourceGroup: "rg1", } resp4, err := server.DropResourceGroup(ctx, dropRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp4.ErrorCode) + suite.ErrorIs(merr.Error(resp4), merr.ErrServiceNotReady) resp5, err := server.ListResourceGroups(ctx, listRG) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp5.GetStatus().GetErrorCode()) + suite.ErrorIs(merr.Error(resp5.GetStatus()), merr.ErrServiceNotReady) } func (suite *ServiceSuite) TestTransferNode() { @@ -579,14 +577,14 @@ func (suite *ServiceSuite) TestTransferNode() { suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode) // server unhealthy - server.status.Store(commonpb.StateCode_Abnormal) + server.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err = server.TransferNode(ctx, &milvuspb.TransferNodeRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, TargetResourceGroup: "rg1", NumNode: 3, }) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) + suite.ErrorIs(merr.Error(resp), merr.ErrServiceNotReady) } func (suite *ServiceSuite) TestTransferReplica() { @@ -703,16 +701,15 @@ func (suite *ServiceSuite) TestTransferReplica() { suite.Len(suite.server.meta.GetByResourceGroup("rg3"), 3) // server unhealthy - server.status.Store(commonpb.StateCode_Abnormal) + server.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: meta.DefaultResourceGroupName, TargetResourceGroup: "rg3", CollectionID: 1, NumReplica: 2, }) - suite.NoError(err) - suite.Equal(resp.ErrorCode, commonpb.ErrorCode_UnexpectedError) + suite.ErrorIs(merr.Error(resp), merr.ErrServiceNotReady) } func (suite *ServiceSuite) TestLoadCollectionFailed() { @@ -850,7 +847,7 @@ func (suite *ServiceSuite) TestLoadPartition() { } resp, err = server.LoadPartitions(ctx, req) suite.NoError(err) - suite.Contains(resp.Reason, ErrNotHealthy.Error()) + suite.Equal(resp.GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestLoadPartitionFailed() { @@ -905,7 +902,7 @@ func (suite *ServiceSuite) TestReleaseCollection() { } resp, err := server.ReleaseCollection(ctx, req) suite.NoError(err) - suite.Contains(resp.Reason, ErrNotHealthy.Error()) + suite.Equal(resp.GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestReleasePartition() { @@ -947,7 +944,7 @@ func (suite *ServiceSuite) TestReleasePartition() { } resp, err := server.ReleasePartitions(ctx, req) suite.NoError(err) - suite.Contains(resp.Reason, ErrNotHealthy.Error()) + suite.Equal(resp.GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestRefreshCollection() { @@ -1013,7 +1010,7 @@ func (suite *ServiceSuite) TestRefreshCollection() { server.UpdateStateCode(commonpb.StateCode_Initializing) resp, err := server.refreshCollection(ctx, suite.collections[0]) suite.NoError(err) - suite.Contains(resp.Reason, ErrNotHealthy.Error()) + suite.Equal(resp.GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestRefreshPartitions() { @@ -1090,7 +1087,7 @@ func (suite *ServiceSuite) TestRefreshPartitions() { server.UpdateStateCode(commonpb.StateCode_Initializing) resp, err := server.refreshPartitions(ctx, suite.collections[0], []int64{}) suite.NoError(err) - suite.Contains(resp.Reason, ErrNotHealthy.Error()) + suite.Equal(resp.GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestGetPartitionStates() { @@ -1117,7 +1114,7 @@ func (suite *ServiceSuite) TestGetPartitionStates() { } resp, err := server.GetPartitionStates(ctx, req) suite.NoError(err) - suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) + suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestGetSegmentInfo() { @@ -1156,7 +1153,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo() { } resp, err := server.GetSegmentInfo(ctx, req) suite.NoError(err) - suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) + suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestLoadBalance() { @@ -1203,7 +1200,7 @@ func (suite *ServiceSuite) TestLoadBalance() { } resp, err := server.LoadBalance(ctx, req) suite.NoError(err) - suite.Contains(resp.Reason, ErrNotHealthy.Error()) + suite.Equal(resp.GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() { @@ -1402,7 +1399,7 @@ func (suite *ServiceSuite) TestShowConfigurations() { } resp, err = server.ShowConfigurations(ctx, req) suite.NoError(err) - suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) + suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestGetMetrics() { @@ -1434,7 +1431,7 @@ func (suite *ServiceSuite) TestGetMetrics() { Request: string(req), }) suite.NoError(err) - suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) + suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestGetReplicas() { @@ -1477,7 +1474,7 @@ func (suite *ServiceSuite) TestGetReplicas() { } resp, err := server.GetReplicas(ctx, req) suite.NoError(err) - suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) + suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestCheckHealth() { @@ -1550,7 +1547,7 @@ func (suite *ServiceSuite) TestGetShardLeaders() { } resp, err := server.GetShardLeaders(ctx, req) suite.NoError(err) - suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error()) + suite.Equal(resp.GetStatus().GetCode(), merr.Code(merr.ErrServiceNotReady)) } func (suite *ServiceSuite) TestGetShardLeadersFailed() { diff --git a/internal/util/merr/utils.go b/internal/util/merr/utils.go index e15fdcb06..468c59b4b 100644 --- a/internal/util/merr/utils.go +++ b/internal/util/merr/utils.go @@ -110,6 +110,17 @@ func Error(status *commonpb.Status) error { return newMilvusError(status.GetReason(), code, code&retriableFlag != 0) } +// CheckHealthy checks whether the state is healthy, +// returns nil if healthy, +// otherwise returns ErrServiceNotReady wrapped with current state +func CheckHealthy(state commonpb.StateCode) error { + if state != commonpb.StateCode_Healthy { + return WrapErrServiceNotReady(state.String()) + } + + return nil +} + // Service related func WrapErrServiceNotReady(stage string, msg ...string) error { err := errors.Wrapf(ErrServiceNotReady, "stage=%s", stage) -- GitLab