未验证 提交 94c15a49 编写于 作者: C cai.zhang 提交者: GitHub

Can't drop loaded partition (#19938)

Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
Signed-off-by: Ncai.zhang <cai.zhang@zilliz.com>
上级 a4fb3cc3
......@@ -1191,6 +1191,7 @@ func (node *Proxy) DropPartition(ctx context.Context, request *milvuspb.DropPart
Condition: NewTaskCondition(ctx),
DropPartitionRequest: request,
rootCoord: node.rootCoord,
queryCoord: node.queryCoord,
result: nil,
}
......
......@@ -46,6 +46,12 @@ func SetQueryCoordShowCollectionsFunc(f queryCoordShowCollectionsFuncType) Query
}
}
func SetQueryCoordShowPartitionsFunc(f queryCoordShowPartitionsFuncType) QueryCoordMockOption {
return func(mock *QueryCoordMock) {
mock.showPartitionsFunc = f
}
}
func withValidShardLeaders() QueryCoordMockOption {
return func(mock *QueryCoordMock) {
mock.validShardLeaders = true
......
......@@ -800,9 +800,10 @@ func (cpt *createPartitionTask) PostExecute(ctx context.Context) error {
type dropPartitionTask struct {
Condition
*milvuspb.DropPartitionRequest
ctx context.Context
rootCoord types.RootCoord
result *commonpb.Status
ctx context.Context
rootCoord types.RootCoord
queryCoord types.QueryCoord
result *commonpb.Status
}
func (dpt *dropPartitionTask) TraceCtx() context.Context {
......@@ -856,6 +857,23 @@ func (dpt *dropPartitionTask) PreExecute(ctx context.Context) error {
return err
}
collID, _ := globalMetaCache.GetCollectionID(ctx, dpt.GetCollectionName())
partID, _ := globalMetaCache.GetPartitionID(ctx, dpt.GetCollectionName(), dpt.GetPartitionName())
collLoaded, err := isCollectionLoaded(ctx, dpt.queryCoord, []int64{collID})
if err != nil {
return err
}
if collLoaded {
loaded, err := isPartitionLoaded(ctx, dpt.queryCoord, collID, []int64{partID})
if err != nil {
return err
}
if loaded {
return errors.New("partition cannot be dropped, partition is loaded, please release it first")
}
}
return nil
}
......
......@@ -30,7 +30,6 @@ import (
"github.com/milvus-io/milvus/internal/common"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/funcutil"
......@@ -499,24 +498,11 @@ func (dit *dropIndexTask) PreExecute(ctx context.Context) error {
collID, _ := globalMetaCache.GetCollectionID(ctx, dit.CollectionName)
dit.collectionID = collID
// get all loading collections
resp, err := dit.queryCoord.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
CollectionIDs: nil,
})
loaded, err := isCollectionLoaded(ctx, dit.queryCoord, []int64{collID})
if err != nil {
return err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
return errors.New(resp.Status.Reason)
}
loaded := false
for _, loadedCollID := range resp.GetCollectionIDs() {
if collID == loadedCollID {
loaded = true
break
}
}
if loaded {
return errors.New("index cannot be dropped, collection is loaded, please release it first")
}
......
......@@ -27,6 +27,8 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/mocks"
......@@ -1091,6 +1093,18 @@ func TestDropPartitionTask(t *testing.T) {
collectionName := prefix + funcutil.GenRandomStr()
partitionName := prefix + funcutil.GenRandomStr()
showPartitionsMock := func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return &querypb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
PartitionIDs: []int64{},
}, nil
}
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock))
qc.updateState(commonpb.StateCode_Healthy)
task := &dropPartitionTask{
Condition: NewTaskCondition(ctx),
DropPartitionRequest: &milvuspb.DropPartitionRequest{
......@@ -1103,9 +1117,10 @@ func TestDropPartitionTask(t *testing.T) {
CollectionName: collectionName,
PartitionName: partitionName,
},
ctx: ctx,
rootCoord: rc,
result: nil,
ctx: ctx,
rootCoord: rc,
queryCoord: qc,
result: nil,
}
task.PreExecute(ctx)
......
......@@ -24,6 +24,9 @@ import (
"strings"
"time"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"golang.org/x/crypto/bcrypt"
"google.golang.org/grpc/metadata"
......@@ -832,3 +835,54 @@ func validateIndexName(indexName string) error {
}
return nil
}
func isCollectionLoaded(ctx context.Context, qc types.QueryCoord, collIDs []int64) (bool, error) {
// get all loading collections
resp, err := qc.ShowCollections(ctx, &querypb.ShowCollectionsRequest{
CollectionIDs: nil,
})
if err != nil {
return false, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
return false, errors.New(resp.Status.Reason)
}
loaded := false
LOOP:
for _, loadedCollID := range resp.GetCollectionIDs() {
for _, collID := range collIDs {
if collID == loadedCollID {
loaded = true
break LOOP
}
}
}
return loaded, nil
}
func isPartitionLoaded(ctx context.Context, qc types.QueryCoord, collIDs int64, partIDs []int64) (bool, error) {
// get all loading collections
resp, err := qc.ShowPartitions(ctx, &querypb.ShowPartitionsRequest{
CollectionID: collIDs,
PartitionIDs: nil,
})
if err != nil {
return false, err
}
if resp.Status.ErrorCode != commonpb.ErrorCode_Success {
return false, errors.New(resp.Status.Reason)
}
loaded := false
LOOP:
for _, loadedPartID := range resp.GetPartitionIDs() {
for _, partID := range partIDs {
if partID == loadedPartID {
loaded = true
break LOOP
}
}
}
return loaded, nil
}
......@@ -18,25 +18,25 @@ package proxy
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/metadata"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/internal/util"
"github.com/milvus-io/milvus/internal/util/crypto"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/metadata"
)
func TestValidateCollectionName(t *testing.T) {
......@@ -809,3 +809,120 @@ func TestValidateTravelTimestamp(t *testing.T) {
})
}
}
func Test_isCollectionIsLoaded(t *testing.T) {
ctx := context.Background()
t.Run("normal", func(t *testing.T) {
collID := int64(1)
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
return &querypb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
CollectionIDs: []int64{collID, 10, 100},
}, nil
}
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock))
qc.updateState(commonpb.StateCode_Healthy)
loaded, err := isCollectionLoaded(ctx, qc, []int64{collID})
assert.NoError(t, err)
assert.True(t, loaded)
})
t.Run("error", func(t *testing.T) {
collID := int64(1)
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
return &querypb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
CollectionIDs: []int64{collID},
}, errors.New("error")
}
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock))
qc.updateState(commonpb.StateCode_Healthy)
loaded, err := isCollectionLoaded(ctx, qc, []int64{collID})
assert.Error(t, err)
assert.False(t, loaded)
})
t.Run("fail", func(t *testing.T) {
collID := int64(1)
showCollectionMock := func(ctx context.Context, request *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
return &querypb.ShowCollectionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "fail reason",
},
CollectionIDs: []int64{collID},
}, nil
}
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowCollectionsFunc(showCollectionMock))
qc.updateState(commonpb.StateCode_Healthy)
loaded, err := isCollectionLoaded(ctx, qc, []int64{collID})
assert.Error(t, err)
assert.False(t, loaded)
})
}
func Test_isPartitionIsLoaded(t *testing.T) {
ctx := context.Background()
t.Run("normal", func(t *testing.T) {
collID := int64(1)
partID := int64(2)
showPartitionsMock := func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return &querypb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
PartitionIDs: []int64{partID},
}, nil
}
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock))
qc.updateState(commonpb.StateCode_Healthy)
loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID})
assert.NoError(t, err)
assert.True(t, loaded)
})
t.Run("error", func(t *testing.T) {
collID := int64(1)
partID := int64(2)
showPartitionsMock := func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return &querypb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
PartitionIDs: []int64{partID},
}, errors.New("error")
}
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock))
qc.updateState(commonpb.StateCode_Healthy)
loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID})
assert.Error(t, err)
assert.False(t, loaded)
})
t.Run("fail", func(t *testing.T) {
collID := int64(1)
partID := int64(2)
showPartitionsMock := func(ctx context.Context, request *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
return &querypb.ShowPartitionsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "fail reason",
},
PartitionIDs: []int64{partID},
}, nil
}
qc := NewQueryCoordMock(withValidShardLeaders(), SetQueryCoordShowPartitionsFunc(showPartitionsMock))
qc.updateState(commonpb.StateCode_Healthy)
loaded, err := isPartitionLoaded(ctx, qc, collID, []int64{partID})
assert.Error(t, err)
assert.False(t, loaded)
})
}
......@@ -664,7 +664,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
"""
# 1. initialize with data
partition_num = 1
collection_w = self.init_collection_general(prefix, True, 1000, partition_num)[0]
collection_w = self.init_collection_general(prefix, True, 1000, partition_num, is_index=True)[0]
# 2. delete partitions
log.info("test_search_partition_deleted: deleting a partition")
par = collection_w.partitions
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册