diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 0688fd427058f55e5e9bbbd79bec528d426f9e35..9f90cd1adcbbe09d4286c6f0b556dbc668804333 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -4514,7 +4514,7 @@ func getErrResponse(err error, method string) *commonpb.Status { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, + ErrorCode: commonpb.ErrorCode_IllegalArgument, Reason: err.Error(), } } @@ -4525,13 +4525,6 @@ func (node *Proxy) DropResourceGroup(ctx context.Context, request *milvuspb.Drop } method := "DropResourceGroup" - if err := ValidateResourceGroupName(request.GetResourceGroup()); err != nil { - log.Warn("DropResourceGroup failed", - zap.Error(err), - ) - return getErrResponse(err, method), nil - } - ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-DropResourceGroup") defer sp.End() tr := timerecord.NewTimeRecorder(method) diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index be1f7283b96f3ebb03755c870f5cd4c6244797b6..f168b93da21f9d36e332665b9cc4c0d76047dfab 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -301,7 +301,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { ResourceGroup: "...", }) assert.NoError(t, err) - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError) + assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument) }) t.Run("drop resource group", func(t *testing.T) { @@ -309,7 +309,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { ResourceGroup: "...", }) assert.NoError(t, err) - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError) + assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_Success) }) t.Run("transfer node", func(t *testing.T) { @@ -319,7 +319,7 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { NumNode: 1, }) assert.NoError(t, err) - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError) + assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument) }) t.Run("transfer replica", func(t *testing.T) { @@ -330,6 +330,6 @@ func TestProxy_InvalidResourceGroupName(t *testing.T) { CollectionName: "collection1", }) assert.NoError(t, err) - assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_UnexpectedError) + assert.Equal(t, resp.ErrorCode, commonpb.ErrorCode_IllegalArgument) }) } diff --git a/internal/proxy/util.go b/internal/proxy/util.go index a32182dee3d42489be0cd930c39f0f2c0596e717..51d87b3b39188706f51beb8cdbb1feb31b1018eb 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -117,7 +117,7 @@ func validateCollectionNameOrAlias(entity, entityType string) error { func ValidateResourceGroupName(entity string) error { if entity == "" { - return fmt.Errorf("resource group name %s should not be empty", entity) + return errors.New("resource group name couldn't be empty") } invalidMsg := fmt.Sprintf("Invalid resource group name %s.", entity) diff --git a/internal/querycoordv2/job/job.go b/internal/querycoordv2/job/job.go index f8ce198e51f397aa5531a844012b939994754060..f0ba14c2d36efb839c332aaccc442fe1eda67043 100644 --- a/internal/querycoordv2/job/job.go +++ b/internal/querycoordv2/job/job.go @@ -172,12 +172,6 @@ func (job *LoadCollectionJob) PreExecute() error { return ErrCollectionLoaded } - if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) { - msg := "no enough nodes to create replicas" - log.Warn(msg) - return 
utils.WrapError(msg, ErrNoEnoughNode) - } - return nil } @@ -387,12 +381,6 @@ func (job *LoadPartitionJob) PreExecute() error { return ErrCollectionLoaded } - if len(job.nodeMgr.GetAll()) < int(job.req.GetReplicaNumber()) { - msg := "no enough nodes to create replicas" - log.Warn(msg) - return utils.WrapError(msg, ErrNoEnoughNode) - } - return nil } diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index 94d5a870cc47f76ebc7b46dfc5a6f9dc7597d207..6ba6dd26f791d052488d4b7457587cfba323cb60 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -343,7 +343,7 @@ func (suite *JobSuite) TestLoadCollectionWithReplicas() { ) suite.scheduler.Add(job) err := job.Wait() - suite.ErrorIs(err, ErrNoEnoughNode) + suite.ErrorContains(err, meta.ErrNodeNotEnough.Error()) } } @@ -605,7 +605,7 @@ func (suite *JobSuite) TestLoadPartitionWithReplicas() { ) suite.scheduler.Add(job) err := job.Wait() - suite.ErrorIs(err, ErrNoEnoughNode) + suite.ErrorContains(err, meta.ErrNodeNotEnough.Error()) } } diff --git a/internal/querycoordv2/meta/resource_manager.go b/internal/querycoordv2/meta/resource_manager.go index d69a3a567492698d8dd32e68c141f98f4db5b60e..f69d6557f094e6318d666cffa99eafbe4204164a 100644 --- a/internal/querycoordv2/meta/resource_manager.go +++ b/internal/querycoordv2/meta/resource_manager.go @@ -51,6 +51,7 @@ var ( ) var DefaultResourceGroupName = "__default_resource_group" +var DefaultResourceGroupCapacity = 1000000 type ResourceGroup struct { nodes UniqueSet @@ -67,54 +68,30 @@ func NewResourceGroup(capacity int) *ResourceGroup { } // assign node to resource group -func (rg *ResourceGroup) assignNode(id int64) error { +func (rg *ResourceGroup) assignNode(id int64, deltaCapacity int) error { if rg.containsNode(id) { return ErrNodeAlreadyAssign } rg.nodes.Insert(id) - rg.capacity++ + rg.capacity += deltaCapacity return nil } // unassign node from resource group -func (rg *ResourceGroup) unassignNode(id int64) error { +func (rg *ResourceGroup) unassignNode(id int64, deltaCapacity int) error { if !rg.containsNode(id) { // remove non exist node should be tolerable return nil } rg.nodes.Remove(id) - rg.capacity-- + rg.capacity += deltaCapacity return nil } -func (rg *ResourceGroup) handleNodeUp(id int64) error { - if rg.containsNode(id) { - return ErrNodeAlreadyAssign - } - - if len(rg.nodes) >= rg.capacity { - // handleNodeUp can grow the capacity - rg.capacity++ - } - - rg.nodes.Insert(id) - return nil -} - -func (rg *ResourceGroup) handleNodeDown(id int64) error { - if !rg.containsNode(id) { - // remove non exist node should be tolerable - return nil - } - - rg.nodes.Remove(id) - return nil -} - func (rg *ResourceGroup) LackOfNodes() int { return rg.capacity - len(rg.nodes) } @@ -141,7 +118,7 @@ type ResourceManager struct { func NewResourceManager(store Store, nodeMgr *session.NodeManager) *ResourceManager { groupMap := make(map[string]*ResourceGroup) - groupMap[DefaultResourceGroupName] = NewResourceGroup(0) + groupMap[DefaultResourceGroupName] = NewResourceGroup(DefaultResourceGroupCapacity) return &ResourceManager{ groups: groupMap, store: store, @@ -241,9 +218,14 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error { newNodes := rm.groups[rgName].GetNodes() newNodes = append(newNodes, node) + deltaCapacity := 1 + if rgName == DefaultResourceGroupName { + // default rg capacity won't be changed + deltaCapacity = 0 + } err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{ Name: rgName, - 
Capacity: int32(rm.groups[rgName].GetCapacity()) + 1, + Capacity: int32(rm.groups[rgName].GetCapacity() + deltaCapacity), Nodes: newNodes, }) if err != nil { @@ -255,7 +237,7 @@ func (rm *ResourceManager) assignNode(rgName string, node int64) error { return err } - err = rm.groups[rgName].assignNode(node) + err = rm.groups[rgName].assignNode(node, deltaCapacity) if err != nil { return err } @@ -290,7 +272,7 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error { return ErrRGNotExist } - if rm.nodeMgr.Get(node) == nil { + if rm.nodeMgr.Get(node) == nil || !rm.groups[rgName].containsNode(node) { // remove non exist node should be tolerable return nil } @@ -302,9 +284,15 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error { } } + deltaCapacity := -1 + if rgName == DefaultResourceGroupName { + // default rg capacity won't be changed + deltaCapacity = 0 + } + err := rm.store.SaveResourceGroup(&querypb.ResourceGroup{ Name: rgName, - Capacity: int32(rm.groups[rgName].GetCapacity()) - 1, + Capacity: int32(rm.groups[rgName].GetCapacity() + deltaCapacity), Nodes: newNodes, }) if err != nil { @@ -317,7 +305,7 @@ func (rm *ResourceManager) unassignNode(rgName string, node int64) error { } rm.checkRGNodeStatus(rgName) - err = rm.groups[rgName].unassignNode(node) + err = rm.groups[rgName].unassignNode(node, deltaCapacity) if err != nil { return err } @@ -461,7 +449,7 @@ func (rm *ResourceManager) HandleNodeUp(node int64) (string, error) { } // assign new node to default rg - rm.groups[DefaultResourceGroupName].handleNodeUp(node) + rm.groups[DefaultResourceGroupName].assignNode(node, 0) log.Info("HandleNodeUp: add node to default resource group", zap.String("rgName", DefaultResourceGroupName), zap.Int64("node", node), @@ -483,7 +471,7 @@ func (rm *ResourceManager) HandleNodeDown(node int64) (string, error) { zap.String("rgName", rgName), zap.Int64("node", node), ) - return rgName, rm.groups[rgName].handleNodeDown(node) + return rgName, rm.groups[rgName].unassignNode(node, 0) } return "", ErrNodeNotAssignToRG @@ -509,14 +497,23 @@ func (rm *ResourceManager) TransferNode(from string, to string, numNode int) err return err } + deltaFromCapacity := -1 + if from == DefaultResourceGroupName { + deltaFromCapacity = 0 + } + deltaToCapacity := 1 + if to == DefaultResourceGroupName { + deltaToCapacity = 0 + } + for _, node := range movedNodes { - err := rm.groups[from].unassignNode(node) + err := rm.groups[from].unassignNode(node, deltaFromCapacity) if err != nil { // interrupt transfer, unreachable logic path return err } - err = rm.groups[to].assignNode(node) + err = rm.groups[to].assignNode(node, deltaToCapacity) if err != nil { // interrupt transfer, unreachable logic path return err @@ -544,15 +541,27 @@ func (rm *ResourceManager) transferNodeInStore(from string, to string, numNode i } } + fromCapacity := rm.groups[from].GetCapacity() + if from != DefaultResourceGroupName { + // default rg capacity won't be changed + fromCapacity = rm.groups[from].GetCapacity() - numNode + } + fromRG := &querypb.ResourceGroup{ Name: from, - Capacity: int32(rm.groups[from].GetCapacity() - numNode), + Capacity: int32(fromCapacity), Nodes: fromNodeList, } + toCapacity := rm.groups[to].GetCapacity() + if from != DefaultResourceGroupName { + // default rg capacity won't be changed + toCapacity = rm.groups[to].GetCapacity() + numNode + } + toRG := &querypb.ResourceGroup{ Name: to, - Capacity: int32(rm.groups[to].GetCapacity() + numNode), + Capacity: int32(toCapacity), Nodes: toNodeList, 
} @@ -580,7 +589,7 @@ func (rm *ResourceManager) AutoRecoverResourceGroup(rgName string) (int, error) return i + 1, err } - err = rm.groups[rgName].handleNodeUp(node) + err = rm.groups[rgName].assignNode(node, 0) if err != nil { // roll back, unreachable logic path rm.assignNode(DefaultResourceGroupName, node) @@ -599,10 +608,18 @@ func (rm *ResourceManager) Recover() error { } for _, rg := range rgs { - rm.groups[rg.GetName()] = NewResourceGroup(0) - for _, node := range rg.GetNodes() { - rm.groups[rg.GetName()].assignNode(node) + if rg.GetName() == DefaultResourceGroupName { + rm.groups[rg.GetName()] = NewResourceGroup(DefaultResourceGroupCapacity) + for _, node := range rg.GetNodes() { + rm.groups[rg.GetName()].assignNode(node, 0) + } + } else { + rm.groups[rg.GetName()] = NewResourceGroup(0) + for _, node := range rg.GetNodes() { + rm.groups[rg.GetName()].assignNode(node, 1) + } } + rm.checkRGNodeStatus(rg.GetName()) log.Info("Recover resource group", zap.String("rgName", rg.GetName()), @@ -623,7 +640,7 @@ func (rm *ResourceManager) checkRGNodeStatus(rgName string) { zap.Int64("nodeID", node), ) - rm.groups[rgName].handleNodeDown(node) + rm.groups[rgName].unassignNode(node, 0) } } } diff --git a/internal/querycoordv2/meta/resource_manager_test.go b/internal/querycoordv2/meta/resource_manager_test.go index 910e4f4062f3853ad7b349959de224faa6457673..152e12cdd71092ff3135101511393912c681c723 100644 --- a/internal/querycoordv2/meta/resource_manager_test.go +++ b/internal/querycoordv2/meta/resource_manager_test.go @@ -111,7 +111,6 @@ func (suite *ResourceManagerSuite) TestManipulateNode() { err = suite.manager.AssignNode("rg1", 1) suite.NoError(err) err = suite.manager.AssignNode("rg2", 1) - println(err.Error()) suite.ErrorIs(err, ErrNodeAlreadyAssign) // transfer node between rgs @@ -175,14 +174,14 @@ func (suite *ResourceManagerSuite) TestHandleNodeUp() { suite.NoError(err) defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName) suite.NoError(err) - suite.Equal(0, defaultRG.GetCapacity()) + suite.Equal(DefaultResourceGroupCapacity, defaultRG.GetCapacity()) suite.manager.HandleNodeUp(101) rg, err = suite.manager.GetResourceGroup("rg1") suite.NoError(err) suite.Equal(rg.GetCapacity(), 3) suite.Equal(len(rg.GetNodes()), 2) suite.False(suite.manager.ContainsNode("rg1", 101)) - suite.Equal(1, defaultRG.GetCapacity()) + suite.Equal(DefaultResourceGroupCapacity, defaultRG.GetCapacity()) } func (suite *ResourceManagerSuite) TestRecover() { @@ -311,13 +310,13 @@ func (suite *ResourceManagerSuite) TestDefaultResourceGroup() { } defaultRG, err := suite.manager.GetResourceGroup(DefaultResourceGroupName) suite.NoError(err) - suite.Equal(defaultRG.GetCapacity(), 0) + suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity) suite.Len(defaultRG.GetNodes(), 0) suite.manager.HandleNodeUp(1) suite.manager.HandleNodeUp(2) suite.manager.HandleNodeUp(3) - suite.Equal(defaultRG.GetCapacity(), 3) + suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity) suite.Len(defaultRG.GetNodes(), 3) // shutdown node 1 and 2 @@ -326,18 +325,18 @@ func (suite *ResourceManagerSuite) TestDefaultResourceGroup() { defaultRG, err = suite.manager.GetResourceGroup(DefaultResourceGroupName) suite.NoError(err) - suite.Equal(defaultRG.GetCapacity(), 3) + suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity) suite.Len(defaultRG.GetNodes(), 1) suite.manager.HandleNodeUp(4) suite.manager.HandleNodeUp(5) - suite.Equal(defaultRG.GetCapacity(), 3) + suite.Equal(defaultRG.GetCapacity(), 
DefaultResourceGroupCapacity) suite.Len(defaultRG.GetNodes(), 3) suite.manager.HandleNodeUp(7) suite.manager.HandleNodeUp(8) suite.manager.HandleNodeUp(9) - suite.Equal(defaultRG.GetCapacity(), 6) + suite.Equal(defaultRG.GetCapacity(), DefaultResourceGroupCapacity) suite.Len(defaultRG.GetNodes(), 6) } diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 29c54e74704b21253ff031c89b084cd6503e3c16..22267930dbf1008f1db9b1ff3afe024e840e4595 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -1060,12 +1060,19 @@ func (s *Server) TransferReplica(ctx context.Context, req *querypb.TransferRepli fmt.Sprintf("the target resource group[%s] doesn't exist", req.GetTargetResourceGroup()), meta.ErrRGNotExist), nil } + replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetTargetResourceGroup()) + if len(replicas) > 0 { + return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, + fmt.Sprintf("found [%d] replicas of same collection in target resource group[%s], dynamically increase replica num is unsupported", + len(replicas), req.GetSourceResourceGroup())), nil + } + // for now, we don't support to transfer replica of same collection to same resource group - replicas := s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup()) + replicas = s.meta.ReplicaManager.GetByCollectionAndRG(req.GetCollectionID(), req.GetSourceResourceGroup()) if len(replicas) < int(req.GetNumReplica()) { return utils.WrapStatus(commonpb.ErrorCode_IllegalArgument, - fmt.Sprintf("found [%d] replicas of collection[%d] in source resource group[%s]", - len(replicas), req.GetCollectionID(), req.GetSourceResourceGroup())), nil + fmt.Sprintf("only found [%d] replicas in source resource group[%s]", + len(replicas), req.GetSourceResourceGroup())), nil } err := s.transferReplica(req.GetTargetResourceGroup(), replicas[:req.GetNumReplica()]) diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index b7a53c1024c70cbe575b8a3ba97a4f153468885b..fca57b7fe0c5e41ede2fee53ad87b8127858f72e 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -561,7 +561,7 @@ func (suite *ServiceSuite) TestTransferReplica() { NumReplica: 2, }) suite.NoError(err) - suite.Contains(resp.Reason, "found [0] replicas of collection[1] in source resource group") + suite.Contains(resp.Reason, "only found [0] replicas in source resource group") resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ SourceResourceGroup: "rgg", @@ -591,6 +591,11 @@ func (suite *ServiceSuite) TestTransferReplica() { ID: 222, ResourceGroup: meta.DefaultResourceGroupName, }, typeutil.NewUniqueSet(2))) + suite.server.meta.Put(meta.NewReplica(&querypb.Replica{ + CollectionID: 1, + ID: 333, + ResourceGroup: meta.DefaultResourceGroupName, + }, typeutil.NewUniqueSet(3))) suite.server.nodeMgr.Add(session.NewNodeInfo(1001, "localhost")) suite.server.nodeMgr.Add(session.NewNodeInfo(1002, "localhost")) @@ -611,6 +616,14 @@ func (suite *ServiceSuite) TestTransferReplica() { suite.NoError(err) suite.Equal(resp.ErrorCode, commonpb.ErrorCode_Success) suite.Len(suite.server.meta.GetByResourceGroup("rg3"), 2) + resp, err = suite.server.TransferReplica(ctx, &querypb.TransferReplicaRequest{ + SourceResourceGroup: meta.DefaultResourceGroupName, + TargetResourceGroup: "rg3", + CollectionID: 1, + NumReplica: 2, + }) + suite.NoError(err) + suite.Contains(resp.Reason, 
"dynamically increase replica num is unsupported") // server unhealthy server.status.Store(commonpb.StateCode_Abnormal) diff --git a/internal/querycoordv2/utils/meta.go b/internal/querycoordv2/utils/meta.go index bdf06cb0d42e71c5788994833128044129f73882..7445d2944e8a9688ca5b8bf57fbc73bf19e6f83a 100644 --- a/internal/querycoordv2/utils/meta.go +++ b/internal/querycoordv2/utils/meta.go @@ -33,7 +33,7 @@ var ( ErrGetNodesFromRG = errors.New("failed to get node from rg") ErrNoReplicaFound = errors.New("no replica found during assign nodes") ErrReplicasInconsistent = errors.New("all replicas should belong to same collection during assign nodes") - ErrUseWrongNumRG = errors.New("resource num can only be 0, 1 or same as replica number") + ErrUseWrongNumRG = errors.New("resource group num can only be 0, 1 or same as replica number") ) func GetReplicaNodesInfo(replicaMgr *meta.ReplicaManager, nodeMgr *session.NodeManager, replicaID int64) []*session.NodeInfo { diff --git a/tests/python_client/check/func_check.py b/tests/python_client/check/func_check.py index 008918559bd98adcca13b2ee02a69c1d046124e5..b7de2a40a9df4e13dacc562efe55babea9dee38a 100644 --- a/tests/python_client/check/func_check.py +++ b/tests/python_client/check/func_check.py @@ -215,11 +215,11 @@ class ResponseChecker: if check_items.get("num_available_node", None): assert rg.num_available_node == check_items["num_available_node"] if check_items.get("num_loaded_replica", None): - assert rg.num_loaded_replica == check_items["num_loaded_replica"] + assert dict(rg.num_loaded_replica).items() >= check_items["num_loaded_replica"].items() if check_items.get("num_outgoing_node", None): - assert rg.num_outgoing_node == check_items["num_outgoing_node"] + assert dict(rg.num_outgoing_node).items() >= check_items["num_outgoing_node"].items() if check_items.get("num_incoming_node", None): - assert rg.num_incoming_node == check_items["num_incoming_node"] + assert dict(rg.num_incoming_node).items() >= check_items["num_incoming_node"].items() return True @staticmethod diff --git a/tests/python_client/common/common_type.py b/tests/python_client/common/common_type.py index 6967e125dc3d3117461bd20fd5607326918851dc..19a28507a5773730542142dcb130310b94a3ca4b 100644 --- a/tests/python_client/common/common_type.py +++ b/tests/python_client/common/common_type.py @@ -38,6 +38,7 @@ another_float_vec_field_name = "float_vector1" default_binary_vec_field_name = "binary_vector" default_partition_name = "_default" default_resource_group_name = '__default_resource_group' +default_resource_group_capacity = 1000000 default_tag = "1970_01_01" row_count = "row_count" default_length = 65535 diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 99482cbc840fc7bb57d115c5e7bfdc3722235a20..258c3f9f44018226017613980c77db85849776a2 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -12,7 +12,7 @@ allure-pytest==2.7.0 pytest-print==0.2.1 pytest-level==0.1.1 pytest-xdist==2.5.0 -pymilvus==2.3.0.dev34 +pymilvus==2.3.0.dev35 pytest-rerunfailures==9.1.1 git+https://github.com/Projectplace/pytest-tags ndg-httpsclient diff --git a/tests/python_client/testcases/test_resourcegroup.py b/tests/python_client/testcases/test_resourcegroup.py index fed7500bc88c544fba4ce951b0c867b7d37260e8..5e3321838953ac936d53e50bdaf5c74f06d1de37 100644 --- a/tests/python_client/testcases/test_resourcegroup.py +++ b/tests/python_client/testcases/test_resourcegroup.py @@ -7,8 +7,6 @@ from common.common_type import CaseLabel, 
CheckTasks from utils.util_pymilvus import * from utils.util_log import test_log as log - -@pytest.mark.skip(reason="still debugging") class TestResourceGroupParams(TestcaseBase): @pytest.mark.tags(CaseLabel.L0) def test_rg_default(self): @@ -28,10 +26,9 @@ class TestResourceGroupParams(TestcaseBase): self._connect() rgs, _ = self.utility_wrap.list_resource_groups() rgs_count = len(rgs) - default_rg_init_cap = 2 default_rg_init_available_node = 1 default_rg_info = {"name": ct.default_resource_group_name, - "capacity": default_rg_init_cap, + "capacity": ct.default_resource_group_capacity, "num_available_node": default_rg_init_available_node, "num_loaded_replica": {}, "num_outgoing_node": {}, @@ -73,7 +70,7 @@ class TestResourceGroupParams(TestcaseBase): check_task=ct.CheckTasks.check_rg_property, check_items=target_rg_info) source_rg_info = {"name": ct.default_resource_group_name, - "capacity": default_rg_init_cap - num_node, + "capacity": ct.default_resource_group_capacity, "num_available_node": default_rg_init_available_node - num_node, } self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name, @@ -98,8 +95,7 @@ class TestResourceGroupParams(TestcaseBase): self.utility_wrap.drop_resource_group(name=m_rg_name) rgs, _ = self.utility_wrap.list_resource_groups() assert len(rgs) == rgs_count - # pytest.skip(reason='issue #21962') - error = {ct.err_code: 999, ct.err_msg: 'failed to describe resource group, err=rg is not existing'} + error = {ct.err_code: 999, ct.err_msg: "failed to describe resource group, err=resource group doesn't exist"} self.utility_wrap.describe_resource_group(name=m_rg_name, check_task=ct.CheckTasks.err_res, check_items=error) @@ -113,7 +109,7 @@ class TestResourceGroupParams(TestcaseBase): """ self._connect() error = {ct.err_code: 999, - ct.err_msg: "failed to create resource group, err=resource group name couldn't be empty"} + ct.err_msg: "`resource_group_name` value {} is illegal".format(rg_name)} self.init_resource_group(name=rg_name, check_task=ct.CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) @@ -124,17 +120,29 @@ class TestResourceGroupParams(TestcaseBase): """ pass - @pytest.mark.skip(reason="need define rules of valid names") @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.parametrize("rg_name", ct.get_invalid_strs) + @pytest.mark.parametrize("rg_name", [[], 1, [1, "2", 3], (1,), {1: 1}, None]) + def test_create_rg_illegal_names(self, rg_name): + """ + method: create a rg with an invalid name(what are invalid names? types, length, chinese,symbols) + verify: fail with error msg + """ + self._connect() + error = {ct.err_code: 999, + ct.err_msg: "`resource_group_name` value {} is illegal".format(rg_name)} + self.init_resource_group(rg_name, check_task=ct.CheckTasks.err_res, check_items=error) + + @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.parametrize("rg_name", [" ", "12-s", "12 s", "(mn)", "中文", "%$#", "qw$_o90", "1ns_", "a".join("a" for i in range(256))]) def test_create_rg_invalid_names(self, rg_name): """ method: create a rg with an invalid name(what are invalid names? 
types, length, chinese,symbols) verify: fail with error msg """ self._connect() - self.init_resource_group(name=rg_name) - # TODO: check error msg + error = {ct.err_code: 999, + ct.err_msg: "Invalid resource group name"} + self.init_resource_group(rg_name, check_task=ct.CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) def test_create_rg_max_length_name(self): @@ -164,7 +172,6 @@ class TestResourceGroupParams(TestcaseBase): check_task=ct.CheckTasks.err_res, check_items=error) - @pytest.mark.skip(reason="issue #21971") @pytest.mark.tags(CaseLabel.L1) def test_create_rg_dropped_name(self): """ @@ -187,15 +194,13 @@ class TestResourceGroupParams(TestcaseBase): # drop the rg self.utility_wrap.drop_resource_group(name=rg_name) assert len(self.utility_wrap.list_resource_groups()[0]) == rgs_count - 1 - # error = {ct.err_code: 999, - # ct.err_msg: "failed to create resource group, err=resource group not exist"} - # self.utility_wrap.describe_resource_group(name=rg_name, - # check_task=ct.CheckTasks.err_res, - # check_items=error) - # create the rg with the same name again + error = {ct.err_code: 999, + ct.err_msg: "failed to describe resource group, err=resource group doesn't exist"} self.utility_wrap.describe_resource_group(name=rg_name, - check_task=ct.CheckTasks.check_rg_property, - check_items={"name": rg_name}) + check_task=ct.CheckTasks.err_res, + check_items=error) + # create the rg with the same name again + self.init_resource_group(name=rg_name) assert rgs_count == len(self.utility_wrap.list_resource_groups()[0]) self.utility_wrap.describe_resource_group(name=rg_name, check_task=ct.CheckTasks.check_rg_property, @@ -233,9 +238,9 @@ class TestResourceGroupParams(TestcaseBase): verify: drop successfully """ self._connect() - rgs_count = len(self.utility_wrap.list_collections()[0]) - self.utility_wrap.drop_resource_group(name=rg_name) - assert rgs_count == len(self.utility_wrap.list_collections()[0]) + error = {ct.err_code: 999, + ct.err_msg: "`resource_group_name` value {} is illegal".format(rg_name)} + self.utility_wrap.drop_resource_group(name=rg_name, check_task=ct.CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) def test_drop_rg_invalid_names(self): @@ -301,7 +306,6 @@ class TestResourceGroupParams(TestcaseBase): check_task=CheckTasks.check_rg_property, check_items=default_rg_info) - # @pytest.mark.skip(reason="issue #21971") @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("rg_name", ["", None]) def test_describe_rg_empty_name(self, rg_name): @@ -310,8 +314,9 @@ class TestResourceGroupParams(TestcaseBase): verify: fail with error msg """ self._connect() - self.utility_wrap.describe_resource_group(name=rg_name) - # TODO: check error + error = {ct.err_code: 999, + ct.err_msg: "`resource_group_name` value {} is illegal".format(rg_name)} + self.utility_wrap.drop_resource_group(name=rg_name, check_task=ct.CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L1) def test_describe_rg_invalid_names(self): @@ -321,8 +326,7 @@ class TestResourceGroupParams(TestcaseBase): """ pass - # @pytest.mark.skip(reason="issue #21962") - # @pytest.mark.tags(CaseLabel.L1) + @pytest.mark.tags(CaseLabel.L1) def test_describe_rg_non_existing(self): """ method: describe an non existing rg @@ -330,7 +334,7 @@ class TestResourceGroupParams(TestcaseBase): """ self._connect() non_existing_rg = 'non_existing' - error = {ct.err_code: 999, ct.err_msg: 'failed to describe resource group, err=rg is not existing'} + error = {ct.err_code: 999, ct.err_msg: 
"failed to describe resource group, err=resource group doesn't exist"} self.utility_wrap.describe_resource_group(name=non_existing_rg, check_task=ct.CheckTasks.err_res, check_items=error) @@ -343,7 +347,7 @@ class TestResourceGroupParams(TestcaseBase): num_outgoing_node and num_incoming_node """ self._connect() - default_rg_init_cap = 2 + default_rg_init_cap = 1000000 default_rg_init_available_node = 1 default_rg_info = {"name": ct.default_resource_group_name, "capacity": default_rg_init_cap, @@ -356,8 +360,7 @@ class TestResourceGroupParams(TestcaseBase): check_task=ct.CheckTasks.check_rg_property, check_items=default_rg_info) - -@pytest.mark.skip(reason="still debugging") +@pytest.mark.skip(reason="will cause concurrent problems") class TestTransferNode(TestcaseBase): @pytest.mark.tags(CaseLabel.L0) def test_transfer_node_default(self): @@ -425,6 +428,9 @@ class TestTransferNode(TestcaseBase): check_items=error ) self.utility_wrap.drop_resource_group(name=rg1_name) + # clean + self.utility_wrap.transfer_node(source=rg2_name, target=ct.default_resource_group_name, num_node=1) + self.utility_wrap.drop_resource_group(name=rg2_name) @pytest.mark.tags(CaseLabel.L0) @pytest.mark.parametrize("with_growing", [True, False]) @@ -454,11 +460,11 @@ class TestTransferNode(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) default_rg_info = {"name": ct.default_resource_group_name, - "capacity": 2, + "capacity": ct.default_resource_group_capacity, "num_available_node": 1, "num_loaded_replica": {collection_w.name: 1}, "num_outgoing_node": {}, @@ -489,7 +495,7 @@ class TestTransferNode(TestcaseBase): check_task=CheckTasks.check_rg_property, check_items=rg_info) default_rg_info = {"name": ct.default_resource_group_name, - "capacity": 1, + "capacity": ct.default_resource_group_capacity, "num_available_node": 0, "num_loaded_replica": {collection_w.name: 1}, "num_outgoing_node": {collection_w.name: 1}, @@ -506,7 +512,7 @@ class TestTransferNode(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) @@ -523,7 +529,7 @@ class TestTransferNode(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) # verify rg state @@ -538,7 +544,7 @@ class TestTransferNode(TestcaseBase): check_task=CheckTasks.check_rg_property, check_items=rg_info) default_rg_info = {"name": ct.default_resource_group_name, - "capacity": 2, + "capacity": ct.default_resource_group_capacity, "num_available_node": 1, "num_loaded_replica": {collection_w.name: 1}, "num_outgoing_node": {}, @@ -547,8 +553,12 @@ class TestTransferNode(TestcaseBase): self.utility_wrap.describe_resource_group(name=ct.default_resource_group_name, check_task=CheckTasks.check_rg_property, check_items=default_rg_info) + # clean + collection_w.release() + collection_w.drop() + self.utility_wrap.drop_resource_group(name=rg_name) + - @pytest.mark.xfail(reason="issue #22051") @pytest.mark.tags(CaseLabel.L0) def test_load_collection_with_no_available_node(self): """ @@ -607,7 +617,7 @@ class TestTransferNode(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) # check rgA info @@ -633,7 
+643,7 @@ class TestTransferNode(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) # check rgA info after transfer @@ -648,7 +658,7 @@ class TestTransferNode(TestcaseBase): check_task=CheckTasks.check_rg_property, check_items=rg_info) default_rg_info = {"name": ct.default_resource_group_name, - "capacity": 1, + "capacity": ct.default_resource_group_capacity, "num_available_node": 1, "num_loaded_replica": {}, "num_outgoing_node": {}, @@ -659,13 +669,12 @@ class TestTransferNode(TestcaseBase): check_items=default_rg_info) # 8. load the collection with default rg - error = {ct.err_code: 999, - ct.err_msg: 'failed to load, err=already loaded in the other rg'} - collection_w.load(check_task=CheckTasks.err_res, check_items=error) + # error = {ct.err_code: 5, + # ct.err_msg: 'failed to load, err=already loaded in the other rg'} + # collection_w.load(_resource_groups=[rg_name], check_task=CheckTasks.err_res, check_items=error) - @pytest.mark.xfail(reason="issue #22058") @pytest.mark.tags(CaseLabel.L0) - @pytest.mark.parametrize("replicas", [1, 2, 3]) + @pytest.mark.parametrize("replicas", [1, 3]) def test_load_collection_with_multi_replicas_multi_rgs(self, replicas): """ Method: @@ -688,20 +697,17 @@ class TestTransferNode(TestcaseBase): # load with different replicas error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=no enough nodes to create replicas[NoEnoughNode]'} + ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'} collection_w.load(replica_number=replicas, _resource_groups=[rgA_name, rgB_name], check_task=CheckTasks.err_res, check_items=error) - # error = {ct.err_code: 999, - # ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'} collection_w.load(replica_number=replicas, _resource_groups=[ct.default_resource_group_name, rgB_name], check_task=CheckTasks.err_res, check_items=error) - @pytest.mark.xfail("reason=issue #22058") @pytest.mark.tags(CaseLabel.L0) - @pytest.mark.parametrize("rg_names", [[], [""], ["non_existing"], "不合法"]) + @pytest.mark.parametrize("rg_names", [[""], ["non_existing"], "不合法"]) def test_load_collection_with_empty_rg_name(self, rg_names): """ Method: @@ -710,12 +716,11 @@ class TestTransferNode(TestcaseBase): """ collection_w = self.init_collection_wrap() collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index) - error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'} + error = {ct.err_code: 1, + ct.err_msg: "failed to load collection, err=failed to spawn replica for collection[resource group doesn't exist]"} collection_w.load(_resource_groups=rg_names, check_task=CheckTasks.err_res, check_items=error) - @pytest.mark.xfail(reason="issue #22051") @pytest.mark.tags(CaseLabel.L0) def test_load_partition_with_no_available_node(self): """ @@ -807,7 +812,7 @@ class TestTransferNode(TestcaseBase): check_task=CheckTasks.check_rg_property, check_items=rg_info) default_rg_info = {"name": ct.default_resource_group_name, - "capacity": 1, + "capacity": ct.default_resource_group_capacity, "num_available_node": 1, "num_loaded_replica": {}, "num_outgoing_node": {}, @@ -818,12 +823,12 @@ class TestTransferNode(TestcaseBase): check_items=default_rg_info) # 8. 
load the collection with default rg - error = {ct.err_code: 999, - ct.err_msg: 'failed to load, err=already loaded in the other rg'} - partition_w.load(check_task=CheckTasks.err_res, check_items=error) + # error = {ct.err_code: 999, + # ct.err_msg: 'failed to load, err=already loaded in the other rg'} + # partition_w.load(check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L0) - @pytest.mark.parametrize("replicas", [1, 2, 3]) + @pytest.mark.parametrize("replicas", [1, 3]) def test_load_partition_with_multi_replicas_multi_rgs(self, replicas): """ Method: @@ -849,19 +854,17 @@ class TestTransferNode(TestcaseBase): # load with different replicas error = {ct.err_code: 999, - ct.err_msg: 'failed to load partitions, err=no enough nodes to create replicas[NoEnoughNode]'} + ct.err_msg: 'failed to load partitions, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'} partition_w.load(replica_number=replicas, _resource_groups=[rgA_name, rgB_name], check_task=CheckTasks.err_res, check_items=error) - error = {ct.err_code: 999, - ct.err_msg: 'failed to load partitions, err=no enough nodes to create replicas[NoEnoughNode]'} partition_w.load(replica_number=replicas, _resource_groups=[ct.default_resource_group_name, rgB_name], check_task=CheckTasks.err_res, check_items=error) @pytest.mark.tags(CaseLabel.L0) - @pytest.mark.parametrize("rg_names", [[], [""], ["non_existing"], "不合法"]) + @pytest.mark.parametrize("rg_names", [[""], ["non_existing"], "不合法"]) def test_load_partition_with_empty_rg_name(self, rg_names): """ Method: @@ -872,16 +875,17 @@ class TestTransferNode(TestcaseBase): dim = ct.default_dim # 1. create a partition collection_w = self.init_collection_wrap() + collection_w.create_index(ct.default_float_vec_field_name, ct.default_flat_index) partition_name = cf.gen_unique_str('par') partition_w = self.init_partition_wrap(collection_w, partition_name) - error = {ct.err_code: 999, - ct.err_msg: 'failed to load partition, err=failed to spawn replica for collection[nodes not enough]'} + error = {ct.err_code: 1, + ct.err_msg: 'failed to load partitions, err=failed to spawn replica for collection[resource num can only be 0, 1 or same as replica number]'} partition_w.load(_resource_groups=rg_names, check_task=CheckTasks.err_res, check_items=error) -@pytest.mark.skip(reason="still debugging") +@pytest.mark.skip(reason="need 8 query nodes for this test") # run the multi node tests with 8 query nodes class TestResourceGroupMultiNodes(TestcaseBase): @pytest.mark.tags(CaseLabel.L0) @@ -913,7 +917,8 @@ class TestResourceGroupMultiNodes(TestcaseBase): # make growing if with_growing: - collection_w.insert(cf.gen_default_list_data(nb=500, dim=dim, start=6*nb)) + res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=6 * nb)) + insert_ids.extend(res.primary_keys) # create rgA rg_name = cf.gen_unique_str('rg') @@ -960,16 +965,17 @@ class TestResourceGroupMultiNodes(TestcaseBase): # verify replica state replicas = collection_w.get_replicas() num_nodes_for_replicas = 0 - assert len(replicas) == replica_number - for rep in replicas: - assert rep.resource_group_name == rg_name + assert len(replicas[0].groups) == replica_number + for rep in replicas[0].groups: + assert rep.resource_group == rg_name assert rep.num_outbound_node == {} - num_nodes_for_replicas += len(rep.node_ids) + num_nodes_for_replicas += len(rep.group_nodes) assert num_nodes_for_replicas == num_nodes_to_rg # make growing if with_growing: - 
collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=7 * nb)) + res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=7 * nb)) + insert_ids.extend(res.primary_keys) # verify search succ nq = 5 @@ -980,7 +986,7 @@ class TestResourceGroupMultiNodes(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) @@ -1015,15 +1021,16 @@ class TestResourceGroupMultiNodes(TestcaseBase): # verify replica state replicas = collection_w.get_replicas() - assert len(replicas) == replica_number - for rep in replicas: - assert rep.resource_group_name == rg_name + assert len(replicas[0].groups) == replica_number + for rep in replicas[0].groups: + assert rep.resource_group == rg_name assert rep.num_outbound_node == {} - assert len(rep.node_ids) == 1 # one replica for each node + assert len(rep.group_nodes) == 1 # one replica for each node # make growing if with_growing: - collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=8 * nb)) + res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=8 * nb)) + insert_ids.extend(res.primary_keys) # verify search succ collection_w.search(vectors[:nq], @@ -1032,20 +1039,25 @@ class TestResourceGroupMultiNodes(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) # verify load successfully again with no parameters - collection_w.load() + collection_w.load(replica_number=replica_number) collection_w.search(vectors[:nq], ct.default_float_vec_field_name, ct.default_search_params, ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) + collection_w.release() + collection_w.drop() + self.utility_wrap.transfer_node(source=rg_name, + target=ct.default_resource_group_name, + num_node=num_nodes_to_rg) @pytest.mark.tags(CaseLabel.L0) @pytest.mark.parametrize("with_growing", [True, False]) @@ -1076,7 +1088,8 @@ class TestResourceGroupMultiNodes(TestcaseBase): # make growing if with_growing: - collection_w.insert(cf.gen_default_list_data(nb=500, dim=dim, start=6 * nb)) + res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=6 * nb)) + insert_ids.extend(res.primary_keys) # create rgA and rgB rgA_name = cf.gen_unique_str('rgA') @@ -1090,12 +1103,12 @@ class TestResourceGroupMultiNodes(TestcaseBase): num_node=num_nodes_rgA) num_nodes_rgB = 3 self.utility_wrap.transfer_node(source=ct.default_resource_group_name, - target=rgA_name, + target=rgB_name, num_node=num_nodes_rgB) # load 3 replicas in rgA and rgB replica_number = 3 error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'} + ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource group num can only be 0, 1 or same as replica number]'} collection_w.load(replica_number=replica_number, _resource_groups=[rgA_name, rgB_name], check_task=CheckTasks.err_res, @@ -1104,7 +1117,7 @@ class TestResourceGroupMultiNodes(TestcaseBase): # load 1 replica in rgA and rgB replica_number = 1 error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'} + ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[resource 
group num can only be 0, 1 or same as replica number]'} collection_w.load(replica_number=replica_number, _resource_groups=[rgA_name, rgB_name], check_task=CheckTasks.err_res, @@ -1116,18 +1129,19 @@ class TestResourceGroupMultiNodes(TestcaseBase): _resource_groups=[rgA_name, rgB_name]) # verify replica state: each replica occupies one rg replicas = collection_w.get_replicas() - assert len(replicas) == replica_number - for rep in replicas: + assert len(replicas[0].groups) == replica_number + for rep in replicas[0].groups: assert rep.num_outbound_node == {} - assert rep.resource_group_name in [rgA_name, rgB_name] - if rep.resource_group_name == rgA_name: - assert len(rep.node_ids) == num_nodes_rgA # one replica for each rg + assert rep.resource_group in [rgA_name, rgB_name] + if rep.resource_group == rgA_name: + assert len(rep.group_nodes) == num_nodes_rgA # one replica for each rg else: - assert len(rep.node_ids) == num_nodes_rgB # one replica for each rg + assert len(rep.group_nodes) == num_nodes_rgB # one replica for each rg # make growing if with_growing: - collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=8 * nb)) + res, _ = collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=8 * nb)) + insert_ids.extend(res.primary_keys) # verify search succ nq = 5 @@ -1138,20 +1152,28 @@ class TestResourceGroupMultiNodes(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) # verify load successfully again with no parameters - collection_w.load() + collection_w.load(replica_number=2) collection_w.search(vectors[:nq], ct.default_float_vec_field_name, ct.default_search_params, ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) + collection_w.release() + collection_w.drop() + self.utility_wrap.transfer_node(source=rgA_name, + target=ct.default_resource_group_name, + num_node=num_nodes_rgA) + self.utility_wrap.transfer_node(source=rgB_name, + target=ct.default_resource_group_name, + num_node=num_nodes_rgB) @pytest.mark.tags(CaseLabel.L0) @pytest.mark.parametrize("with_growing", [True, False]) @@ -1173,12 +1195,12 @@ class TestResourceGroupMultiNodes(TestcaseBase): rgB_name = cf.gen_unique_str('rgB') self.init_resource_group(name=rgB_name) - # transfer 2 nodes to rgA, 4 nodes to rgB + # transfer 1 nodes to rgA, 2 nodes to rgB self.utility_wrap.transfer_node(source=ct.default_resource_group_name, target=rgA_name, num_node=1) self.utility_wrap.transfer_node(source=ct.default_resource_group_name, - target=rgA_name, + target=rgB_name, num_node=2) dim = 128 @@ -1195,7 +1217,8 @@ class TestResourceGroupMultiNodes(TestcaseBase): # make growing if with_growing: - collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=6 * nb)) + res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=6 * nb)) + insert_ids.extend(res.primary_keys) nq = 5 vectors = [[random.random() for _ in range(dim)] for _ in range(nq)] @@ -1206,23 +1229,28 @@ class TestResourceGroupMultiNodes(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) # transfer 1 replica from rgB to rgA error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'} + ct.err_msg: 
'dynamically increase replica num is unsupported'} self.utility_wrap.transfer_replica(source=rgB_name, target=rgA_name, collection_name=collection_w.name, num_replica=1, check_task=CheckTasks.err_res, check_items=error) + error = {ct.err_code: 999, + ct.err_msg: 'dynamically increase replica num is unsupported'} # transfer 1 replica from rgA to rgB self.utility_wrap.transfer_replica(source=rgA_name, target=rgB_name, - collection_name=collection_w.name, num_replica=1) + collection_name=collection_w.name, num_replica=1, + check_task=CheckTasks.err_res, + check_items=error) # make growing if with_growing: - collection_w.insert(cf.gen_default_list_data(nb=200, dim=dim, start=7 * nb)) + res, _ = collection_w.insert(cf.gen_default_list_data(nb, dim=dim, start=7 * nb)) + insert_ids.extend(res.primary_keys) # verify search succ nq = 5 @@ -1233,9 +1261,15 @@ class TestResourceGroupMultiNodes(TestcaseBase): ct.default_limit, check_task=CheckTasks.check_search_results, check_items={"nq": nq, - "ids": insert_ids, + "ids": insert_ids.copy(), "limit": ct.default_limit} ) + self.utility_wrap.transfer_node(source=rgA_name, + target=ct.default_resource_group_name, + num_node=1) + self.utility_wrap.transfer_node(source=rgB_name, + target=ct.default_resource_group_name, + num_node=2) @pytest.mark.tags(CaseLabel.L0) def test_transfer_replica_not_enough_replicas_to_transfer(self): @@ -1261,12 +1295,17 @@ class TestResourceGroupMultiNodes(TestcaseBase): num_node=3) # transfer 2 replicas to rgA - error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'} + error = {ct.err_code: 5, + ct.err_msg: 'only found [1] replicas in source resource group[__default_resource_group]'} self.utility_wrap.transfer_replica(source=ct.default_resource_group_name, target=rg_name, collection_name=collection_w.name, num_replica=2, check_task=CheckTasks.err_res, check_items=error) + collection_w.release() + collection_w.drop() + self.utility_wrap.transfer_node(source=rg_name, + target=ct.default_resource_group_name, + num_node=3) @pytest.mark.tags(CaseLabel.L0) def test_transfer_replica_non_existing_rg(self): @@ -1283,18 +1322,20 @@ class TestResourceGroupMultiNodes(TestcaseBase): # transfer replica to a non_existing rg rg_name = "non_existing" error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'} + ct.err_msg: "the target resource group[non_existing] doesn't exist, err=resource group doesn't exist"} self.utility_wrap.transfer_replica(source=ct.default_resource_group_name, target=rg_name, + collection_name=collection_w.name, num_replica=1, check_task=CheckTasks.err_res, check_items=error) # transfer replica from a non_existing rg error = {ct.err_code: 999, - ct.err_msg: 'failed to load collection, err=failed to spawn replica for collection[nodes not enough]'} + ct.err_msg: "the source resource group[non_existing] doesn't exist, err=resource group doesn't exist"} self.utility_wrap.transfer_replica(source=rg_name, target=ct.default_resource_group_name, + collection_name=collection_w.name, num_replica=1, check_task=CheckTasks.err_res, check_items=error)
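
The core behavioral change above is easy to miss inside the hunks: the default resource group now starts with a fixed capacity of `DefaultResourceGroupCapacity` (1,000,000), and `assignNode`/`unassignNode` take an explicit capacity delta that is 0 for the default group, so node up/down and transfers never change its capacity, while user-defined groups still grow and shrink by one per node. The following is a minimal standalone sketch of that semantics, not the Milvus implementation — the `resourceGroup` type, field names, and constants here are simplified stand-ins for illustration only.

```go
package main

import "fmt"

const (
	defaultResourceGroupName     = "__default_resource_group"
	defaultResourceGroupCapacity = 1000000 // default RG capacity is fixed and large
)

type resourceGroup struct {
	nodes    map[int64]struct{}
	capacity int
}

func newResourceGroup(capacity int) *resourceGroup {
	return &resourceGroup{nodes: map[int64]struct{}{}, capacity: capacity}
}

// assignNode adds a node and adjusts capacity by deltaCapacity
// (1 for user-defined groups, 0 for the default group).
func (rg *resourceGroup) assignNode(id int64, deltaCapacity int) {
	if _, ok := rg.nodes[id]; ok {
		return // already assigned
	}
	rg.nodes[id] = struct{}{}
	rg.capacity += deltaCapacity
}

// unassignNode removes a node and adjusts capacity by deltaCapacity
// (-1 for user-defined groups, 0 for the default group).
func (rg *resourceGroup) unassignNode(id int64, deltaCapacity int) {
	if _, ok := rg.nodes[id]; !ok {
		return // removing a non-existent node is tolerated
	}
	delete(rg.nodes, id)
	rg.capacity += deltaCapacity
}

func main() {
	defaultRG := newResourceGroup(defaultResourceGroupCapacity)
	userRG := newResourceGroup(0)

	defaultRG.assignNode(1, 0) // node up: default RG capacity stays at 1,000,000
	userRG.assignNode(2, 1)    // user RG capacity grows with each assigned node
	userRG.unassignNode(2, -1) // and shrinks again when the node is unassigned

	fmt.Println(defaultResourceGroupName, defaultRG.capacity, len(defaultRG.nodes)) // __default_resource_group 1000000 1
	fmt.Println(userRG.capacity, len(userRG.nodes))                                 // 0 0
}
```

This is also why the Python expectations change from small capacities (2, 3, 6) to `ct.default_resource_group_capacity = 1000000`: only `num_available_node` tracks the default group's actual node count now, while its reported capacity is constant.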