Unverified · commit d97e4213 authored by zhuwenxing, committed by GitHub

[test]Add dynamic schema bulk insert test cases (#24481)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
Parent 3fb9334b
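As background for the diff below: a collection created with `enable_dynamic_field=True` accepts keys that are not declared in its schema and stores them in a hidden `$meta` JSON field. A minimal pymilvus sketch of the feature under test, assuming a local Milvus instance and a pymilvus version with dynamic-field support; names and values are illustrative, not part of this commit:

```python
from pymilvus import (
    Collection, CollectionSchema, DataType, FieldSchema, connections,
)

connections.connect(host="localhost", port="19530")  # assumed local instance

fields = [
    FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True),
    FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=128),
]
# enable_dynamic_field=True lets inserts carry keys that are not declared
# in the schema; Milvus stores them in a hidden $meta JSON field
schema = CollectionSchema(fields=fields, enable_dynamic_field=True)
collection = Collection(name="dynamic_demo", schema=schema)

# "name" and "age" are undeclared, so they land in $meta
collection.insert([{"pk": 1, "vector": [0.0] * 128, "name": "test_1", "age": 1}])
```

Bulk insert extends the same idea to files: the row-based JSON path carries the extra keys inline with each row, while the numpy path packs them into a dedicated $meta.npy file; the two new tests in this commit cover exactly those two paths.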
@@ -171,9 +171,11 @@ def gen_collection_schema_all_datatype(description=ct.default_desc,
return schema
def gen_collection_schema(fields, primary_field=None, description=ct.default_desc, auto_id=False):
def gen_collection_schema(fields, primary_field=None, description=ct.default_desc, auto_id=False,
enable_dynamic_field=False):
schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, primary_field=primary_field,
description=description, auto_id=auto_id)
description=description, auto_id=auto_id,
enable_dynamic_field=enable_dynamic_field)
return schema
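A hypothetical call site for the extended helper, using field generators already present in common_func (a sketch only; the new keyword defaults to False, so existing callers are unaffected):

```python
# illustrative only: build a dynamic-schema collection schema via the helper
fields = [
    cf.gen_int64_field(name="pk", is_primary=True),
    cf.gen_float_vec_field(name="vector", dim=128),
]
schema = cf.gen_collection_schema(fields=fields, enable_dynamic_field=True)
```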
@@ -396,7 +398,7 @@ def gen_invalid_field_types():
return field_types
def gen_invaild_search_params_type():
def gen_invalid_search_params_type():
invalid_search_key = 100
search_params = []
for index_type in ct.all_index_types:
@@ -543,7 +545,7 @@ def gen_normal_string_expressions(field):
return expressions
def gen_invaild_string_expressions():
def gen_invalid_string_expressions():
expressions = [
"varchar in [0, \"1\"]",
"varchar not in [\"0\", 1, 2]"
......
import logging
import random
import time
import pytest
import numpy as np
@@ -6,10 +7,12 @@ from pathlib import Path
from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
from common.minio_comm import copy_files_to_minio
from common.milvus_sys import MilvusSys
from common.common_type import CaseLabel, CheckTasks
from utils.util_log import test_log as log
from common.bulk_insert_data import (
data_source,
prepare_bulk_insert_json_files,
prepare_bulk_insert_numpy_files,
DataField as df,
@@ -837,3 +840,139 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
ids = hits.ids
results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
assert len(results) == len(ids)
@pytest.mark.parametrize("auto_id", [True, False])
def test_dynamic_schema_with_json(self, auto_id):
"""
"""
import json
self._connect()
c_name = cf.gen_unique_str("dynamic_schema")
dim = 128
nb = 100
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=True)
self.collection_wrap.init_collection(c_name, schema=schema)
data = []
for i in range(nb):
d = {
"name": f"test_{i}",
"age": i,
df.pk_field: i,
df.vec_field: [x for x in range(dim)],
}
if auto_id is True:
del d[df.pk_field]
for _ in range(random.randint(0, 3)):
random_key = cf.gen_unique_str("random_key")
random_value = cf.gen_unique_str("random_value")
d[random_key] = random_value
data.append(d)
# generate json file for bulk insert
file_name = "dynamic_schema.json"
json_data = {
"rows": data,
}
with open(f"{data_source}/{file_name}", "w") as f:
json.dump(json_data, f)
# upload data to minio
files = [file_name]
copy_files_to_minio(self.minio_endpoint, data_source, files, self.bucket_name, force=True)
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
# load collection
self.collection_wrap.load()
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states: {states}")
assert success
time.sleep(2)
self.utility_wrap.wait_for_index_building_complete(c_name, timeout=120)
res, _ = self.utility_wrap.index_building_progress(c_name)
self.collection_wrap.load(_refresh=True)
log.info(f"wait for load finished and be ready for search")
res, _ = self.collection_wrap.query(expr=f"{df.pk_field} >= 0", output_fields=["name", "age"])
log.debug(f"query result: {res}")
assert len(res) == nb
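An aside on what the final query above exercises: once rows are imported, dynamic keys behave like regular scalar fields in filter expressions. A hedged sketch of filtering directly on one (expression syntax assumed from Milvus dynamic-field behavior; not part of this commit):

```python
# dynamic keys can appear in boolean expressions like declared fields;
# rows whose $meta lacks the key simply do not match
res, _ = self.collection_wrap.query(
    expr="age >= 50",
    output_fields=["name", "age"],
)
assert all(r["age"] >= 50 for r in res)
```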
@pytest.mark.parametrize("auto_id", [True, False])
def test_dynamic_schema_with_numpy(self, auto_id):
"""
"""
import json
self._connect()
c_name = cf.gen_unique_str("dynamic_schema")
dim = 128
nb = 100
fields = [
cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
cf.gen_float_vec_field(name=df.vec_field, dim=dim),
]
schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=True)
self.collection_wrap.init_collection(c_name, schema=schema)
if auto_id is True:
files = [f"{df.vec_field}.npy", "$meta.npy"]
else:
files = [f"{df.pk_field}.npy", f"{df.vec_field}.npy", "$meta.npy"]
for f in files:
d = []
if f == "$meta.npy":
for i in range(nb):
tmp = {"name": f"test_{i}", "age": i}
for _ in range(random.randint(0, 3)):
random_key = cf.gen_unique_str("random_key")
random_value = cf.gen_unique_str("random_value")
tmp[random_key] = random_value
d.append(json.dumps(tmp))
np.save(f"{data_source}/{f}", d)
elif f == f"{df.pk_field}.npy":
d = np.array([i for i in range(nb)])
np.save(f"{data_source}/{f}", d)
elif f == f"{df.vec_field}.npy":
d = np.array([[np.float32(i) for i in range(dim)] for _ in range(nb)])
log.debug(f"vec data: {d}")
np.save(f"{data_source}/{f}", d)
else:
raise Exception(f"unknown file with {files}")
copy_files_to_minio(self.minio_endpoint, data_source, files, self.bucket_name, force=True)
index_params = ct.default_index
self.collection_wrap.create_index(
field_name=df.vec_field, index_params=index_params
)
# load collection
self.collection_wrap.load()
t0 = time.time()
task_id, _ = self.utility_wrap.do_bulk_insert(
collection_name=c_name, files=files
)
logging.info(f"bulk insert task ids:{task_id}")
success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
task_ids=[task_id], timeout=90
)
tt = time.time() - t0
log.info(f"bulk insert state:{success} in {tt} with states: {states}")
assert success
time.sleep(2)
self.utility_wrap.wait_for_index_building_complete(c_name, timeout=120)
res, _ = self.utility_wrap.index_building_progress(c_name)
self.collection_wrap.load(_refresh=True)
log.info(f"wait for load finished and be ready for search")
res, _ = self.collection_wrap.query(expr=f"{df.pk_field} >= 0", output_fields=["name", "age"])
log.debug(f"query result: {res}")
assert len(res) == nb
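For reference, the $meta.npy file written above is a one-dimensional array of JSON strings, one object of dynamic keys per row. A minimal standalone check of that layout, assuming the file produced by this test (illustrative, not part of the commit):

```python
import json

import numpy as np

# np.save stores the list of JSON strings as a unicode array,
# so np.load recovers one JSON-encoded object per row
meta = np.load(f"{data_source}/$meta.npy")
first = json.loads(meta[0])
assert "name" in first and "age" in first
```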
@@ -18,7 +18,7 @@ prefix = "query"
exp_res = "exp_res"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
default_mix_expr = "int64 >= 0 && varchar >= \"0\""
default_invaild_expr = "varchar >= 0"
default_invalid_expr = "varchar >= 0"
default_string_term_expr = f'{ct.default_string_field_name} in [\"0\", \"1\"]'
default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
binary_index_params = {"index_type": "BIN_IVF_FLAT", "metric_type": "JACCARD", "params": {"nlist": 64}}
@@ -880,7 +880,7 @@ class TestQueryParams(TestcaseBase):
collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
collection_w.load()
partition_names = cf.gen_unique_str()
error = {ct.err_code: 1, ct.err_msg: f'PartitonName: {partition_names} not found'}
error = {ct.err_code: 1, ct.err_msg: f'PartitionName: {partition_names} not found'}
collection_w.query(default_term_expr, partition_names=[partition_names],
check_task=CheckTasks.err_res, check_items=error)
@@ -1538,7 +1538,7 @@ class TestqueryString(TestcaseBase):
check_task=CheckTasks.check_query_results, check_items={exp_res: res})
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.parametrize("expression", cf.gen_invaild_string_expressions())
@pytest.mark.parametrize("expression", cf.gen_invalid_string_expressions())
def test_query_with_invalid_string_expr(self, expression):
"""
target: test query data
@@ -1579,10 +1579,10 @@ class TestqueryString(TestcaseBase):
check_task=CheckTasks.check_query_results, check_items={exp_res: res})
@pytest.mark.tags(CaseLabel.L1)
def test_query_string_with_invaild_prefix_expr(self):
def test_query_string_with_invalid_prefix_expr(self):
"""
target: test query with invalid prefix string expression
method: specify string primary field, use invaild prefix string expr
method: specify string primary field, use invalid prefix string expr
expected: raise error
"""
collection_w = self.init_collection_general(prefix, insert_data=True)[0]
......
@@ -29,8 +29,8 @@ default_limit = ct.default_limit
default_search_exp = "int64 >= 0"
default_search_string_exp = "varchar >= \"0\""
default_search_mix_exp = "int64 >= 0 && varchar >= \"0\""
default_invaild_string_exp = "varchar >= 0"
perfix_expr = 'varchar like "0%"'
default_invalid_string_exp = "varchar >= 0"
prefix_expr = 'varchar like "0%"'
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params
default_int64_field_name = ct.default_int64_field_name
@@ -318,7 +318,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
collection_w.create_index("float_vector", default_index)
collection_w.load()
# 3. search
invalid_search_params = cf.gen_invaild_search_params_type()
invalid_search_params = cf.gen_invalid_search_params_type()
message = "Search params check failed"
for invalid_search_param in invalid_search_params:
if index == invalid_search_param["index_type"]:
@@ -681,7 +681,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
[deleted_par_name],
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "PartitonName: %s not found" % deleted_par_name})
"err_msg": "PartitionName: %s not found" % deleted_par_name})
@pytest.mark.tags(CaseLabel.L3)
@pytest.mark.parametrize("index, params",
@@ -734,7 +734,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
default_limit, default_search_exp, [partition_name],
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "PartitonName: %s not found" % partition_name})
"err_msg": "PartitionName: %s not found" % partition_name})
@pytest.mark.tags(CaseLabel.L2)
@pytest.mark.xfail(reason="issue 15407")
@@ -774,7 +774,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
search_params, default_limit, "int64 >= 0",
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "Data type and metric type mis-match"})
"err_msg": "Data type and metric type miss-match"})
@pytest.mark.tags(CaseLabel.L2)
def test_search_with_output_fields_not_exist(self):
@@ -3170,7 +3170,7 @@ class TestSearchBase(TestcaseBase):
partition_num=1,
dim=dim, is_index=True)[0:5]
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
# 2. create patition
# 2. create partition
partition_name = "search_partition_empty"
collection_w.create_partition(partition_name=partition_name, description="search partition empty")
par = collection_w.partitions
@@ -3219,7 +3219,7 @@ class TestSearchBase(TestcaseBase):
partition_num=1,
dim=dim, is_index=True)[0:5]
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
# 2. create patition
# 2. create partition
partition_name = ct.default_partition_name
par = collection_w.partitions
# collection_w.load()
@@ -3336,7 +3336,7 @@ class TestSearchBase(TestcaseBase):
partition_num=1,
dim=dim, is_index=True)[0:5]
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
# 2. create patition
# 2. create partition
partition_name = "search_partition_empty"
collection_w.create_partition(partition_name=partition_name, description="search partition empty")
par = collection_w.partitions
@@ -3379,7 +3379,7 @@ class TestSearchBase(TestcaseBase):
partition_num=1,
dim=dim, is_index=True)[0:5]
vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
# 2. create patition
# 2. create partition
par_name = collection_w.partitions[0].name
# collection_w.load()
# 3. create different index
@@ -3631,7 +3631,7 @@ class TestsearchString(TestcaseBase):
vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)]
collection_w.search(vectors[:default_nq], default_search_field,
default_search_params, default_limit,
default_invaild_string_exp,
default_invalid_string_exp,
check_task=CheckTasks.err_res,
check_items={"err_code": 1,
"err_msg": "failed to create query plan: type mismatch"}
@@ -3812,7 +3812,7 @@ class TestsearchString(TestcaseBase):
collection_w.search(vectors[:default_nq], default_search_field,
# search all buckets
{"metric_type": "L2", "params": {"nprobe": 100}}, default_limit,
perfix_expr,
prefix_expr,
output_fields=output_fields,
_async=_async,
travel_timestamp=0,
......