diff --git a/tests/python_client/base/client_base.py b/tests/python_client/base/client_base.py index c01322bf61f4ac977cd62e22e619c11ce1fa7d33..5be54d2239acc9430f666327b0b1aa3c3d537ffc 100644 --- a/tests/python_client/base/client_base.py +++ b/tests/python_client/base/client_base.py @@ -1,3 +1,4 @@ +from numpy.core.fromnumeric import _partition_dispatcher import pytest import sys from pymilvus import DefaultConfig @@ -133,6 +134,49 @@ class TestcaseBase(Base): **kwargs) return partition_wrap + def init_collection_without_load(self, prefix, insert_data=False, nb=ct.default_nb, + partition_num=0, is_binary=False, is_all_data_type=False, + auto_id=False, dim=ct.default_dim, is_index=False): + """ + target: create specified collections + method: 1. create collections (binary/non-binary, default/all data type, auto_id or not) + 2. create partitions if specified + 3. insert specified (binary/non-binary, default/all data type) data + into each partition if any + expected: return collection and raw data, insert ids + """ + log.info("Test case of search interface: initialize before test case") + self._connect() + collection_name = cf.gen_unique_str(prefix) + vectors = [] + binary_raw_vectors = [] + insert_ids = [] + time_stamp = 0 + # 1 create collection + default_schema = cf.gen_default_collection_schema(auto_id=auto_id, dim=dim) + if is_binary: + default_schema = cf.gen_default_binary_collection_schema(auto_id=auto_id, dim=dim) + if is_all_data_type: + default_schema = cf.gen_collection_schema_all_datatype(auto_id=auto_id, dim=dim) + log.info("init_collection_general: collection creation") + collection_w = self.init_collection_wrap(name=collection_name, + schema=default_schema) + # 2 add extra partitions if specified (default is 1 partition named "_default") + if partition_num > 0: + cf.gen_partitions(collection_w, partition_num) + + # 3 insert data if specified + if insert_data: + collection_w, vectors, binary_raw_vectors, insert_ids, time_stamp = \ + cf.insert_data(collection_w, nb, is_binary, is_all_data_type, auto_id=auto_id, dim=dim) + assert collection_w.is_empty is False + assert collection_w.num_entities == nb + log.info("insert_data: inserted data into collection %s (num_entities: %s)" + % (collection_w.name, nb)) + + return collection_w, vectors, binary_raw_vectors, insert_ids, time_stamp + + def init_collection_general(self, prefix, insert_data=False, nb=ct.default_nb, partition_num=0, is_binary=False, is_all_data_type=False, auto_id=False, dim=ct.default_dim, is_index=False): @@ -194,7 +238,30 @@ class TestcaseBase(Base): collection_w.insert(df_default) conn.flush([collection_w.name]) collection_w.load() - return collection_w, partition_w, df_partition, df_default + return collection_w, partition_w,df_partition, df_default + + + def insert_entities_into_two_partitions(self, half, prefix='query'): + """ + insert default entities into two partitions(partition_w and _default) in half(int64 and float fields values) + :param half: half of nb + :return: collection wrap and partition wrap + """ + conn = self._connect() + collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix)) + partition_w = self.init_partition_wrap(collection_wrap=collection_w) + partition_e = self.init_partition_wrap(collection_wrap=collection_w) + # insert [0, half) into partition_w + df_partition = cf.gen_default_dataframe_data(nb=half, start=0) + partition_w.insert(df_partition) + # insert [half, nb) into _default + df_default = cf.gen_default_dataframe_data(nb=half, start=half) + partition_e.insert(df_default) + conn.flush([collection_w.name]) + partition_w.load() + # partition_e.load() + return collection_w, partition_w, partition_e,df_default + def collection_insert_multi_segments_one_shard(self, collection_prefix, num_of_segment=2, nb_of_segment=1, is_dup=True): diff --git a/tests/python_client/testcases/test_partition_20.py b/tests/python_client/testcases/test_partition_20.py index 73f10c9842560c6b27dc1e2da6a2cbcb0b69a248..64800182d49441a642d36195176db437d906ff5c 100644 --- a/tests/python_client/testcases/test_partition_20.py +++ b/tests/python_client/testcases/test_partition_20.py @@ -1,3 +1,4 @@ +from os import name import threading import pytest @@ -233,21 +234,76 @@ class TestPartitionParams(TestcaseBase): # check that the partition not exists assert not collection_w.has_partition(partition_name)[0] + + @pytest.mark.tags(CaseLabel.L2) + def test_load_partiton_respectively(self): + """ + target: test release the partiton after load partiton + method: load partiton1 and load other partiton + expected: raise exception + """ + self._connect() + collection_w = self.init_collection_wrap() + partition_w1 = self.init_partition_wrap(collection_w) + partition_w2 = self.init_partition_wrap(collection_w) + partition_w1.insert(cf.gen_default_list_data()) + partition_w2.insert(cf.gen_default_list_data()) + partition_w1.load() + error = {ct.err_code: 1, ct.err_msg: f'load the partition after load collection is not supported'} + partition_w2.load(check_task=CheckTasks.err_res, + check_items=error) + + @pytest.mark.tags(CaseLabel.L2) + def test_load_partitons_after_release(self): + """ + target: test release the partiton after load partition + method: load partitons and release partitions + expected: no exception + """ + self._connect() + collection_w = self.init_collection_wrap() + partition_w1 = self.init_partition_wrap(collection_w,name="partition_w1") + partition_w2 = self.init_partition_wrap(collection_w,name="partition_w2") + partition_w1.insert(cf.gen_default_list_data()) + partition_w2.insert(cf.gen_default_list_data()) + partition_names=["partition_w1","partition_w2"] + collection_w.load(partition_names) + collection_w.release(partition_names) + + + @pytest.mark.tags(CaseLabel.L2) + def test_load_partiton_after_load_partition(self): + """ + target: test release the partiton after load partition + method: load partition1 and release the partition1 + load partition2 + expected: no exception + """ + self._connect() + collection_w = self.init_collection_wrap() + partition_w1 = self.init_partition_wrap(collection_w) + partition_w2 = self.init_partition_wrap(collection_w) + partition_w1.insert(cf.gen_default_list_data()) + partition_w2.insert(cf.gen_default_list_data()) + partition_w1.load() + partition_w1.release() + partition_w2.load() + @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") def test_partition_release(self): """ target: verify release partition method: 1. create a collection and two partitions 2. insert data into each partition - 3. flush and load the both partitions + 3. flush and load the partition1 4. release partition1 - 5. release partition1 twice + 5. release partition2 expected: 1. the 1st partition is released - 2. the 2nd partition is not released + 2. the 2nd partition is released """ # create collection + collection_w = self.init_collection_wrap() # create two partitions @@ -260,20 +316,17 @@ class TestPartitionParams(TestcaseBase): # load two partitions partition_w1.load() - partition_w2.load() - - # search two partitions + + # search partition1 search_vectors = cf.gen_vectors(1, ct.default_dim) res1, _ = partition_w1.search(data=search_vectors, anns_field=ct.default_float_vec_field_name, params={"nprobe": 32}, limit=1) - res2, _ = partition_w2.search(data=search_vectors, - anns_field=ct.default_float_vec_field_name, - params={"nprobe": 32}, limit=1) - assert len(res1) == 1 and len(res2) == 1 + assert len(res1) == 1 # release the first partition partition_w1.release() + partition_w2.release() # check result res1, _ = partition_w1.search(data=search_vectors, @@ -281,10 +334,6 @@ class TestPartitionParams(TestcaseBase): params={"nprobe": 32}, limit=1, check_task=ct.CheckTasks.err_res, check_items={ct.err_code: 1, ct.err_msg: "partitions have been released"}) - res2, _ = partition_w2.search(data=search_vectors, - anns_field=ct.default_float_vec_field_name, - params={"nprobe": 32}, limit=1) - assert len(res2) == 1 @pytest.mark.tags(CaseLabel.L1) @pytest.mark.parametrize("data", [cf.gen_default_dataframe_data(10), diff --git a/tests/python_client/testcases/test_search_20.py b/tests/python_client/testcases/test_search_20.py index 418d7dc8d6f354378b3f31d1db509a7ca5236b5a..58afac61db3d18a7cc28333c6f83042c8eb17cc5 100644 --- a/tests/python_client/testcases/test_search_20.py +++ b/tests/python_client/testcases/test_search_20.py @@ -476,7 +476,6 @@ class TestCollectionSearchInvalid(TestcaseBase): "into memory" % collection_w.name}) @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") def test_search_release_partition(self): """ target: test the scenario which search the released collection @@ -487,7 +486,7 @@ class TestCollectionSearchInvalid(TestcaseBase): """ # 1. initialize with data partition_num = 1 - collection_w = self.init_collection_general(prefix, True, 10, partition_num)[0] + collection_w = self.init_collection_without_load(prefix, True, 10, partition_num)[0] par = collection_w.partitions par_name = par[partition_num].name # 2. release partition @@ -545,7 +544,6 @@ class TestCollectionSearchInvalid(TestcaseBase): "limit": default_limit}) @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") def test_search_partition_deleted(self): """ target: test search deleted partition @@ -975,7 +973,7 @@ class TestCollectionSearch(TestcaseBase): "_async": _async}) @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") + @pytest.mark.xfail(reason="issue #13611") def test_search_before_after_delete(self, nq, dim, auto_id, _async): """ target: test search function before and after deletion @@ -1007,6 +1005,7 @@ class TestCollectionSearch(TestcaseBase): log.info("test_search_before_after_delete: deleting a partition") par = collection_w.partitions deleted_entity_num = par[partition_num].num_entities + print(deleted_entity_num) entity_num = nb - deleted_entity_num collection_w.drop_partition(par[partition_num].name) log.info("test_search_before_after_delete: deleted a partition") @@ -1022,96 +1021,96 @@ class TestCollectionSearch(TestcaseBase): "limit": limit - deleted_entity_num, "_async": _async}) - @pytest.mark.tags(CaseLabel.L1) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") - def test_search_partition_after_release_one(self, nq, dim, auto_id, _async): - """ - target: test search function before and after release - method: 1. search the collection - 2. release a partition - 3. search the collection - expected: the deleted entities should not be searched - """ - # 1. initialize with data - nb = 1000 - limit = 1000 - partition_num = 1 - collection_w, _, _, insert_ids = self.init_collection_general(prefix, True, nb, - partition_num, - auto_id=auto_id, - dim=dim)[0:4] - # 2. search all the partitions before partition deletion - vectors = [[random.random() for _ in range(dim)] for _ in range(nq)] - log.info("test_search_partition_after_release_one: searching before deleting partitions") - collection_w.search(vectors[:nq], default_search_field, - default_search_params, limit, - default_search_exp, _async=_async, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": limit, - "_async": _async}) - # 3. release one partition - log.info("test_search_partition_after_release_one: releasing a partition") - par = collection_w.partitions - deleted_entity_num = par[partition_num].num_entities - entity_num = nb - deleted_entity_num - conn = self.connection_wrap.get_connection()[0] - conn.release_partitions(collection_w.name, [par[partition_num].name]) - log.info("test_search_partition_after_release_one: released a partition") - # 4. search collection after release one partition - log.info("test_search_partition_after_release_one: searching after deleting partitions") - collection_w.search(vectors[:nq], default_search_field, - default_search_params, limit, - default_search_exp, _async=_async, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids[:entity_num], - "limit": limit - deleted_entity_num, - "_async": _async}) - - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") - def test_search_partition_after_release_all(self, nq, dim, auto_id, _async): - """ - target: test search function before and after release - method: 1. search the collection - 2. release all partitions - 3. search the collection - expected: 0 entity should be searched - """ - # 1. initialize with data - nb = 1000 - limit = 1000 - collection_w, _, _, insert_ids = self.init_collection_general(prefix, True, nb, - 1, auto_id=auto_id, - dim=dim)[0:4] - # 2. search all the partitions before partition deletion - vectors = [[random.random() for _ in range(dim)] for _ in range(nq)] - log.info("test_search_partition_after_release_all: searching before deleting partitions") - collection_w.search(vectors[:nq], default_search_field, - default_search_params, limit, - default_search_exp, _async=_async, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": limit, - "_async": _async}) - # 3. release all partitions - log.info("test_search_partition_after_release_all: releasing a partition") - par = collection_w.partitions - conn = self.connection_wrap.get_connection()[0] - conn.release_partitions(collection_w.name, [par[0].name, par[1].name]) - log.info("test_search_partition_after_release_all: released a partition") - # 4. search collection after release all partitions - collection_w.search(vectors[:nq], default_search_field, - default_search_params, limit, - default_search_exp, _async=_async, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": [], - "limit": 0, - "_async": _async}) + # @pytest.mark.tags(CaseLabel.L1) + # @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") + # def test_search_partition_after_release_one(self, nq, dim, auto_id, _async): + # """ + # target: test search function before and after release + # method: 1. search the collection + # 2. release a partition + # 3. search the collection + # expected: the deleted entities should not be searched + # """ + # # 1. initialize with data + # nb = 1000 + # limit = 1000 + # partition_num = 1 + # collection_w, _, _, insert_ids = self.init_collection_general(prefix, True, nb, + # partition_num, + # auto_id=auto_id, + # dim=dim)[0:4] + # # 2. search all the partitions before partition deletion + # vectors = [[random.random() for _ in range(dim)] for _ in range(nq)] + # log.info("test_search_partition_after_release_one: searching before deleting partitions") + # collection_w.search(vectors[:nq], default_search_field, + # default_search_params, limit, + # default_search_exp, _async=_async, + # check_task=CheckTasks.check_search_results, + # check_items={"nq": nq, + # "ids": insert_ids, + # "limit": limit, + # "_async": _async}) + # # 3. release one partition + # log.info("test_search_partition_after_release_one: releasing a partition") + # par = collection_w.partitions + # deleted_entity_num = par[partition_num].num_entities + # entity_num = nb - deleted_entity_num + # conn = self.connection_wrap.get_connection()[0] + # conn.release_partitions(collection_w.name, [par[partition_num].name]) + # log.info("test_search_partition_after_release_one: released a partition") + # # 4. search collection after release one partition + # log.info("test_search_partition_after_release_one: searching after deleting partitions") + # collection_w.search(vectors[:nq], default_search_field, + # default_search_params, limit, + # default_search_exp, _async=_async, + # check_task=CheckTasks.check_search_results, + # check_items={"nq": nq, + # "ids": insert_ids[:entity_num], + # "limit": limit - deleted_entity_num, + # "_async": _async}) + + # @pytest.mark.tags(CaseLabel.L2) + # @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") + # def test_search_partition_after_release_all(self, nq, dim, auto_id, _async): + # """ + # target: test search function before and after release + # method: 1. search the collection + # 2. release all partitions + # 3. search the collection + # expected: 0 entity should be searched + # """ + # # 1. initialize with data + # nb = 1000 + # limit = 1000 + # collection_w, _, _, insert_ids = self.init_collection_general(prefix, True, nb, + # 1, auto_id=auto_id, + # dim=dim)[0:4] + # # 2. search all the partitions before partition deletion + # vectors = [[random.random() for _ in range(dim)] for _ in range(nq)] + # log.info("test_search_partition_after_release_all: searching before deleting partitions") + # collection_w.search(vectors[:nq], default_search_field, + # default_search_params, limit, + # default_search_exp, _async=_async, + # check_task=CheckTasks.check_search_results, + # check_items={"nq": nq, + # "ids": insert_ids, + # "limit": limit, + # "_async": _async}) + # # 3. release all partitions + # log.info("test_search_partition_after_release_all: releasing a partition") + # par = collection_w.partitions + # conn = self.connection_wrap.get_connection()[0] + # conn.release_partitions(collection_w.name, [par[0].name, par[1].name]) + # log.info("test_search_partition_after_release_all: released a partition") + # # 4. search collection after release all partitions + # collection_w.search(vectors[:nq], default_search_field, + # default_search_params, limit, + # default_search_exp, _async=_async, + # check_task=CheckTasks.check_search_results, + # check_items={"nq": nq, + # "ids": [], + # "limit": 0, + # "_async": _async}) @pytest.mark.tags(CaseLabel.L1) def test_search_collection_after_release_load(self, nb, nq, dim, auto_id, _async): @@ -1145,55 +1144,55 @@ class TestCollectionSearch(TestcaseBase): "limit": default_limit, "_async": _async}) - @pytest.mark.tags(CaseLabel.L2) - @pytest.mark.xfail(reason="issue 6997") - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") - def test_search_partition_after_release_load(self, nb, nq, dim, auto_id, _async): - """ - target: search the pre-released collection after load - method: 1. create collection - 2. release a partition - 3. load partition - 4. search the pre-released partition - expected: search successfully - """ - # 1. initialize without data - collection_w, _, _, insert_ids, time_stamp = self.init_collection_general(prefix, True, nb, - 1, auto_id=auto_id, - dim=dim)[0:5] - # 2. release collection - log.info("test_search_partition_after_release_load: releasing a partition") - par = collection_w.partitions - conn = self.connection_wrap.get_connection()[0] - conn.release_partitions(collection_w.name, [par[1].name]) - log.info("test_search_partition_after_release_load: released a partition") - # 3. Search the collection after load - limit = 1000 - collection_w.load() - log.info("test_search_partition_after_release_load: searching after load") - vectors = [[random.random() for _ in range(dim)] for _ in range(nq)] - collection_w.search(vectors[:nq], default_search_field, default_search_params, - limit, default_search_exp, _async=_async, - travel_timestamp=time_stamp, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids, - "limit": limit, - "_async": _async}) - # 4. Search the pre-released partition after load - if limit > par[1].num_entities: - limit_check = par[1].num_entities - else: - limit_check = limit - collection_w.search(vectors[:nq], default_search_field, default_search_params, - limit, default_search_exp, - [par[1].name], _async=_async, - travel_timestamp=time_stamp, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, - "ids": insert_ids[par[0].num_entities:], - "limit": limit_check, - "_async": _async}) + # @pytest.mark.tags(CaseLabel.L2) + # @pytest.mark.xfail(reason="issue 6997") + # @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") + # def test_search_partition_after_release_load(self, nb, nq, dim, auto_id, _async): + # """ + # target: search the pre-released collection after load + # method: 1. create collection + # 2. release a partition + # 3. load partition + # 4. search the pre-released partition + # expected: search successfully + # """ + # # 1. initialize without data + # collection_w, _, _, insert_ids, time_stamp = self.init_collection_without_load(prefix, True, nb, + # 1, auto_id=auto_id, + # dim=dim)[0:5] + # # 2. release collection + # log.info("test_search_partition_after_release_load: releasing a partition") + # par = collection_w.partitions + # conn = self.connection_wrap.get_connection()[0] + # conn.release_partitions(collection_w.name, [par[1].name]) + # log.info("test_search_partition_after_release_load: released a partition") + # # 3. Search the collection after load + # limit = 1000 + # collection_w.load() + # log.info("test_search_partition_after_release_load: searching after load") + # vectors = [[random.random() for _ in range(dim)] for _ in range(nq)] + # collection_w.search(vectors[:nq], default_search_field, default_search_params, + # limit, default_search_exp, _async=_async, + # travel_timestamp=time_stamp, + # check_task=CheckTasks.check_search_results, + # check_items={"nq": nq, + # "ids": insert_ids, + # "limit": limit, + # "_async": _async}) + # # 4. Search the pre-released partition after load + # if limit > par[1].num_entities: + # limit_check = par[1].num_entities + # else: + # limit_check = limit + # collection_w.search(vectors[:nq], default_search_field, default_search_params, + # limit, default_search_exp, + # [par[1].name], _async=_async, + # travel_timestamp=time_stamp, + # check_task=CheckTasks.check_search_results, + # check_items={"nq": nq, + # "ids": insert_ids[par[0].num_entities:], + # "limit": limit_check, + # "_async": _async}) @pytest.mark.tags(CaseLabel.L1) def test_search_load_flush_load(self, nb, nq, dim, auto_id, _async): diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index 9e835979d9f4776bdc482f439b43f58245e3c1d3..b51b6595f54379aaf5f22c5da741af0ed701b42c 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -823,18 +823,17 @@ class TestUtilityBase(TestcaseBase): assert res == exp_res @pytest.mark.tag(CaseLabel.L2) - @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118") def test_loading_progress_with_release_partition(self): """ target: test loading progress after release part partitions method: 1.insert data into two partitions and flush - 2.load collection and release onr partition + 2.load one partiton and release one partition expected: loaded one partition entities """ half = ct.default_nb # insert entities into two partitions, collection flush and load - collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half) - partition_w.release() + collection_w, _, partition_e, _ = self.insert_entities_into_two_partitions(half) + partition_e.release() res = self.utility_wrap.loading_progress(collection_w.name)[0] assert res[num_total_entities] == half * 2 assert res[num_loaded_entities] == half