Unverified commit b54b72e4, authored by jingkl, committed by GitHub

Add testcase to load and release (#13666)

* Add testcase about load collection
Signed-off-by: jingkl <jingjing.jia@zilliz.com>

* Add testcase about load collection
Signed-off-by: jingkl <jingjing.jia@zilliz.com>
Parent 06c4662e
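For orientation, the tests added below drive the pymilvus partition load/release API. A minimal standalone sketch of that flow follows; the connection address, collection/partition names, dimension and index parameters are illustrative assumptions and are not taken from this commit.

# Hedged sketch of the load/release flow the new test cases exercise.
# Assumes a reachable Milvus instance; all names and parameters are placeholders.
import random
from pymilvus import (connections, Collection, CollectionSchema, FieldSchema,
                      DataType, Partition)

connections.connect(host="127.0.0.1", port="19530")
schema = CollectionSchema([
    FieldSchema("pk", DataType.INT64, is_primary=True, auto_id=True),
    FieldSchema("vec", DataType.FLOAT_VECTOR, dim=8),
])
collection = Collection("load_release_demo", schema)
p1, p2 = Partition(collection, "p1"), Partition(collection, "p2")
for part in (p1, p2):
    # one column of 10 random 8-dim vectors (the pk column is auto-generated)
    part.insert([[[random.random() for _ in range(8)] for _ in range(10)]])
# Newer servers require an index before load; the index parameters here are assumptions.
collection.create_index("vec", {"index_type": "IVF_FLAT", "metric_type": "L2",
                                "params": {"nlist": 64}})
p1.load()       # load one partition
p1.release()    # release it again
p2.load()       # then load a different partition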
from numpy.core.fromnumeric import _partition_dispatcher
import pytest
import sys
from pymilvus import DefaultConfig
......@@ -133,6 +134,49 @@ class TestcaseBase(Base):
**kwargs)
return partition_wrap
def init_collection_without_load(self, prefix, insert_data=False, nb=ct.default_nb,
partition_num=0, is_binary=False, is_all_data_type=False,
auto_id=False, dim=ct.default_dim, is_index=False):
"""
target: create specified collections without loading them
method: 1. create collections (binary/non-binary, default/all data type, auto_id or not)
2. create partitions if specified
3. insert specified (binary/non-binary, default/all data type) data
into each partition if any
expected: return collection and raw data, insert ids
"""
log.info("Test case of search interface: initialize before test case")
self._connect()
collection_name = cf.gen_unique_str(prefix)
vectors = []
binary_raw_vectors = []
insert_ids = []
time_stamp = 0
# 1 create collection
default_schema = cf.gen_default_collection_schema(auto_id=auto_id, dim=dim)
if is_binary:
default_schema = cf.gen_default_binary_collection_schema(auto_id=auto_id, dim=dim)
if is_all_data_type:
default_schema = cf.gen_collection_schema_all_datatype(auto_id=auto_id, dim=dim)
log.info("init_collection_general: collection creation")
collection_w = self.init_collection_wrap(name=collection_name,
schema=default_schema)
# 2 add extra partitions if specified (default is 1 partition named "_default")
if partition_num > 0:
cf.gen_partitions(collection_w, partition_num)
# 3 insert data if specified
if insert_data:
collection_w, vectors, binary_raw_vectors, insert_ids, time_stamp = \
cf.insert_data(collection_w, nb, is_binary, is_all_data_type, auto_id=auto_id, dim=dim)
assert collection_w.is_empty is False
assert collection_w.num_entities == nb
log.info("insert_data: inserted data into collection %s (num_entities: %s)"
% (collection_w.name, nb))
return collection_w, vectors, binary_raw_vectors, insert_ids, time_stamp
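A hypothetical usage sketch of the helper above (illustrative only, not part of this commit): a test can take the returned, still unloaded collection and decide itself when to load and release.

# Hypothetical call site inside a TestcaseBase subclass; prefix and nb are arbitrary.
collection_w = self.init_collection_without_load("load", insert_data=True, nb=100)[0]
assert collection_w.num_entities == 100   # data is persisted, but nothing is loaded yet
collection_w.load()                       # the test triggers load explicitly ...
collection_w.release()                    # ... and releases again when done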
def init_collection_general(self, prefix, insert_data=False, nb=ct.default_nb,
partition_num=0, is_binary=False, is_all_data_type=False,
auto_id=False, dim=ct.default_dim, is_index=False):
......@@ -194,7 +238,30 @@ class TestcaseBase(Base):
collection_w.insert(df_default)
conn.flush([collection_w.name])
collection_w.load()
return collection_w, partition_w, df_partition, df_default
def insert_entities_into_two_partitions(self, half, prefix='query'):
"""
insert default entities into two partitions (partition_w and partition_e), half into each (int64 and float field values)
:param half: half of nb
:return: collection wrap and partition wrap
"""
conn = self._connect()
collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
partition_w = self.init_partition_wrap(collection_wrap=collection_w)
partition_e = self.init_partition_wrap(collection_wrap=collection_w)
# insert [0, half) into partition_w
df_partition = cf.gen_default_dataframe_data(nb=half, start=0)
partition_w.insert(df_partition)
# insert [half, nb) into partition_e
df_default = cf.gen_default_dataframe_data(nb=half, start=half)
partition_e.insert(df_default)
conn.flush([collection_w.name])
partition_w.load()
# partition_e.load()
return collection_w, partition_w, partition_e, df_default
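As an illustrative, hypothetical consumer of this helper (not part of this commit), a query test could load the second partition as well and then query across both partitions by name; this assumes the suite's default int64 field is named "int64".

# Hypothetical usage sketch, not taken from this diff.
collection_w, partition_w, partition_e, _ = \
    self.insert_entities_into_two_partitions(half=ct.default_nb // 2)
partition_e.load()                         # the helper above only loads partition_w
res, _ = collection_w.query("int64 >= 0",
                            partition_names=[partition_w.name, partition_e.name])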
def collection_insert_multi_segments_one_shard(self, collection_prefix, num_of_segment=2, nb_of_segment=1,
is_dup=True):
......
from os import name
import threading
import pytest
......@@ -233,21 +234,76 @@ class TestPartitionParams(TestcaseBase):
# check that the partition not exists
assert not collection_w.has_partition(partition_name)[0]
@pytest.mark.tags(CaseLabel.L2)
def test_load_partiton_respectively(self):
"""
target: test loading another partition while one partition is already loaded
method: load partition1, then load partition2
expected: raise exception
"""
self._connect()
collection_w = self.init_collection_wrap()
partition_w1 = self.init_partition_wrap(collection_w)
partition_w2 = self.init_partition_wrap(collection_w)
partition_w1.insert(cf.gen_default_list_data())
partition_w2.insert(cf.gen_default_list_data())
partition_w1.load()
error = {ct.err_code: 1, ct.err_msg: 'load the partition after load collection is not supported'}
partition_w2.load(check_task=CheckTasks.err_res,
check_items=error)
@pytest.mark.tags(CaseLabel.L2)
def test_load_partitons_after_release(self):
"""
target: test releasing partitions after loading them
method: load partitions, then release the partitions
expected: no exception
"""
self._connect()
collection_w = self.init_collection_wrap()
partition_w1 = self.init_partition_wrap(collection_w, name="partition_w1")
partition_w2 = self.init_partition_wrap(collection_w, name="partition_w2")
partition_w1.insert(cf.gen_default_list_data())
partition_w2.insert(cf.gen_default_list_data())
partition_names = ["partition_w1", "partition_w2"]
collection_w.load(partition_names)
collection_w.release(partition_names)
@pytest.mark.tags(CaseLabel.L2)
def test_load_partiton_after_load_partition(self):
"""
target: test loading a partition after another loaded partition is released
method: load partition1 and release partition1,
then load partition2
expected: no exception
"""
self._connect()
collection_w = self.init_collection_wrap()
partition_w1 = self.init_partition_wrap(collection_w)
partition_w2 = self.init_partition_wrap(collection_w)
partition_w1.insert(cf.gen_default_list_data())
partition_w2.insert(cf.gen_default_list_data())
partition_w1.load()
partition_w1.release()
partition_w2.load()
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_partition_release(self):
"""
target: verify release partition
method: 1. create a collection and two partitions
2. insert data into each partition
3. flush and load both partitions
3. flush and load the partition1
4. release partition1
5. release partition1 twice
5. release partition2
expected: 1. the 1st partition is released
2. the 2nd partition is not released
2. the 2nd partition is released
"""
# create collection
collection_w = self.init_collection_wrap()
# create two partitions
......@@ -260,20 +316,17 @@ class TestPartitionParams(TestcaseBase):
# load two partitions
partition_w1.load()
partition_w2.load()
# search two partitions
# search partition1
search_vectors = cf.gen_vectors(1, ct.default_dim)
res1, _ = partition_w1.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
params={"nprobe": 32}, limit=1)
res2, _ = partition_w2.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
params={"nprobe": 32}, limit=1)
assert len(res1) == 1 and len(res2) == 1
assert len(res1) == 1
# release the first partition
partition_w1.release()
partition_w2.release()
# check result
res1, _ = partition_w1.search(data=search_vectors,
......@@ -281,10 +334,6 @@ class TestPartitionParams(TestcaseBase):
params={"nprobe": 32}, limit=1,
check_task=ct.CheckTasks.err_res,
check_items={ct.err_code: 1, ct.err_msg: "partitions have been released"})
res2, _ = partition_w2.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
params={"nprobe": 32}, limit=1)
assert len(res2) == 1
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("data", [cf.gen_default_dataframe_data(10),
......
......@@ -476,7 +476,6 @@ class TestCollectionSearchInvalid(TestcaseBase):
"into memory" % collection_w.name})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_release_partition(self):
"""
target: test the scenario which search the released collection
......@@ -487,7 +486,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
"""
# 1. initialize with data
partition_num = 1
collection_w = self.init_collection_general(prefix, True, 10, partition_num)[0]
collection_w = self.init_collection_without_load(prefix, True, 10, partition_num)[0]
par = collection_w.partitions
par_name = par[partition_num].name
# 2. release partition
......@@ -545,7 +544,6 @@ class TestCollectionSearchInvalid(TestcaseBase):
"limit": default_limit})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_partition_deleted(self):
"""
target: test search deleted partition
......@@ -975,7 +973,7 @@ class TestCollectionSearch(TestcaseBase):
"_async": _async})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
@pytest.mark.xfail(reason="issue #13611")
def test_search_before_after_delete(self, nq, dim, auto_id, _async):
"""
target: test search function before and after deletion
......@@ -1007,6 +1005,7 @@ class TestCollectionSearch(TestcaseBase):
log.info("test_search_before_after_delete: deleting a partition")
par = collection_w.partitions
deleted_entity_num = par[partition_num].num_entities
log.info("test_search_before_after_delete: entities in partition to be deleted: %d" % deleted_entity_num)
entity_num = nb - deleted_entity_num
collection_w.drop_partition(par[partition_num].name)
log.info("test_search_before_after_delete: deleted a partition")
......@@ -1022,96 +1021,96 @@ class TestCollectionSearch(TestcaseBase):
"limit": limit - deleted_entity_num,
"_async": _async})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_partition_after_release_one(self, nq, dim, auto_id, _async):
"""
target: test search function before and after release
method: 1. search the collection
2. release a partition
3. search the collection
expected: the deleted entities should not be searched
"""
# 1. initialize with data
nb = 1000
limit = 1000
partition_num = 1
collection_w, _, _, insert_ids = self.init_collection_general(prefix, True, nb,
partition_num,
auto_id=auto_id,
dim=dim)[0:4]
# 2. search all the partitions before partition deletion
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
log.info("test_search_partition_after_release_one: searching before deleting partitions")
collection_w.search(vectors[:nq], default_search_field,
default_search_params, limit,
default_search_exp, _async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids,
"limit": limit,
"_async": _async})
# 3. release one partition
log.info("test_search_partition_after_release_one: releasing a partition")
par = collection_w.partitions
deleted_entity_num = par[partition_num].num_entities
entity_num = nb - deleted_entity_num
conn = self.connection_wrap.get_connection()[0]
conn.release_partitions(collection_w.name, [par[partition_num].name])
log.info("test_search_partition_after_release_one: released a partition")
# 4. search collection after release one partition
log.info("test_search_partition_after_release_one: searching after deleting partitions")
collection_w.search(vectors[:nq], default_search_field,
default_search_params, limit,
default_search_exp, _async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids[:entity_num],
"limit": limit - deleted_entity_num,
"_async": _async})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_partition_after_release_all(self, nq, dim, auto_id, _async):
"""
target: test search function before and after release
method: 1. search the collection
2. release all partitions
3. search the collection
expected: 0 entity should be searched
"""
# 1. initialize with data
nb = 1000
limit = 1000
collection_w, _, _, insert_ids = self.init_collection_general(prefix, True, nb,
1, auto_id=auto_id,
dim=dim)[0:4]
# 2. search all the partitions before partition deletion
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
log.info("test_search_partition_after_release_all: searching before deleting partitions")
collection_w.search(vectors[:nq], default_search_field,
default_search_params, limit,
default_search_exp, _async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids,
"limit": limit,
"_async": _async})
# 3. release all partitions
log.info("test_search_partition_after_release_all: releasing a partition")
par = collection_w.partitions
conn = self.connection_wrap.get_connection()[0]
conn.release_partitions(collection_w.name, [par[0].name, par[1].name])
log.info("test_search_partition_after_release_all: released a partition")
# 4. search collection after release all partitions
collection_w.search(vectors[:nq], default_search_field,
default_search_params, limit,
default_search_exp, _async=_async,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": [],
"limit": 0,
"_async": _async})
# @pytest.mark.tags(CaseLabel.L1)
# @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
# def test_search_partition_after_release_one(self, nq, dim, auto_id, _async):
# """
# target: test search function before and after release
# method: 1. search the collection
# 2. release a partition
# 3. search the collection
# expected: the deleted entities should not be searched
# """
# # 1. initialize with data
# nb = 1000
# limit = 1000
# partition_num = 1
# collection_w, _, _, insert_ids = self.init_collection_general(prefix, True, nb,
# partition_num,
# auto_id=auto_id,
# dim=dim)[0:4]
# # 2. search all the partitions before partition deletion
# vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
# log.info("test_search_partition_after_release_one: searching before deleting partitions")
# collection_w.search(vectors[:nq], default_search_field,
# default_search_params, limit,
# default_search_exp, _async=_async,
# check_task=CheckTasks.check_search_results,
# check_items={"nq": nq,
# "ids": insert_ids,
# "limit": limit,
# "_async": _async})
# # 3. release one partition
# log.info("test_search_partition_after_release_one: releasing a partition")
# par = collection_w.partitions
# deleted_entity_num = par[partition_num].num_entities
# entity_num = nb - deleted_entity_num
# conn = self.connection_wrap.get_connection()[0]
# conn.release_partitions(collection_w.name, [par[partition_num].name])
# log.info("test_search_partition_after_release_one: released a partition")
# # 4. search collection after release one partition
# log.info("test_search_partition_after_release_one: searching after deleting partitions")
# collection_w.search(vectors[:nq], default_search_field,
# default_search_params, limit,
# default_search_exp, _async=_async,
# check_task=CheckTasks.check_search_results,
# check_items={"nq": nq,
# "ids": insert_ids[:entity_num],
# "limit": limit - deleted_entity_num,
# "_async": _async})
# @pytest.mark.tags(CaseLabel.L2)
# @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
# def test_search_partition_after_release_all(self, nq, dim, auto_id, _async):
# """
# target: test search function before and after release
# method: 1. search the collection
# 2. release all partitions
# 3. search the collection
# expected: 0 entity should be searched
# """
# # 1. initialize with data
# nb = 1000
# limit = 1000
# collection_w, _, _, insert_ids = self.init_collection_general(prefix, True, nb,
# 1, auto_id=auto_id,
# dim=dim)[0:4]
# # 2. search all the partitions before partition deletion
# vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
# log.info("test_search_partition_after_release_all: searching before deleting partitions")
# collection_w.search(vectors[:nq], default_search_field,
# default_search_params, limit,
# default_search_exp, _async=_async,
# check_task=CheckTasks.check_search_results,
# check_items={"nq": nq,
# "ids": insert_ids,
# "limit": limit,
# "_async": _async})
# # 3. release all partitions
# log.info("test_search_partition_after_release_all: releasing a partition")
# par = collection_w.partitions
# conn = self.connection_wrap.get_connection()[0]
# conn.release_partitions(collection_w.name, [par[0].name, par[1].name])
# log.info("test_search_partition_after_release_all: released a partition")
# # 4. search collection after release all partitions
# collection_w.search(vectors[:nq], default_search_field,
# default_search_params, limit,
# default_search_exp, _async=_async,
# check_task=CheckTasks.check_search_results,
# check_items={"nq": nq,
# "ids": [],
# "limit": 0,
# "_async": _async})
@pytest.mark.tags(CaseLabel.L1)
def test_search_collection_after_release_load(self, nb, nq, dim, auto_id, _async):
......@@ -1145,55 +1144,55 @@ class TestCollectionSearch(TestcaseBase):
"limit": default_limit,
"_async": _async})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.xfail(reason="issue 6997")
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_search_partition_after_release_load(self, nb, nq, dim, auto_id, _async):
"""
target: search the pre-released collection after load
method: 1. create collection
2. release a partition
3. load partition
4. search the pre-released partition
expected: search successfully
"""
# 1. initialize with data
collection_w, _, _, insert_ids, time_stamp = self.init_collection_general(prefix, True, nb,
1, auto_id=auto_id,
dim=dim)[0:5]
# 2. release collection
log.info("test_search_partition_after_release_load: releasing a partition")
par = collection_w.partitions
conn = self.connection_wrap.get_connection()[0]
conn.release_partitions(collection_w.name, [par[1].name])
log.info("test_search_partition_after_release_load: released a partition")
# 3. Search the collection after load
limit = 1000
collection_w.load()
log.info("test_search_partition_after_release_load: searching after load")
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
collection_w.search(vectors[:nq], default_search_field, default_search_params,
limit, default_search_exp, _async=_async,
travel_timestamp=time_stamp,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids,
"limit": limit,
"_async": _async})
# 4. Search the pre-released partition after load
if limit > par[1].num_entities:
limit_check = par[1].num_entities
else:
limit_check = limit
collection_w.search(vectors[:nq], default_search_field, default_search_params,
limit, default_search_exp,
[par[1].name], _async=_async,
travel_timestamp=time_stamp,
check_task=CheckTasks.check_search_results,
check_items={"nq": nq,
"ids": insert_ids[par[0].num_entities:],
"limit": limit_check,
"_async": _async})
# @pytest.mark.tags(CaseLabel.L2)
# @pytest.mark.xfail(reason="issue 6997")
# @pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
# def test_search_partition_after_release_load(self, nb, nq, dim, auto_id, _async):
# """
# target: search the pre-released collection after load
# method: 1. create collection
# 2. release a partition
# 3. load partition
# 4. search the pre-released partition
# expected: search successfully
# """
# # 1. initialize without data
# collection_w, _, _, insert_ids, time_stamp = self.init_collection_without_load(prefix, True, nb,
# 1, auto_id=auto_id,
# dim=dim)[0:5]
# # 2. release collection
# log.info("test_search_partition_after_release_load: releasing a partition")
# par = collection_w.partitions
# conn = self.connection_wrap.get_connection()[0]
# conn.release_partitions(collection_w.name, [par[1].name])
# log.info("test_search_partition_after_release_load: released a partition")
# # 3. Search the collection after load
# limit = 1000
# collection_w.load()
# log.info("test_search_partition_after_release_load: searching after load")
# vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
# collection_w.search(vectors[:nq], default_search_field, default_search_params,
# limit, default_search_exp, _async=_async,
# travel_timestamp=time_stamp,
# check_task=CheckTasks.check_search_results,
# check_items={"nq": nq,
# "ids": insert_ids,
# "limit": limit,
# "_async": _async})
# # 4. Search the pre-released partition after load
# if limit > par[1].num_entities:
# limit_check = par[1].num_entities
# else:
# limit_check = limit
# collection_w.search(vectors[:nq], default_search_field, default_search_params,
# limit, default_search_exp,
# [par[1].name], _async=_async,
# travel_timestamp=time_stamp,
# check_task=CheckTasks.check_search_results,
# check_items={"nq": nq,
# "ids": insert_ids[par[0].num_entities:],
# "limit": limit_check,
# "_async": _async})
@pytest.mark.tags(CaseLabel.L1)
def test_search_load_flush_load(self, nb, nq, dim, auto_id, _async):
......
......@@ -823,18 +823,17 @@ class TestUtilityBase(TestcaseBase):
assert res == exp_res
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.skip("https://github.com/milvus-io/milvus/issues/13118")
def test_loading_progress_with_release_partition(self):
"""
target: test loading progress after releasing part of the partitions
method: 1.insert data into two partitions and flush
2.load collection and release one partition
2.load one partition and release one partition
expected: only the loaded partition's entities are counted as loaded
"""
half = ct.default_nb
# insert entities into two partitions, collection flush and load
collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)
partition_w.release()
collection_w, _, partition_e, _ = self.insert_entities_into_two_partitions(half)
partition_e.release()
res = self.utility_wrap.loading_progress(collection_w.name)[0]
assert res[num_total_entities] == half * 2
assert res[num_loaded_entities] == half
......