未验证 提交 1a4732bb 编写于 作者: Y yah01 提交者: GitHub

Use new errors to handle load failures cache (#22672)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 8a4a6405
......@@ -238,14 +238,14 @@ func (s *Server) fillMetricsWithNodes(topo *metricsinfo.QueryClusterTopology, no
continue
}
if metric.resp.Status.ErrorCode != commonpb.ErrorCode_Success {
if metric.resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("invalid metrics of query node was found",
zap.Any("error_code", metric.resp.Status.ErrorCode),
zap.Any("error_reason", metric.resp.Status.Reason))
zap.Any("error_code", metric.resp.GetStatus().GetErrorCode()),
zap.Any("error_reason", metric.resp.GetStatus().GetReason()))
topo.ConnectedNodes = append(topo.ConnectedNodes, metricsinfo.QueryNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
HasError: true,
ErrorReason: metric.resp.Status.Reason,
ErrorReason: metric.resp.GetStatus().GetReason(),
Name: metric.resp.ComponentName,
ID: int64(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
},
......
......@@ -103,8 +103,8 @@ func (broker *CoordinatorBroker) GetPartitions(ctx context.Context, collectionID
return nil, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(resp.Status.Reason)
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = errors.New(resp.GetStatus().GetReason())
log.Warn("showPartition failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, err
}
......@@ -129,8 +129,8 @@ func (broker *CoordinatorBroker) GetRecoveryInfo(ctx context.Context, collection
return nil, nil, err
}
if recoveryInfo.Status.ErrorCode != commonpb.ErrorCode_Success {
err = errors.New(recoveryInfo.Status.Reason)
if recoveryInfo.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
err = errors.New(recoveryInfo.GetStatus().GetReason())
log.Error("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
return nil, nil, err
}
......
......@@ -22,9 +22,8 @@ import (
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
. "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/util/merr"
)
const expireTime = 24 * time.Hour
......@@ -38,60 +37,70 @@ type failInfo struct {
}
type FailedLoadCache struct {
mu sync.RWMutex
records map[UniqueID]map[commonpb.ErrorCode]*failInfo
mu sync.RWMutex
// CollectionID, ErrorCode -> error
records map[int64]map[int32]*failInfo
}
func NewFailedLoadCache() *FailedLoadCache {
return &FailedLoadCache{
records: make(map[UniqueID]map[commonpb.ErrorCode]*failInfo),
records: make(map[int64]map[int32]*failInfo),
}
}
func (l *FailedLoadCache) Get(collectionID UniqueID) *commonpb.Status {
func (l *FailedLoadCache) Get(collectionID int64) error {
l.mu.RLock()
defer l.mu.RUnlock()
status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
if _, ok := l.records[collectionID]; !ok {
return status
return nil
}
if len(l.records[collectionID]) == 0 {
return status
return nil
}
var max = 0
for code, info := range l.records[collectionID] {
var (
max = 0
err error
)
for _, info := range l.records[collectionID] {
if info.count > max {
max = info.count
status.ErrorCode = code
status.Reason = info.err.Error()
err = info.err
}
}
log.Warn("FailedLoadCache hits failed record", zap.Int64("collectionID", collectionID),
zap.String("errCode", status.GetErrorCode().String()), zap.String("reason", status.GetReason()))
return status
log.Warn("FailedLoadCache hits failed record",
zap.Int64("collectionID", collectionID),
zap.Error(err),
)
return err
}
func (l *FailedLoadCache) Put(collectionID UniqueID, errCode commonpb.ErrorCode, err error) {
if errCode == commonpb.ErrorCode_Success {
func (l *FailedLoadCache) Put(collectionID int64, err error) {
if err == nil {
return
}
code := merr.Code(err)
l.mu.Lock()
defer l.mu.Unlock()
if _, ok := l.records[collectionID]; !ok {
l.records[collectionID] = make(map[commonpb.ErrorCode]*failInfo)
l.records[collectionID] = make(map[int32]*failInfo)
}
if _, ok := l.records[collectionID][errCode]; !ok {
l.records[collectionID][errCode] = &failInfo{}
if _, ok := l.records[collectionID][code]; !ok {
l.records[collectionID][code] = &failInfo{}
}
l.records[collectionID][errCode].count++
l.records[collectionID][errCode].err = err
l.records[collectionID][errCode].lastTime = time.Now()
log.Warn("FailedLoadCache put failed record", zap.Int64("collectionID", collectionID),
zap.String("errCode", errCode.String()), zap.Error(err))
l.records[collectionID][code].count++
l.records[collectionID][code].err = err
l.records[collectionID][code].lastTime = time.Now()
log.Warn("FailedLoadCache put failed record",
zap.Int64("collectionID", collectionID),
zap.Error(err),
)
}
func (l *FailedLoadCache) Remove(collectionID UniqueID) {
func (l *FailedLoadCache) Remove(collectionID int64) {
l.mu.Lock()
defer l.mu.Unlock()
delete(l.records, collectionID)
......
......@@ -17,39 +17,38 @@
package meta
import (
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/util/merr"
)
func TestFailedLoadCache(t *testing.T) {
GlobalFailedLoadCache = NewFailedLoadCache()
colID := int64(0)
errCode := commonpb.ErrorCode_InsufficientMemoryToLoad
mockErr := fmt.Errorf("mock insufficient memory reason")
mockErr := merr.WrapErrServiceMemoryLimitExceeded(0, 0)
GlobalFailedLoadCache.Put(colID, commonpb.ErrorCode_Success, nil)
res := GlobalFailedLoadCache.Get(colID)
assert.Equal(t, commonpb.ErrorCode_Success, res.GetErrorCode())
GlobalFailedLoadCache.Put(colID, nil)
err := GlobalFailedLoadCache.Get(colID)
assert.NoError(t, err)
GlobalFailedLoadCache.Put(colID, errCode, mockErr)
res = GlobalFailedLoadCache.Get(colID)
assert.Equal(t, errCode, res.GetErrorCode())
GlobalFailedLoadCache.Put(colID, mockErr)
err = GlobalFailedLoadCache.Get(colID)
assert.Equal(t, merr.Code(merr.ErrServiceMemoryLimitExceeded), merr.Code(err))
GlobalFailedLoadCache.Remove(colID)
res = GlobalFailedLoadCache.Get(colID)
assert.Equal(t, commonpb.ErrorCode_Success, res.GetErrorCode())
err = GlobalFailedLoadCache.Get(colID)
assert.Equal(t, commonpb.ErrorCode_Success, merr.Status(err).ErrorCode)
GlobalFailedLoadCache.Put(colID, errCode, mockErr)
GlobalFailedLoadCache.Put(colID, mockErr)
GlobalFailedLoadCache.mu.Lock()
GlobalFailedLoadCache.records[colID][errCode].lastTime = time.Now().Add(-expireTime * 2)
GlobalFailedLoadCache.records[colID][merr.Code(mockErr)].lastTime = time.Now().Add(-expireTime * 2)
GlobalFailedLoadCache.mu.Unlock()
GlobalFailedLoadCache.TryExpire()
res = GlobalFailedLoadCache.Get(colID)
assert.Equal(t, commonpb.ErrorCode_Success, res.GetErrorCode())
err = GlobalFailedLoadCache.Get(colID)
assert.Equal(t, commonpb.ErrorCode_Success, merr.Status(err).ErrorCode)
}
......@@ -44,6 +44,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
......@@ -171,7 +172,7 @@ func (suite *ServerSuite) TestRecoverFailed() {
func (suite *ServerSuite) TestNodeUp() {
newNode := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 100)
newNode.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{}, nil)
newNode.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Status(nil)}, nil)
err := newNode.Start()
suite.NoError(err)
defer newNode.Stop()
......@@ -254,7 +255,7 @@ func (suite *ServerSuite) TestEnableActiveStandby() {
mockDataCoord := coordMocks.NewDataCoord(suite.T())
mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
Status: successStatus,
Status: merr.Status(nil),
Schema: &schemapb.CollectionSchema{},
}, nil).Maybe()
for _, collection := range suite.collections {
......@@ -266,7 +267,7 @@ func (suite *ServerSuite) TestEnableActiveStandby() {
CollectionID: collection,
}
mockRootCoord.EXPECT().ShowPartitionsInternal(mock.Anything, req).Return(&milvuspb.ShowPartitionsResponse{
Status: successStatus,
Status: merr.Status(nil),
PartitionIDs: suite.partitions[collection],
}, nil).Maybe()
}
......@@ -414,7 +415,7 @@ func (suite *ServerSuite) expectGetRecoverInfoByMockDataCoord(collection int64,
})
}
dataCoord.EXPECT().GetRecoveryInfo(mock.Anything, getRecoveryInfoRequest).Maybe().Return(&datapb.GetRecoveryInfoResponse{
Status: successStatus,
Status: merr.Status(nil),
Channels: vChannels,
Binlogs: segmentBinlogs,
}, nil)
......
......@@ -46,8 +46,6 @@ import (
)
var (
successStatus = utils.WrapStatus(commonpb.ErrorCode_Success, "")
ErrCreateResourceGroupFailed = errors.New("failed to create resource group")
ErrDropResourceGroupFailed = errors.New("failed to drop resource group")
ErrAddNodeToRGFailed = errors.New("failed to add node to resource group")
......@@ -86,7 +84,7 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
collections := collectionSet.Collect()
resp := &querypb.ShowCollectionsResponse{
Status: successStatus,
Status: &commonpb.Status{},
CollectionIDs: make([]int64, 0, len(collectionSet)),
InMemoryPercentages: make([]int64, 0, len(collectionSet)),
QueryServiceAvailable: make([]bool, 0, len(collectionSet)),
......@@ -101,17 +99,20 @@ func (s *Server) ShowCollections(ctx context.Context, req *querypb.ShowCollectio
// ignore it
continue
}
status := meta.GlobalFailedLoadCache.Get(collectionID)
if status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("show collection failed", zap.String("errCode", status.GetErrorCode().String()), zap.String("reason", status.GetReason()))
err := meta.GlobalFailedLoadCache.Get(collectionID)
if err != nil {
log.Warn("show collection failed", zap.Error(err))
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad
return &querypb.ShowCollectionsResponse{
Status: status,
}, nil
}
err := fmt.Errorf("collection %d has not been loaded to memory or load failed", collectionID)
err = fmt.Errorf("collection %d has not been loaded to memory or load failed", collectionID)
log.Warn("show collection failed", zap.Error(err))
return &querypb.ShowCollectionsResponse{
Status: utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()),
Status: merr.Status(err),
}, nil
}
resp.CollectionIDs = append(resp.CollectionIDs, collectionID)
......@@ -186,9 +187,10 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
}
if isReleased {
status := meta.GlobalFailedLoadCache.Get(req.GetCollectionID())
if status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("show collection failed", zap.String("errCode", status.GetErrorCode().String()), zap.String("reason", status.GetReason()))
err := meta.GlobalFailedLoadCache.Get(req.GetCollectionID())
if err != nil {
status := merr.Status(err)
status.ErrorCode = commonpb.ErrorCode_InsufficientMemoryToLoad
return &querypb.ShowPartitionsResponse{
Status: status,
}, nil
......@@ -201,7 +203,7 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
}
return &querypb.ShowPartitionsResponse{
Status: successStatus,
Status: merr.Status(nil),
PartitionIDs: partitions,
InMemoryPercentages: percentages,
}, nil
......@@ -258,7 +260,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
}
metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
......@@ -298,7 +300,7 @@ func (s *Server) ReleaseCollection(ctx context.Context, req *querypb.ReleaseColl
metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(tr.ElapseSpan().Milliseconds()))
meta.GlobalFailedLoadCache.Remove(req.GetCollectionID())
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
......@@ -352,7 +354,7 @@ func (s *Server) LoadPartitions(ctx context.Context, req *querypb.LoadPartitions
}
metrics.QueryCoordLoadCount.WithLabelValues(metrics.SuccessLabel).Inc()
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) checkResourceGroup(collectionID int64, resourceGroups []string) error {
......@@ -415,7 +417,7 @@ func (s *Server) ReleasePartitions(ctx context.Context, req *querypb.ReleasePart
metrics.QueryCoordReleaseLatency.WithLabelValues().Observe(float64(tr.ElapseSpan().Milliseconds()))
meta.GlobalFailedLoadCache.Remove(req.GetCollectionID())
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
......@@ -481,7 +483,7 @@ func (s *Server) GetPartitionStates(ctx context.Context, req *querypb.GetPartiti
}
return &querypb.GetPartitionStatesResponse{
Status: successStatus,
Status: merr.Status(nil),
PartitionDescriptions: states,
}, nil
}
......@@ -521,7 +523,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfo
}
return &querypb.GetSegmentInfoResponse{
Status: successStatus,
Status: merr.Status(nil),
Infos: infos,
}, nil
}
......@@ -703,7 +705,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
log.Warn(msg, zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil
}
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
......@@ -751,7 +753,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
}
resp := &milvuspb.GetMetricsResponse{
Status: successStatus,
Status: merr.Status(nil),
ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole,
paramtable.GetNodeID()),
}
......@@ -799,7 +801,7 @@ func (s *Server) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasReque
}
resp := &milvuspb.GetReplicasResponse{
Status: successStatus,
Status: merr.Status(nil),
Replicas: make([]*milvuspb.ReplicaInfo, 0),
}
......@@ -840,7 +842,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
}
resp := &querypb.GetShardLeadersResponse{
Status: successStatus,
Status: merr.Status(nil),
}
if s.meta.CollectionManager.GetLoadPercentage(req.GetCollectionID()) < 100 {
......@@ -991,7 +993,7 @@ func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateRe
log.Warn(ErrCreateResourceGroupFailed.Error(), zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrCreateResourceGroupFailed.Error(), err), nil
}
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResourceGroupRequest) (*commonpb.Status, error) {
......@@ -1016,7 +1018,7 @@ func (s *Server) DropResourceGroup(ctx context.Context, req *milvuspb.DropResour
log.Warn(ErrDropResourceGroupFailed.Error(), zap.Error(err))
return utils.WrapStatus(commonpb.ErrorCode_UnexpectedError, ErrDropResourceGroupFailed.Error(), err), nil
}
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeRequest) (*commonpb.Status, error) {
......@@ -1071,7 +1073,7 @@ func (s *Server) TransferNode(ctx context.Context, req *milvuspb.TransferNodeReq
utils.AddNodesToCollectionsInRG(s.meta, req.GetTargetResourceGroup(), nodes...)
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferReplicaRequest) (*commonpb.Status, error) {
......@@ -1128,7 +1130,7 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli
return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, ErrTransferReplicaFailed.Error(), err), nil
}
return successStatus, nil
return merr.Status(nil), nil
}
func (s *Server) transferReplica(targetRG string, replicas []*meta.Replica) error {
......@@ -1152,7 +1154,7 @@ func (s *Server) ListResourceGroups(ctx context.Context, req *milvuspb.ListResou
log.Info("list resource group request received")
resp := &milvuspb.ListResourceGroupsResponse{
Status: successStatus,
Status: merr.Status(nil),
}
if s.status.Load() != commonpb.StateCode_Healthy {
log.Warn(ErrListResourceGroupsFailed.Error(), zap.Error(ErrNotHealthy))
......@@ -1171,7 +1173,7 @@ func (s *Server) DescribeResourceGroup(ctx context.Context, req *querypb.Describ
log.Info("describe resource group request received")
resp := &querypb.DescribeResourceGroupResponse{
Status: successStatus,
Status: merr.Status(nil),
}
if s.status.Load() != commonpb.StateCode_Healthy {
log.Warn(ErrDescribeResourceGroupFailed.Error(), zap.Error(ErrNotHealthy))
......
......@@ -19,7 +19,6 @@ package querycoordv2
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
......@@ -41,6 +40,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
......@@ -199,7 +199,7 @@ func (suite *ServiceSuite) TestShowCollections() {
req := &querypb.ShowCollectionsRequest{}
resp, err := server.ShowCollections(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.Len(resp.CollectionIDs, collectionNum)
for _, collection := range suite.collections {
suite.Contains(resp.CollectionIDs, collection)
......@@ -210,7 +210,7 @@ func (suite *ServiceSuite) TestShowCollections() {
req.CollectionIDs = []int64{collection}
resp, err = server.ShowCollections(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.Len(resp.CollectionIDs, 1)
suite.Equal(collection, resp.CollectionIDs[0])
......@@ -218,7 +218,7 @@ func (suite *ServiceSuite) TestShowCollections() {
colBak := suite.meta.CollectionManager.GetCollection(collection)
err = suite.meta.CollectionManager.RemoveCollection(collection)
suite.NoError(err)
meta.GlobalFailedLoadCache.Put(collection, commonpb.ErrorCode_InsufficientMemoryToLoad, fmt.Errorf("mock insufficient memory reason"))
meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10))
resp, err = server.ShowCollections(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_InsufficientMemoryToLoad, resp.GetStatus().GetErrorCode())
......@@ -230,7 +230,7 @@ func (suite *ServiceSuite) TestShowCollections() {
server.UpdateStateCode(commonpb.StateCode_Initializing)
resp, err = server.ShowCollections(ctx, req)
suite.NoError(err)
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error())
}
func (suite *ServiceSuite) TestShowPartitions() {
......@@ -248,7 +248,7 @@ func (suite *ServiceSuite) TestShowPartitions() {
}
resp, err := server.ShowPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.Len(resp.PartitionIDs, partitionNum)
for _, partition := range partitions {
suite.Contains(resp.PartitionIDs, partition)
......@@ -261,7 +261,7 @@ func (suite *ServiceSuite) TestShowPartitions() {
}
resp, err = server.ShowPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.Len(resp.PartitionIDs, 1)
for _, partition := range partitions[0:1] {
suite.Contains(resp.PartitionIDs, partition)
......@@ -272,7 +272,7 @@ func (suite *ServiceSuite) TestShowPartitions() {
colBak := suite.meta.CollectionManager.GetCollection(collection)
err = suite.meta.CollectionManager.RemoveCollection(collection)
suite.NoError(err)
meta.GlobalFailedLoadCache.Put(collection, commonpb.ErrorCode_InsufficientMemoryToLoad, fmt.Errorf("mock insufficient memory reason"))
meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10))
resp, err = server.ShowPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_InsufficientMemoryToLoad, resp.GetStatus().GetErrorCode())
......@@ -284,7 +284,7 @@ func (suite *ServiceSuite) TestShowPartitions() {
parBak := suite.meta.CollectionManager.GetPartition(partitionID)
err = suite.meta.CollectionManager.RemovePartition(partitionID)
suite.NoError(err)
meta.GlobalFailedLoadCache.Put(collection, commonpb.ErrorCode_InsufficientMemoryToLoad, fmt.Errorf("mock insufficient memory reason"))
meta.GlobalFailedLoadCache.Put(collection, merr.WrapErrServiceMemoryLimitExceeded(100, 10))
resp, err = server.ShowPartitions(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_InsufficientMemoryToLoad, resp.GetStatus().GetErrorCode())
......@@ -301,7 +301,7 @@ func (suite *ServiceSuite) TestShowPartitions() {
server.UpdateStateCode(commonpb.StateCode_Initializing)
resp, err := server.ShowPartitions(ctx, req)
suite.NoError(err)
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error())
}
func (suite *ServiceSuite) TestLoadCollection() {
......@@ -363,7 +363,7 @@ func (suite *ServiceSuite) TestResourceGroup() {
listRG := &milvuspb.ListResourceGroupsRequest{}
resp1, err := server.ListResourceGroups(ctx, listRG)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp1.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp1.GetStatus().GetErrorCode())
suite.Len(resp1.ResourceGroups, 2)
server.nodeMgr.Add(session.NewNodeInfo(1011, "localhost"))
......@@ -398,7 +398,7 @@ func (suite *ServiceSuite) TestResourceGroup() {
}
resp2, err := server.DescribeResourceGroup(ctx, describeRG)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp2.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp2.GetStatus().GetErrorCode())
suite.Equal("rg11", resp2.GetResourceGroup().GetName())
suite.Equal(int32(2), resp2.GetResourceGroup().GetCapacity())
suite.Equal(int32(2), resp2.GetResourceGroup().GetNumAvailableNode())
......@@ -416,7 +416,7 @@ func (suite *ServiceSuite) TestResourceGroup() {
resp4, err := server.ListResourceGroups(ctx, listRG)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp4.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp4.GetStatus().GetErrorCode())
suite.Len(resp4.GetResourceGroups(), 3)
}
......@@ -430,7 +430,7 @@ func (suite *ServiceSuite) TestResourceGroupFailed() {
}
resp, err := server.DescribeResourceGroup(ctx, describeRG)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.GetStatus().GetErrorCode())
// server unhealthy
server.status.Store(commonpb.StateCode_Abnormal)
......@@ -446,14 +446,14 @@ func (suite *ServiceSuite) TestResourceGroupFailed() {
listRG := &milvuspb.ListResourceGroupsRequest{}
resp2, err := server.ListResourceGroups(ctx, listRG)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp2.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp2.GetStatus().GetErrorCode())
describeRG = &querypb.DescribeResourceGroupRequest{
ResourceGroup: "rg1",
}
resp3, err := server.DescribeResourceGroup(ctx, describeRG)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp3.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp3.GetStatus().GetErrorCode())
dropRG := &milvuspb.DropResourceGroupRequest{
ResourceGroup: "rg1",
......@@ -464,7 +464,7 @@ func (suite *ServiceSuite) TestResourceGroupFailed() {
resp5, err := server.ListResourceGroups(ctx, listRG)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp5.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp5.GetStatus().GetErrorCode())
}
func (suite *ServiceSuite) TestTransferNode() {
......@@ -1106,7 +1106,7 @@ func (suite *ServiceSuite) TestGetPartitionStates() {
}
resp, err := server.GetPartitionStates(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.Len(resp.PartitionDescriptions, len(suite.partitions[collection]))
}
......@@ -1117,7 +1117,7 @@ func (suite *ServiceSuite) TestGetPartitionStates() {
}
resp, err := server.GetPartitionStates(ctx, req)
suite.NoError(err)
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error())
}
func (suite *ServiceSuite) TestGetSegmentInfo() {
......@@ -1133,7 +1133,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo() {
}
resp, err := server.GetSegmentInfo(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.assertSegments(collection, resp.GetInfos())
}
......@@ -1145,7 +1145,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo() {
}
resp, err := server.GetSegmentInfo(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.assertSegments(collection, resp.GetInfos())
}
......@@ -1156,7 +1156,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo() {
}
resp, err := server.GetSegmentInfo(ctx, req)
suite.NoError(err)
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error())
}
func (suite *ServiceSuite) TestLoadBalance() {
......@@ -1392,7 +1392,7 @@ func (suite *ServiceSuite) TestShowConfigurations() {
}
resp, err := server.ShowConfigurations(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.Len(resp.Configuations, 1)
suite.Equal("querycoord.port", resp.Configuations[0].Key)
......@@ -1403,7 +1403,7 @@ func (suite *ServiceSuite) TestShowConfigurations() {
}
resp, err = server.ShowConfigurations(ctx, req)
suite.NoError(err)
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error())
}
func (suite *ServiceSuite) TestGetMetrics() {
......@@ -1412,7 +1412,7 @@ func (suite *ServiceSuite) TestGetMetrics() {
for _, node := range suite.nodes {
suite.cluster.EXPECT().GetMetrics(ctx, node, mock.Anything).Return(&milvuspb.GetMetricsResponse{
Status: successStatus,
Status: merr.Status(nil),
ComponentName: "QueryNode",
}, nil)
}
......@@ -1426,7 +1426,7 @@ func (suite *ServiceSuite) TestGetMetrics() {
Request: string(req),
})
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
// Test when server is not healthy
server.UpdateStateCode(commonpb.StateCode_Initializing)
......@@ -1435,7 +1435,7 @@ func (suite *ServiceSuite) TestGetMetrics() {
Request: string(req),
})
suite.NoError(err)
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error())
}
func (suite *ServiceSuite) TestGetReplicas() {
......@@ -1450,7 +1450,7 @@ func (suite *ServiceSuite) TestGetReplicas() {
}
resp, err := server.GetReplicas(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.EqualValues(suite.replicaNumber[collection], len(resp.Replicas))
}
......@@ -1467,7 +1467,7 @@ func (suite *ServiceSuite) TestGetReplicas() {
}
resp, err := server.GetReplicas(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.EqualValues(suite.replicaNumber[collection], len(resp.Replicas))
}
......@@ -1478,7 +1478,7 @@ func (suite *ServiceSuite) TestGetReplicas() {
}
resp, err := server.GetReplicas(ctx, req)
suite.NoError(err)
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error())
}
func (suite *ServiceSuite) TestCheckHealth() {
......@@ -1537,7 +1537,7 @@ func (suite *ServiceSuite) TestGetShardLeaders() {
suite.fetchHeartbeats(time.Now())
resp, err := server.GetShardLeaders(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
suite.Len(resp.Shards, len(suite.channels[collection]))
for _, shard := range resp.Shards {
suite.Len(shard.NodeIds, int(suite.replicaNumber[collection]))
......@@ -1551,7 +1551,7 @@ func (suite *ServiceSuite) TestGetShardLeaders() {
}
resp, err := server.GetShardLeaders(ctx, req)
suite.NoError(err)
suite.Contains(resp.Status.Reason, ErrNotHealthy.Error())
suite.Contains(resp.GetStatus().GetReason(), ErrNotHealthy.Error())
}
func (suite *ServiceSuite) TestGetShardLeadersFailed() {
......@@ -1573,7 +1573,7 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
}
resp, err := server.GetShardLeaders(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode())
for _, node := range suite.nodes {
suite.nodeMgr.Add(session.NewNodeInfo(node, "localhost"))
}
......@@ -1582,7 +1582,7 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
suite.fetchHeartbeats(time.Now().Add(-Params.QueryCoordCfg.HeartbeatAvailableInterval.GetAsDuration(time.Millisecond) - 1))
resp, err = server.GetShardLeaders(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode())
// Segment not fully loaded
for _, node := range suite.nodes {
......@@ -1594,7 +1594,7 @@ func (suite *ServiceSuite) TestGetShardLeadersFailed() {
suite.fetchHeartbeats(time.Now())
resp, err = server.GetShardLeaders(ctx, req)
suite.NoError(err)
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode)
suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.GetStatus().GetErrorCode())
}
}
......
......@@ -18,7 +18,6 @@ package task
import (
"context"
"fmt"
"sync"
"time"
......@@ -31,6 +30,7 @@ import (
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/atomic"
"go.uber.org/zap"
......@@ -190,9 +190,10 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
log.Warn("failed to load segment, it may be a false failure", zap.Error(err))
return
}
if status.ErrorCode == commonpb.ErrorCode_InsufficientMemoryToLoad {
log.Warn("insufficient memory to load segment", zap.String("err", status.GetReason()))
task.SetErr(fmt.Errorf("%w, err:%s", ErrInsufficientMemory, status.GetReason()))
err = merr.Error(status)
if errors.Is(err, merr.ErrServiceMemoryLimitExceeded) {
log.Warn("insufficient memory to load segment", zap.Error(err))
task.SetErr(err)
task.Cancel()
return
}
......
......@@ -24,7 +24,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
......@@ -661,13 +660,7 @@ func (scheduler *taskScheduler) RemoveByNode(node int64) {
}
func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
var errCode commonpb.ErrorCode
if errors.Is(task.Err(), ErrInsufficientMemory) {
errCode = commonpb.ErrorCode_InsufficientMemoryToLoad
} else {
errCode = commonpb.ErrorCode_UnexpectedError
}
meta.GlobalFailedLoadCache.Put(task.collectionID, errCode, task.Err())
meta.GlobalFailedLoadCache.Put(task.collectionID, task.Err())
}
func (scheduler *taskScheduler) remove(task Task) {
......
......@@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
......@@ -189,8 +190,7 @@ func TestTask_loadSegmentsTask(t *testing.T) {
task.req.Infos[0].SegmentSize *= 2
}
err = task.Execute(ctx)
assert.Error(t, err)
assert.Contains(t, err.Error(), "OOM")
assert.ErrorIs(t, err, merr.ErrServiceMemoryLimitExceeded)
})
factory := node.loader.factory
......
......@@ -49,6 +49,7 @@ import (
"github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/tsoutil"
......@@ -947,14 +948,16 @@ func (loader *segmentLoader) checkSegmentSize(collectionID UniqueID, segmentLoad
zap.Uint64("diskUsageAfterLoad", toMB(usedLocalSizeAfterLoad)))
if memLoadingUsage > uint64(float64(totalMem)*Params.QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()) {
return fmt.Errorf("%w, load segment failed, OOM if load, collectionID = %d, maxSegmentSize = %v MB, concurrency = %d, usedMemAfterLoad = %v MB, totalMem = %v MB, thresholdFactor = %f",
ErrInsufficientMemory,
collectionID,
toMB(maxSegmentSize),
concurrency,
toMB(usedMemAfterLoad),
toMB(totalMem),
Params.QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat())
err := merr.WrapErrServiceMemoryLimitExceeded(float32(usedMemAfterLoad), float32(totalMem), "failed to load segment, no enough memory")
log.Warn("load segment failed, OOM if load",
zap.Int64("collectionID", collectionID),
zap.Uint64("maxSegmentSize", toMB(maxSegmentSize)),
zap.Int("concurrency", concurrency),
zap.Uint64("usedMemAfterLoad", toMB(usedMemAfterLoad)),
zap.Uint64("totalMem", toMB(totalMem)),
zap.Float64("thresholdFactor", Params.QueryNodeCfg.OverloadedMemoryThresholdPercentage.GetAsFloat()),
)
return err
}
if usedLocalSizeAfterLoad > uint64(Params.QueryNodeCfg.DiskCapacityLimit.GetAsFloat()*Params.QueryNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册