未验证 提交 90a5aa62 编写于 作者: Y yah01 提交者: GitHub

Refine errors, re-define error codes (#22501)

Signed-off-by: Nyah01 <yang.cen@zilliz.com>
上级 f9940520
...@@ -24,10 +24,10 @@ import ( ...@@ -24,10 +24,10 @@ import (
"unsafe" "unsafe"
"github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper" "github.com/milvus-io/milvus/internal/mq/msgstream/mqwrapper"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/apache/pulsar-client-go/pulsar" "github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/zap" "go.uber.org/zap"
) )
......
...@@ -30,6 +30,7 @@ import ( ...@@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/funcutil" "github.com/milvus-io/milvus/internal/util/funcutil"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
"go.uber.org/atomic" "go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
...@@ -45,7 +46,6 @@ import ( ...@@ -45,7 +46,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/rootcoordpb" "github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/retry"
"github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/timerecord"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
) )
......
...@@ -37,6 +37,7 @@ import ( ...@@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/hardware" "github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
...@@ -319,7 +320,7 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m ...@@ -319,7 +320,7 @@ func (s *Server) fillReplicaInfo(replica *meta.Replica, withShardNodes bool) (*m
if len(channels) == 0 { if len(channels) == 0 {
msg := "failed to get channels, collection not loaded" msg := "failed to get channels, collection not loaded"
log.Warn(msg) log.Warn(msg)
return nil, utils.WrapError(msg, meta.ErrCollectionNotFound) return nil, merr.WrapErrCollectionNotFound(replica.GetCollectionID(), msg)
} }
var segments []*meta.Segment var segments []*meta.Segment
if withShardNodes { if withShardNodes {
......
...@@ -22,6 +22,7 @@ import ( ...@@ -22,6 +22,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
. "github.com/milvus-io/milvus/internal/util/typeutil" . "github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/samber/lo" "github.com/samber/lo"
...@@ -310,7 +311,7 @@ func (m *CollectionManager) UpdateCollection(collection *Collection) error { ...@@ -310,7 +311,7 @@ func (m *CollectionManager) UpdateCollection(collection *Collection) error {
_, ok := m.collections[collection.GetCollectionID()] _, ok := m.collections[collection.GetCollectionID()]
if !ok { if !ok {
return ErrCollectionNotFound return merr.WrapErrCollectionNotFound(collection.GetCollectionID())
} }
return m.putCollection(collection, true) return m.putCollection(collection, true)
...@@ -362,7 +363,7 @@ func (m *CollectionManager) UpdatePartition(partition *Partition) error { ...@@ -362,7 +363,7 @@ func (m *CollectionManager) UpdatePartition(partition *Partition) error {
_, ok := m.partitions[partition.GetPartitionID()] _, ok := m.partitions[partition.GetPartitionID()]
if !ok { if !ok {
return ErrPartitionNotFound return merr.WrapErrPartitionNotFound(partition.GetPartitionID())
} }
return m.putPartition([]*Partition{partition}, true) return m.putPartition([]*Partition{partition}, true)
......
...@@ -36,6 +36,7 @@ import ( ...@@ -36,6 +36,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/merr"
. "github.com/milvus-io/milvus/internal/util/typeutil" . "github.com/milvus-io/milvus/internal/util/typeutil"
) )
...@@ -180,7 +181,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID ...@@ -180,7 +181,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
segmentInfo, ok := resp.SegmentInfo[segmentID] segmentInfo, ok := resp.SegmentInfo[segmentID]
if !ok || len(segmentInfo.GetIndexInfos()) == 0 { if !ok || len(segmentInfo.GetIndexInfos()) == 0 {
return nil, WrapErrIndexNotExist(segmentID) return nil, merr.WrapErrIndexNotFound()
} }
indexes := make([]*querypb.FieldIndexInfo, 0) indexes := make([]*querypb.FieldIndexInfo, 0)
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package meta
import (
"fmt"
"github.com/cockroachdb/errors"
)
var (
// Read errors
ErrCollectionNotFound = errors.New("CollectionNotFound")
ErrPartitionNotFound = errors.New("PartitionNotFound")
ErrReplicaNotFound = errors.New("ReplicaNotFound")
// Store errors
ErrStoreCollectionFailed = errors.New("StoreCollectionFailed")
ErrStoreReplicaFailed = errors.New("StoreReplicaFailed")
// Index errors
ErrIndexNotExist = errors.New("IndexNotExist")
)
func WrapErrIndexNotExist(segmentID int64) error {
return fmt.Errorf("%w(segmentID=%d)", ErrIndexNotExist, segmentID)
}
...@@ -23,6 +23,7 @@ import ( ...@@ -23,6 +23,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/typeutil" "github.com/milvus-io/milvus/internal/util/typeutil"
. "github.com/milvus-io/milvus/internal/util/typeutil" . "github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap" "go.uber.org/zap"
...@@ -282,7 +283,7 @@ func (m *ReplicaManager) AddNode(replicaID UniqueID, nodes ...UniqueID) error { ...@@ -282,7 +283,7 @@ func (m *ReplicaManager) AddNode(replicaID UniqueID, nodes ...UniqueID) error {
replica, ok := m.replicas[replicaID] replica, ok := m.replicas[replicaID]
if !ok { if !ok {
return ErrReplicaNotFound return merr.WrapErrReplicaNotFound(replicaID)
} }
replica = replica.Clone() replica = replica.Clone()
...@@ -296,7 +297,7 @@ func (m *ReplicaManager) RemoveNode(replicaID UniqueID, nodes ...UniqueID) error ...@@ -296,7 +297,7 @@ func (m *ReplicaManager) RemoveNode(replicaID UniqueID, nodes ...UniqueID) error
replica, ok := m.replicas[replicaID] replica, ok := m.replicas[replicaID]
if !ok { if !ok {
return ErrReplicaNotFound return merr.WrapErrReplicaNotFound(replicaID)
} }
replica = replica.Clone() replica = replica.Clone()
......
...@@ -34,6 +34,7 @@ import ( ...@@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/errorutil" "github.com/milvus-io/milvus/internal/util/errorutil"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/timerecord" "github.com/milvus-io/milvus/internal/util/timerecord"
...@@ -852,8 +853,9 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade ...@@ -852,8 +853,9 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade
channels := s.targetMgr.GetDmChannelsByCollection(req.GetCollectionID(), meta.CurrentTarget) channels := s.targetMgr.GetDmChannelsByCollection(req.GetCollectionID(), meta.CurrentTarget)
if len(channels) == 0 { if len(channels) == 0 {
msg := "failed to get channels" msg := "failed to get channels"
log.Warn(msg, zap.Error(meta.ErrCollectionNotFound)) err := merr.WrapErrCollectionNotFound(req.GetCollectionID())
resp.Status = utils.WrapStatus(commonpb.ErrorCode_MetaFailed, msg, meta.ErrCollectionNotFound) log.Warn(msg, zap.Error(err))
resp.Status = utils.WrapStatus(commonpb.ErrorCode_MetaFailed, msg, err)
return resp, nil return resp, nil
} }
......
...@@ -23,8 +23,8 @@ import ( ...@@ -23,8 +23,8 @@ import (
"time" "time"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/retry"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
......
...@@ -33,7 +33,7 @@ import ( ...@@ -33,7 +33,7 @@ import (
"golang.org/x/exp/mmap" "golang.org/x/exp/mmap"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/errutil" "github.com/milvus-io/milvus/internal/util/merr"
) )
// LocalChunkManager is responsible for read and write local file. // LocalChunkManager is responsible for read and write local file.
...@@ -107,7 +107,7 @@ func (lcm *LocalChunkManager) MultiWrite(ctx context.Context, contents map[strin ...@@ -107,7 +107,7 @@ func (lcm *LocalChunkManager) MultiWrite(ctx context.Context, contents map[strin
for filePath, content := range contents { for filePath, content := range contents {
err := lcm.Write(ctx, filePath, content) err := lcm.Write(ctx, filePath, content)
if err != nil { if err != nil {
el = errutil.Combine(el, errors.Wrapf(err, "write %s failed", filePath)) el = merr.Combine(el, errors.Wrapf(err, "write %s failed", filePath))
} }
} }
return el return el
...@@ -145,7 +145,7 @@ func (lcm *LocalChunkManager) MultiRead(ctx context.Context, filePaths []string) ...@@ -145,7 +145,7 @@ func (lcm *LocalChunkManager) MultiRead(ctx context.Context, filePaths []string)
for i, filePath := range filePaths { for i, filePath := range filePaths {
content, err := lcm.Read(ctx, filePath) content, err := lcm.Read(ctx, filePath)
if err != nil { if err != nil {
el = errutil.Combine(el, errors.Wrapf(err, "failed to read %s", filePath)) el = merr.Combine(el, errors.Wrapf(err, "failed to read %s", filePath))
} }
results[i] = content results[i] = content
} }
...@@ -252,7 +252,7 @@ func (lcm *LocalChunkManager) MultiRemove(ctx context.Context, filePaths []strin ...@@ -252,7 +252,7 @@ func (lcm *LocalChunkManager) MultiRemove(ctx context.Context, filePaths []strin
for _, filePath := range filePaths { for _, filePath := range filePaths {
err := lcm.Remove(ctx, filePath) err := lcm.Remove(ctx, filePath)
if err != nil { if err != nil {
el = errutil.Combine(err, errors.Wrapf(err, "failed to remove %s", filePath)) el = merr.Combine(err, errors.Wrapf(err, "failed to remove %s", filePath))
} }
} }
return el return el
......
...@@ -29,7 +29,7 @@ import ( ...@@ -29,7 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/storage/gcp" "github.com/milvus-io/milvus/internal/storage/gcp"
"github.com/milvus-io/milvus/internal/util/errutil" "github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/retry" "github.com/milvus-io/milvus/internal/util/retry"
"github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials" "github.com/minio/minio-go/v7/pkg/credentials"
...@@ -205,7 +205,7 @@ func (mcm *MinioChunkManager) MultiWrite(ctx context.Context, kvs map[string][]b ...@@ -205,7 +205,7 @@ func (mcm *MinioChunkManager) MultiWrite(ctx context.Context, kvs map[string][]b
for key, value := range kvs { for key, value := range kvs {
err := mcm.Write(ctx, key, value) err := mcm.Write(ctx, key, value)
if err != nil { if err != nil {
el = errutil.Combine(el, errors.Wrapf(err, "failed to write %s", key)) el = merr.Combine(el, errors.Wrapf(err, "failed to write %s", key))
} }
} }
return el return el
...@@ -274,7 +274,7 @@ func (mcm *MinioChunkManager) MultiRead(ctx context.Context, keys []string) ([][ ...@@ -274,7 +274,7 @@ func (mcm *MinioChunkManager) MultiRead(ctx context.Context, keys []string) ([][
for _, key := range keys { for _, key := range keys {
objectValue, err := mcm.Read(ctx, key) objectValue, err := mcm.Read(ctx, key)
if err != nil { if err != nil {
el = errutil.Combine(el, errors.Wrapf(err, "failed to read %s", key)) el = merr.Combine(el, errors.Wrapf(err, "failed to read %s", key))
} }
objectsValues = append(objectsValues, objectValue) objectsValues = append(objectsValues, objectValue)
} }
...@@ -347,7 +347,7 @@ func (mcm *MinioChunkManager) MultiRemove(ctx context.Context, keys []string) er ...@@ -347,7 +347,7 @@ func (mcm *MinioChunkManager) MultiRemove(ctx context.Context, keys []string) er
for _, key := range keys { for _, key := range keys {
err := mcm.Remove(ctx, key) err := mcm.Remove(ctx, key)
if err != nil { if err != nil {
el = errutil.Combine(el, errors.Wrapf(err, "failed to remove %s", key)) el = merr.Combine(el, errors.Wrapf(err, "failed to remove %s", key))
} }
} }
return el return el
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// errutil package provides utility for errors handling.
package errutil
import (
"github.com/cockroachdb/errors"
"github.com/samber/lo"
)
type multiErrors struct {
errs []error
}
func (e multiErrors) Unwrap() error {
if len(e.errs) <= 1 {
return nil
}
return multiErrors{
errs: e.errs[1:],
}
}
func (e multiErrors) Error() string {
final := e.errs[0]
for i := 1; i < len(e.errs); i++ {
final = errors.Wrap(e.errs[i], final.Error())
}
return final.Error()
}
func (e multiErrors) Is(err error) bool {
for _, item := range e.errs {
if errors.Is(item, err) {
return true
}
}
return false
}
func Combine(errs ...error) error {
errs = lo.Filter(errs, func(err error, _ int) bool { return err != nil })
if len(errs) == 0 {
return nil
}
return multiErrors{
errs,
}
}
...@@ -22,9 +22,7 @@ import ( ...@@ -22,9 +22,7 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil"
"net" "net"
"net/http"
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
...@@ -32,15 +30,12 @@ import ( ...@@ -32,15 +30,12 @@ import (
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"go.uber.org/zap"
grpcStatus "google.golang.org/grpc/status" grpcStatus "google.golang.org/grpc/status"
"github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/schemapb" "github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/retry"
) )
// CheckGrpcReady wait for context timeout, or wait 100ms then send nil to targetCh // CheckGrpcReady wait for context timeout, or wait 100ms then send nil to targetCh
...@@ -89,49 +84,6 @@ const ( ...@@ -89,49 +84,6 @@ const (
PulsarMaxMessageSizeKey = "maxMessageSize" PulsarMaxMessageSizeKey = "maxMessageSize"
) )
// GetPulsarConfig get pulsar configuration using pulsar admin api
func GetPulsarConfig(protocol, ip, port, url string, args ...int64) (map[string]interface{}, error) {
var resp *http.Response
var err error
getResp := func() error {
log.Debug("function util", zap.String("url", protocol+"://"+ip+":"+port+url))
resp, err = http.Get(protocol + "://" + ip + ":" + port + url)
return err
}
var attempt uint = 10
var interval = time.Second
if len(args) > 0 && args[0] > 0 {
attempt = uint(args[0])
}
if len(args) > 1 && args[1] > 0 {
interval = time.Duration(args[1])
}
err = retry.Do(context.TODO(), getResp, retry.Attempts(attempt), retry.Sleep(interval))
if err != nil {
log.Debug("failed to get config", zap.String("error", err.Error()))
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
log.Debug("get config", zap.String("config", string(body)))
if err != nil {
return nil, err
}
ret := make(map[string]interface{})
err = json.Unmarshal(body, &ret)
if err != nil {
return nil, err
}
return ret, nil
}
// GetAttrByKeyFromRepeatedKV return the value corresponding to key in kv pair // GetAttrByKeyFromRepeatedKV return the value corresponding to key in kv pair
func GetAttrByKeyFromRepeatedKV(key string, kvs []*commonpb.KeyValuePair) (string, error) { func GetAttrByKeyFromRepeatedKV(key string, kvs []*commonpb.KeyValuePair) (string, error) {
for _, kv := range kvs { for _, kv := range kvs {
......
...@@ -21,14 +21,12 @@ import ( ...@@ -21,14 +21,12 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http"
"strconv" "strconv"
"testing" "testing"
"time" "time"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/jarcoal/httpmock"
"github.com/milvus-io/milvus-proto/go-api/commonpb" "github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb" "github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
...@@ -82,43 +80,6 @@ func Test_ParseIndexParamsMap(t *testing.T) { ...@@ -82,43 +80,6 @@ func Test_ParseIndexParamsMap(t *testing.T) {
assert.NotEqual(t, err, nil) assert.NotEqual(t, err, nil)
} }
func TestGetPulsarConfig(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()
runtimeConfig := make(map[string]interface{})
runtimeConfig[PulsarMaxMessageSizeKey] = strconv.FormatInt(5*1024*1024, 10)
protocol := "http"
ip := "pulsar"
port := "18080"
url := "/admin/v2/brokers/configuration/runtime"
httpmock.RegisterResponder("GET", protocol+"://"+ip+":"+port+url,
func(req *http.Request) (*http.Response, error) {
return httpmock.NewJsonResponse(200, runtimeConfig)
},
)
ret, err := GetPulsarConfig(protocol, ip, port, url)
assert.Equal(t, nil, err)
assert.Equal(t, len(ret), len(runtimeConfig))
assert.Equal(t, len(ret), 1)
for key, value := range ret {
assert.Equal(t, fmt.Sprintf("%v", value), fmt.Sprintf("%v", runtimeConfig[key]))
}
}
func TestGetPulsarConfig_Error(t *testing.T) {
protocol := "http"
ip := "pulsar"
port := "17777"
url := "/admin/v2/brokers/configuration/runtime"
ret, err := GetPulsarConfig(protocol, ip, port, url, 1, 1)
assert.NotNil(t, err)
assert.Nil(t, ret)
}
func TestGetAttrByKeyFromRepeatedKV(t *testing.T) { func TestGetAttrByKeyFromRepeatedKV(t *testing.T) {
kvs := []*commonpb.KeyValuePair{ kvs := []*commonpb.KeyValuePair{
{Key: "Key1", Value: "Value1"}, {Key: "Key1", Value: "Value1"},
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// errutil package provides utility for errors handling.
package merr
import (
"context"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
const (
rootCoordBits = (iota + 1) << 16
dataCoordBits
queryCoordBits
dataNodeBits
queryNodeBits
indexNodeBits
proxyBits
standaloneBits
embededBits
retriableFlag = 1 << 20
rootReasonCodeMask = (1 << 16) - 1
CanceledCode int32 = 10000
TimeoutCode int32 = 10001
)
// Define leaf errors here,
// WARN: take care to add new error,
// check whehter you can use the erorrs below before adding a new one.
// Name: Err + related prefix + error name
var (
// Service related
ErrServiceNotReady = newMilvusError("service not ready", 1, true) // This indicates the service is still in init
ErrServiceUnavailable = newMilvusError("service unavailable", 2, true)
ErrServiceMemoryLimitExceeded = newMilvusError("memory limit exceeded", 3, false)
ErrServiceRequestLimitExceeded = newMilvusError("request limit exceeded", 4, true)
// Collection related
ErrCollectionNotFound = newMilvusError("collection not found", 100, false)
ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false)
// Partition related
ErrPartitionNotFound = newMilvusError("partition not found", 202, false)
ErrPartitionNotLoaded = newMilvusError("partition not loaded", 203, false)
// ResourceGroup related
ErrResourceGroupNotFound = newMilvusError("resource group not found", 300, false)
// Replica related
ErrReplicaNotFound = newMilvusError("replica not found", 400, false)
// Channel related
ErrChannelNotFound = newMilvusError("channel not found", 500, false)
// Segment related
ErrSegmentNotFound = newMilvusError("segment not found", 600, false)
ErrSegmentNotLoaded = newMilvusError("segment not loaded", 601, false)
ErrSegmentLack = newMilvusError("segment lacks", 602, false)
// Index related
ErrIndexNotFound = newMilvusError("index not found", 700, false)
// Node related
ErrNodeNotFound = newMilvusError("node not found", 901, false)
ErrNodeOffline = newMilvusError("node offline", 902, false)
ErrNodeLack = newMilvusError("node lacks", 903, false)
// IO related
ErrIoKeyNotFound = newMilvusError("key not found", 1000, false)
ErrIoFailed = newMilvusError("IO failed", 1001, false)
// Parameter related
ErrParameterInvalid = newMilvusError("invalid parameter", 1100, false)
// Do NOT export this,
// never allow programmer using this, keep only for converting unknown error to milvusError
errUnexpected = newMilvusError("unexpected error", (1<<16)-1, false)
)
func maskComponentBits(code int32) int32 {
switch paramtable.GetRole() {
case typeutil.RootCoordRole:
return code | rootCoordBits
case typeutil.DataCoordRole:
return code | dataCoordBits
case typeutil.QueryCoordRole:
return code | queryCoordBits
case typeutil.DataNodeRole:
return code | dataNodeBits
case typeutil.QueryNodeRole:
return code | queryNodeBits
case typeutil.IndexNodeRole:
return code | indexNodeBits
case typeutil.ProxyRole:
return code | proxyBits
case typeutil.StandaloneRole:
return code | standaloneBits
case typeutil.EmbeddedRole:
return code | embededBits
}
return code
}
type milvusError struct {
msg string
errCode int32
}
func newMilvusError(msg string, code int32, retriable bool) milvusError {
if retriable {
code |= retriableFlag
}
return milvusError{
msg: msg,
errCode: code,
}
}
func (e milvusError) code() int32 {
return maskComponentBits(e.errCode)
}
func (e milvusError) Error() string {
return e.msg
}
func (e milvusError) Is(err error) bool {
cause := errors.Cause(err)
if cause, ok := cause.(milvusError); ok {
return e.errCode == cause.errCode
}
return false
}
// Code returns the error code of the given error,
// WARN: DO NOT use this for now
func Code(err error) int32 {
if err == nil {
return 0
}
cause := errors.Cause(err)
switch cause := cause.(type) {
case milvusError:
return cause.code()
default:
if errors.Is(cause, context.Canceled) {
return CanceledCode
} else if errors.Is(cause, context.DeadlineExceeded) {
return TimeoutCode
} else {
return errUnexpected.code()
}
}
}
type multiErrors struct {
errs []error
}
func (e multiErrors) Unwrap() error {
if len(e.errs) <= 1 {
return nil
}
return multiErrors{
errs: e.errs[1:],
}
}
func (e multiErrors) Error() string {
final := e.errs[0]
for i := 1; i < len(e.errs); i++ {
final = errors.Wrap(e.errs[i], final.Error())
}
return final.Error()
}
func (e multiErrors) Is(err error) bool {
for _, item := range e.errs {
if errors.Is(item, err) {
return true
}
}
return false
}
func Combine(errs ...error) error {
errs = lo.Filter(errs, func(err error, _ int) bool { return err != nil })
if len(errs) == 0 {
return nil
}
return multiErrors{
errs,
}
}
...@@ -14,12 +14,14 @@ ...@@ -14,12 +14,14 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package errutil package merr
import ( import (
"context"
"testing" "testing"
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
) )
...@@ -27,6 +29,69 @@ type ErrSuite struct { ...@@ -27,6 +29,69 @@ type ErrSuite struct {
suite.Suite suite.Suite
} }
func (s *ErrSuite) SetupSuite() {
paramtable.Init()
}
func (s *ErrSuite) TestCode() {
err := WrapErrCollectionNotFound(1)
errors.Wrap(err, "failed to get collection")
s.ErrorIs(err, ErrCollectionNotFound)
s.Equal(Code(ErrCollectionNotFound), Code(err))
s.Equal(TimeoutCode, Code(context.DeadlineExceeded))
s.Equal(CanceledCode, Code(context.Canceled))
s.Equal(errUnexpected.errCode, Code(errUnexpected))
sameCodeErr := newMilvusError("new error", ErrCollectionNotFound.errCode, false)
s.True(sameCodeErr.Is(ErrCollectionNotFound))
}
func (s *ErrSuite) TestWrap() {
// Service related
s.ErrorIs(WrapErrServiceNotReady("init", "test init..."), ErrServiceNotReady)
s.ErrorIs(WrapErrServiceUnavailable("test", "test init"), ErrServiceUnavailable)
s.ErrorIs(WrapErrServiceMemoryLimitExceeded(110, 100, "MLE"), ErrServiceMemoryLimitExceeded)
s.ErrorIs(WrapErrServiceRequestLimitExceeded(100, "too many requests"), ErrServiceRequestLimitExceeded)
// Collection related
s.ErrorIs(WrapErrCollectionNotFound("test_collection", "failed to get collection"), ErrCollectionNotFound)
s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to query"), ErrCollectionNotLoaded)
// Partition related
s.ErrorIs(WrapErrPartitionNotFound("test_Partition", "failed to get Partition"), ErrPartitionNotFound)
s.ErrorIs(WrapErrPartitionNotLoaded("test_Partition", "failed to query"), ErrPartitionNotLoaded)
// ResourceGroup related
s.ErrorIs(WrapErrResourceGroupNotFound("test_ResourceGroup", "failed to get ResourceGroup"), ErrResourceGroupNotFound)
// Replica related
s.ErrorIs(WrapErrReplicaNotFound(1, "failed to get Replica"), ErrReplicaNotFound)
// Channel related
s.ErrorIs(WrapErrChannelNotFound("test_Channel", "failed to get Channel"), ErrChannelNotFound)
// Segment related
s.ErrorIs(WrapErrSegmentNotFound(1, "failed to get Segment"), ErrSegmentNotFound)
s.ErrorIs(WrapErrSegmentNotLoaded(1, "failed to query"), ErrSegmentNotLoaded)
s.ErrorIs(WrapErrSegmentLack(1, "lack of segment"), ErrSegmentLack)
// Index related
s.ErrorIs(WrapErrIndexNotFound("failed to get Index"), ErrIndexNotFound)
// Node related
s.ErrorIs(WrapErrNodeNotFound(1, "failed to get node"), ErrNodeNotFound)
s.ErrorIs(WrapErrNodeOffline(1, "failed to access node"), ErrNodeOffline)
s.ErrorIs(WrapErrNodeLack(3, 1, "need more nodes"), ErrNodeLack)
// IO related
s.ErrorIs(WrapErrIoKeyNotFound("test_key", "failed to read"), ErrIoKeyNotFound)
s.ErrorIs(WrapErrIoFailed("test_key", "failed to read"), ErrIoFailed)
// Parameter related
s.ErrorIs(WrapErrParameterInvalid(8, 1, "failed to create"), ErrParameterInvalid)
s.ErrorIs(WrapErrParameterInvalidRange(1, 1<<16, 0, "topk should be in range"), ErrParameterInvalid)
}
func (s *ErrSuite) TestCombine() { func (s *ErrSuite) TestCombine() {
var ( var (
errFirst = errors.New("first") errFirst = errors.New("first")
......
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// errutil package provides utility for errors handling.
package merr
import (
"strings"
"github.com/cockroachdb/errors"
)
func IsRetriable(err error) bool {
return Code(err)&retriableFlag != 0
}
// Service related
func WrapErrServiceNotReady(stage string, msg ...string) error {
err := errors.Wrapf(ErrServiceNotReady, "stage=%s", stage)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrServiceUnavailable(reason string, msg ...string) error {
err := errors.Wrap(ErrServiceUnavailable, reason)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrServiceMemoryLimitExceeded(predict, limit float32, msg ...string) error {
err := errors.Wrapf(ErrServiceMemoryLimitExceeded, "predict=%v, limit=%v", predict, limit)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrServiceRequestLimitExceeded(limit int32, msg ...string) error {
err := errors.Wrapf(ErrServiceRequestLimitExceeded, "limit=%v", limit)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Collection related
func WrapErrCollectionNotFound(collection any, msg ...string) error {
err := wrapWithField(ErrCollectionNotFound, "collection", collection)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrCollectionNotLoaded(collection any, msg ...string) error {
err := wrapWithField(ErrCollectionNotLoaded, "collection", collection)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Partition related
func WrapErrPartitionNotFound(partition any, msg ...string) error {
err := wrapWithField(ErrPartitionNotFound, "partition", partition)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrPartitionNotLoaded(partition any, msg ...string) error {
err := wrapWithField(ErrPartitionNotLoaded, "partition", partition)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// ResourceGroup related
func WrapErrResourceGroupNotFound(rg any, msg ...string) error {
err := wrapWithField(ErrResourceGroupNotFound, "rg", rg)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Replica related
func WrapErrReplicaNotFound(id int64, msg ...string) error {
err := wrapWithField(ErrReplicaNotFound, "replica", id)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Channel related
func WrapErrChannelNotFound(name string, msg ...string) error {
err := wrapWithField(ErrChannelNotFound, "channel", name)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Segment related
func WrapErrSegmentNotFound(id int64, msg ...string) error {
err := wrapWithField(ErrSegmentNotFound, "segment", id)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrSegmentNotLoaded(id int64, msg ...string) error {
err := wrapWithField(ErrSegmentNotLoaded, "segment", id)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrSegmentLack(id int64, msg ...string) error {
err := wrapWithField(ErrSegmentLack, "segment", id)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Index related
func WrapErrIndexNotFound(msg ...string) error {
err := error(ErrIndexNotFound)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Node related
func WrapErrNodeNotFound(id int64, msg ...string) error {
err := wrapWithField(ErrNodeNotFound, "node", id)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrNodeOffline(id int64, msg ...string) error {
err := wrapWithField(ErrNodeOffline, "node", id)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error {
err := errors.Wrapf(ErrNodeLack, "expectedNum=%d, actualNum=%d", expectedNum, actualNum)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// IO related
func WrapErrIoKeyNotFound(key string, msg ...string) error {
err := errors.Wrapf(ErrIoKeyNotFound, "key=%s", key)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrIoFailed(key string, msg ...string) error {
err := errors.Wrapf(ErrIoFailed, "key=%s", key)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// Parameter related
func WrapErrParameterInvalid[T any](expected, actual T, msg ...string) error {
err := errors.Wrapf(ErrParameterInvalid, "expected=%v, actual=%v", expected, actual)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func WrapErrParameterInvalidRange[T any](lower, upper, actual T, msg ...string) error {
err := errors.Wrapf(ErrParameterInvalid, "expected in (%v, %v), actual=%v", lower, upper, actual)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
func wrapWithField(err error, name string, value any) error {
return errors.Wrapf(err, "%s=%v", name, value)
}
...@@ -19,7 +19,7 @@ import ( ...@@ -19,7 +19,7 @@ import (
"github.com/cockroachdb/errors" "github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/errutil" "github.com/milvus-io/milvus/internal/util/merr"
) )
// Do will run function with retry mechanism. // Do will run function with retry mechanism.
...@@ -43,7 +43,7 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error { ...@@ -43,7 +43,7 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
} }
err = errors.Wrapf(err, "attempt #%d", i) err = errors.Wrapf(err, "attempt #%d", i)
el = errutil.Combine(el, err) el = merr.Combine(el, err)
if !IsRecoverable(err) { if !IsRecoverable(err) {
return el return el
...@@ -52,7 +52,7 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error { ...@@ -52,7 +52,7 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {
select { select {
case <-time.After(c.sleep): case <-time.After(c.sleep):
case <-ctx.Done(): case <-ctx.Done():
el = errutil.Combine(el, errors.Wrapf(ctx.Err(), "context done during sleep after run#%d", i)) el = merr.Combine(el, errors.Wrapf(ctx.Err(), "context done during sleep after run#%d", i))
return el return el
} }
...@@ -73,7 +73,7 @@ var errUnrecoverable = errors.New("unrecoverable error") ...@@ -73,7 +73,7 @@ var errUnrecoverable = errors.New("unrecoverable error")
// Unrecoverable method wrap an error to unrecoverableError. This will make retry // Unrecoverable method wrap an error to unrecoverableError. This will make retry
// quick return. // quick return.
func Unrecoverable(err error) error { func Unrecoverable(err error) error {
return errutil.Combine(err, errUnrecoverable) return merr.Combine(err, errUnrecoverable)
} }
// IsRecoverable is used to judge whether the error is wrapped by unrecoverableError. // IsRecoverable is used to judge whether the error is wrapped by unrecoverableError.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册