未验证 提交 c551de8f 编写于 作者: C cai.zhang 提交者: GitHub

Catch errors on gloabl meta cache (#20023)

Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
上级 e7d58b4f
......@@ -785,6 +785,8 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques
// GetIndexInfos gets the index file paths from IndexCoord.
func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
log.Debug("IndexCoord GetIndexInfos", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.GetIndexName()), zap.Int64s("segIDs", req.GetSegmentIDs()))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
return &indexpb.GetIndexInfoResponse{
......@@ -840,6 +842,7 @@ func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInf
// DescribeIndex describe the index info of the collection.
func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
log.Debug("IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.GetIndexName()))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
return &indexpb.DescribeIndexResponse{
......
......@@ -86,3 +86,7 @@ func ErrUnauthenticated() error {
func ErrProxyNotReady() error {
return status.Errorf(codes.Unavailable, "internal: Milvus Proxy is not ready yet. please wait")
}
func ErrPartitionNotExist(partitionName string) error {
return fmt.Errorf("partitionID of partitionName:%s can not be find", partitionName)
}
......@@ -407,7 +407,7 @@ func (m *MetaCache) GetPartitionInfo(ctx context.Context, collectionName string,
log.Debug("proxy", zap.Any("GetPartitionID:partitions after update", partitions), zap.Any("collectionName", collectionName))
partInfo, ok = m.collInfo[collectionName].partInfo[partitionName]
if !ok {
return nil, fmt.Errorf("partitionID of partitionName:%s can not be find", partitionName)
return nil, ErrPartitionNotExist(partitionName)
}
}
metrics.ProxyCacheStatsCounter.WithLabelValues(strconv.FormatInt(Params.ProxyCfg.GetNodeID(), 10), "GetPartitionInfo", metrics.CacheHitLabel).Inc()
......
......@@ -11,13 +11,15 @@ type getCollectionIDFunc func(ctx context.Context, collectionName string) (typeu
type getCollectionSchemaFunc func(ctx context.Context, collectionName string) (*schemapb.CollectionSchema, error)
type getCollectionInfoFunc func(ctx context.Context, collectionName string) (*collectionInfo, error)
type getUserRoleFunc func(username string) []string
type getPartitionIDFunc func(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error)
type mockCache struct {
Cache
getIDFunc getCollectionIDFunc
getSchemaFunc getCollectionSchemaFunc
getInfoFunc getCollectionInfoFunc
getUserRoleFunc getUserRoleFunc
getIDFunc getCollectionIDFunc
getSchemaFunc getCollectionSchemaFunc
getInfoFunc getCollectionInfoFunc
getUserRoleFunc getUserRoleFunc
getPartitionIDFunc getPartitionIDFunc
}
func (m *mockCache) GetCollectionID(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
......@@ -44,6 +46,13 @@ func (m *mockCache) GetCollectionInfo(ctx context.Context, collectionName string
func (m *mockCache) RemoveCollection(ctx context.Context, collectionName string) {
}
func (m *mockCache) GetPartitionID(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
if m.getPartitionIDFunc != nil {
return m.getPartitionIDFunc(ctx, collectionName, partitionName)
}
return 0, nil
}
func (m *mockCache) GetUserRole(username string) []string {
if m.getUserRoleFunc != nil {
return m.getUserRoleFunc(username)
......@@ -63,6 +72,10 @@ func (m *mockCache) setGetInfoFunc(f getCollectionInfoFunc) {
m.getInfoFunc = f
}
func (m *mockCache) setGetPartitionIDFunc(f getPartitionIDFunc) {
m.getPartitionIDFunc = f
}
func newMockCache() *mockCache {
return &mockCache{}
}
......@@ -857,8 +857,17 @@ func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error {
return err
}
collID, _ := globalMetaCache.GetCollectionID(ctx, dpt.GetCollectionName())
partID, _ := globalMetaCache.GetPartitionID(ctx, dpt.GetCollectionName(), dpt.GetPartitionName())
collID, err := globalMetaCache.GetCollectionID(ctx, dpt.GetCollectionName())
if err != nil {
return err
}
partID, err := globalMetaCache.GetPartitionID(ctx, dpt.GetCollectionName(), dpt.GetPartitionName())
if err != nil {
if err.Error() == ErrPartitionNotExist(dpt.GetPartitionName()).Error() {
return nil
}
return err
}
collLoaded, err := isCollectionLoaded(ctx, dpt.queryCoord, []int64{collID})
if err != nil {
......
......@@ -374,7 +374,10 @@ func (dit *describeIndexTask) PreExecute(ctx context.Context) error {
return err
}
collID, _ := globalMetaCache.GetCollectionID(ctx, dit.CollectionName)
collID, err := globalMetaCache.GetCollectionID(ctx, dit.CollectionName)
if err != nil {
return err
}
dit.collectionID = collID
return nil
}
......@@ -495,7 +498,10 @@ func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
dit.IndexName = Params.CommonCfg.DefaultIndexName
}
collID, _ := globalMetaCache.GetCollectionID(ctx, dit.CollectionName)
collID, err := globalMetaCache.GetCollectionID(ctx, dit.CollectionName)
if err != nil {
return err
}
dit.collectionID = collID
loaded, err := isCollectionLoaded(ctx, dit.queryCoord, []int64{collID})
......
......@@ -21,6 +21,8 @@ import (
"errors"
"testing"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
......@@ -96,7 +98,6 @@ func TestDropIndexTask_PreExecute(t *testing.T) {
indexName := ""
Params.Init()
rc := newMockRootCoord()
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
return &querypb.ShowCollectionsResponse{
Status: &commonpb.Status{
......@@ -110,20 +111,11 @@ func TestDropIndexTask_PreExecute(t *testing.T) {
ctx := context.Background()
qc.updateState(commonpb.StateCode_Healthy)
shardMgr := newShardClientMgr()
// failed to get collection id.
_ = InitMetaCache(ctx, rc, qc, shardMgr)
rc.DescribeCollectionFunc = func(ctx context.Context, request *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) {
return &milvuspb.DescribeCollectionResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Schema: newTestSchema(),
CollectionID: collectionID,
CollectionName: request.CollectionName,
}, nil
}
mockCache := newMockCache()
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return collectionID, nil
})
globalMetaCache = mockCache
dit := dropIndexTask{
ctx: ctx,
......@@ -150,6 +142,21 @@ func TestDropIndexTask_PreExecute(t *testing.T) {
assert.NoError(t, err)
})
t.Run("get collectionID error", func(t *testing.T) {
mockCache := newMockCache()
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return 0, errors.New("error")
})
globalMetaCache = mockCache
err := dit.PreExecute(ctx)
assert.Error(t, err)
})
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return collectionID, nil
})
globalMetaCache = mockCache
t.Run("coll has been loaded", func(t *testing.T) {
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
return &querypb.ShowCollectionsResponse{
......
......@@ -1104,6 +1104,11 @@ func TestDropPartitionTask(t *testing.T) {
}
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock))
qc.updateState(commonpb.StateCode_Healthy)
mockCache := newMockCache()
mockCache.setGetPartitionIDFunc(func(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
return 1, nil
})
globalMetaCache = mockCache
task := &dropPartitionTask{
Condition: NewTaskCondition(ctx),
......@@ -1140,6 +1145,50 @@ func TestDropPartitionTask(t *testing.T) {
task.PartitionName = "#0xc0de"
err = task.PreExecute(ctx)
assert.NotNil(t, err)
t.Run("get collectionID error", func(t *testing.T) {
mockCache := newMockCache()
mockCache.setGetPartitionIDFunc(func(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
return 1, nil
})
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return 0, errors.New("error")
})
globalMetaCache = mockCache
task.PartitionName = "partition1"
err = task.PreExecute(ctx)
assert.Error(t, err)
})
t.Run("partition not exist", func(t *testing.T) {
task.PartitionName = "partition2"
mockCache := newMockCache()
mockCache.setGetPartitionIDFunc(func(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
return 0, ErrPartitionNotExist(task.PartitionName)
})
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return 1, nil
})
globalMetaCache = mockCache
err = task.PreExecute(ctx)
assert.NoError(t, err)
})
t.Run("get partition error", func(t *testing.T) {
task.PartitionName = "partition3"
mockCache := newMockCache()
mockCache.setGetPartitionIDFunc(func(ctx context.Context, collectionName string, partitionName string) (typeutil.UniqueID, error) {
return 0, errors.New("error")
})
mockCache.setGetIDFunc(func(ctx context.Context, collectionName string) (typeutil.UniqueID, error) {
return 1, nil
})
globalMetaCache = mockCache
err = task.PreExecute(ctx)
assert.Error(t, err)
})
}
func TestHasPartitionTask(t *testing.T) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册