From d97e42139ab2318f56326431c41d3da68123985f Mon Sep 17 00:00:00 2001
From: zhuwenxing
Date: Wed, 31 May 2023 19:01:09 +0800
Subject: [PATCH] [test]Add dynamic schema bulk insert test cases (#24481)

Signed-off-by: zhuwenxing
---
 tests/python_client/common/common_func.py    |  10 +-
 .../testcases/test_bulk_insert.py            | 139 ++++++++++++++++++
 tests/python_client/testcases/test_query.py  |  10 +-
 tests/python_client/testcases/test_search.py |  24 +--
 4 files changed, 162 insertions(+), 21 deletions(-)

diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py
index 934c9cb1c..0b02e637d 100644
--- a/tests/python_client/common/common_func.py
+++ b/tests/python_client/common/common_func.py
@@ -171,9 +171,11 @@ def gen_collection_schema_all_datatype(description=ct.default_desc,
     return schema
 
 
-def gen_collection_schema(fields, primary_field=None, description=ct.default_desc, auto_id=False):
+def gen_collection_schema(fields, primary_field=None, description=ct.default_desc, auto_id=False,
+                          enable_dynamic_field=False):
     schema, _ = ApiCollectionSchemaWrapper().init_collection_schema(fields=fields, primary_field=primary_field,
-                                                                    description=description, auto_id=auto_id)
+                                                                    description=description, auto_id=auto_id,
+                                                                    enable_dynamic_field=enable_dynamic_field)
     return schema
 
 
@@ -396,7 +398,7 @@ def gen_invalid_field_types():
     return field_types
 
 
-def gen_invaild_search_params_type():
+def gen_invalid_search_params_type():
     invalid_search_key = 100
     search_params = []
     for index_type in ct.all_index_types:
@@ -543,7 +545,7 @@ def gen_normal_string_expressions(field):
     return expressions
 
 
-def gen_invaild_string_expressions():
+def gen_invalid_string_expressions():
     expressions = [
         "varchar in [0, \"1\"]",
         "varchar not in [\"0\", 1, 2]"
diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py
index 1b7dc1ccf..87f4fbdb3 100644
--- a/tests/python_client/testcases/test_bulk_insert.py
+++ b/tests/python_client/testcases/test_bulk_insert.py
@@ -1,4 +1,5 @@
 import logging
+import random
 import time
 import pytest
 import numpy as np
@@ -6,10 +7,12 @@ from pathlib import Path
 from base.client_base import TestcaseBase
 from common import common_func as cf
 from common import common_type as ct
+from common.minio_comm import copy_files_to_minio
 from common.milvus_sys import MilvusSys
 from common.common_type import CaseLabel, CheckTasks
 from utils.util_log import test_log as log
 from common.bulk_insert_data import (
+    data_source,
     prepare_bulk_insert_json_files,
     prepare_bulk_insert_numpy_files,
     DataField as df,
@@ -837,3 +840,139 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
             ids = hits.ids
             results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}")
             assert len(results) == len(ids)
+
+    @pytest.mark.parametrize("auto_id", [True, False])
+    def test_dynamic_schema_with_json(self, auto_id):
+        """
+        target: test bulk insert with dynamic schema enabled, using a json file
+        method: create collection with enable_dynamic_field=True, import json rows carrying extra dynamic fields
+        expected: import successfully and query the dynamic fields successfully
+        """
+        import json
+        self._connect()
+        c_name = cf.gen_unique_str("dynamic_schema")
+        dim = 128
+        nb = 100
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
+            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
+        ]
+
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=True)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+        data = []
+        for i in range(nb):
+            d = {
+                "name": f"test_{i}",
+                "age": i,
+                df.pk_field: i,
+                df.vec_field: [x for x in range(dim)],
+            }
+            if auto_id is True:
+                del d[df.pk_field]
+            for _ in range(random.randint(0, 3)):
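+                # give each row 0 to 3 extra random keys so the dynamic fields vary across rows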
+                random_key = cf.gen_unique_str("random_key")
+                random_value = cf.gen_unique_str("random_value")
+                d[random_key] = random_value
+            data.append(d)
+        # generate json file for bulk insert
+        file_name = "dynamic_schema.json"
+        json_data = {
+            "rows": data,
+        }
+        with open(f"{data_source}/{file_name}", "w") as f:
+            json.dump(json_data, f)
+        # upload data to minio
+        files = [file_name]
+        copy_files_to_minio(self.minio_endpoint, data_source, files, self.bucket_name, force=True)
+
+        index_params = ct.default_index
+        self.collection_wrap.create_index(
+            field_name=df.vec_field, index_params=index_params
+        )
+        # load collection
+        self.collection_wrap.load()
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name, files=files
+        )
+        logging.info(f"bulk insert task ids:{task_id}")
+        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=90
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state:{success} in {tt} with states: {states}")
+        assert success
+        time.sleep(2)
+        self.utility_wrap.wait_for_index_building_complete(c_name, timeout=120)
+        res, _ = self.utility_wrap.index_building_progress(c_name)
+        self.collection_wrap.load(_refresh=True)
+        log.info(f"wait for load finished and be ready for search")
+        res, _ = self.collection_wrap.query(expr=f"{df.pk_field} >= 0", output_fields=["name", "age"])
+        log.debug(f"query result: {res}")
+        assert len(res) == nb
+
+    @pytest.mark.parametrize("auto_id", [True, False])
+    def test_dynamic_schema_with_numpy(self, auto_id):
+        """
+        target: test bulk insert with dynamic schema enabled, using numpy files
+        method: create collection with enable_dynamic_field=True, import numpy files plus $meta.npy holding the dynamic fields
+        expected: import successfully and query the dynamic fields successfully
+        """
+        import json
+        self._connect()
+        c_name = cf.gen_unique_str("dynamic_schema")
+        dim = 128
+        nb = 100
+        fields = [
+            cf.gen_int64_field(name=df.pk_field, is_primary=True, auto_id=auto_id),
+            cf.gen_float_vec_field(name=df.vec_field, dim=dim),
+        ]
+        schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id, enable_dynamic_field=True)
+        self.collection_wrap.init_collection(c_name, schema=schema)
+        if auto_id is True:
+            files = [f"{df.vec_field}.npy", "$meta.npy"]
+        else:
+            files = [f"{df.pk_field}.npy", f"{df.vec_field}.npy", "$meta.npy"]
+        for f in files:
+            d = []
+            if f == "$meta.npy":
+                for i in range(nb):
+                    tmp = {"name": f"test_{i}", "age": i}
+                    for _ in range(random.randint(0, 3)):
+                        random_key = cf.gen_unique_str("random_key")
+                        random_value = cf.gen_unique_str("random_value")
+                        tmp[random_key] = random_value
+                    d.append(json.dumps(tmp))
+                np.save(f"{data_source}/{f}", d)
+            elif f == f"{df.pk_field}.npy":
+                d = np.array([i for i in range(nb)])
+                np.save(f"{data_source}/{f}", d)
+            elif f == f"{df.vec_field}.npy":
+                d = np.array([[np.float32(i) for i in range(dim)] for _ in range(nb)])
+                log.debug(f"vec data: {d}")
+                np.save(f"{data_source}/{f}", d)
+            else:
+                raise Exception(f"unknown file with {files}")
+
+        copy_files_to_minio(self.minio_endpoint, data_source, files, self.bucket_name, force=True)
+
+        index_params = ct.default_index
+        self.collection_wrap.create_index(
+            field_name=df.vec_field, index_params=index_params
+        )
+        # load collection
+        self.collection_wrap.load()
+        t0 = time.time()
+        task_id, _ = self.utility_wrap.do_bulk_insert(
+            collection_name=c_name, files=files
+        )
+        logging.info(f"bulk insert task ids:{task_id}")
+        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
+            task_ids=[task_id], timeout=90
+        )
+        tt = time.time() - t0
+        log.info(f"bulk insert state:{success} in {tt} with states: {states}")
+        assert success
+        time.sleep(2)
+        self.utility_wrap.wait_for_index_building_complete(c_name, timeout=120)
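+        # check index progress, then refresh the load so the imported segments are visible to queries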
+        res, _ = self.utility_wrap.index_building_progress(c_name)
+        self.collection_wrap.load(_refresh=True)
+        log.info(f"wait for load finished and be ready for search")
+        res, _ = self.collection_wrap.query(expr=f"{df.pk_field} >= 0", output_fields=["name", "age"])
+        log.debug(f"query result: {res}")
+        assert len(res) == nb
diff --git a/tests/python_client/testcases/test_query.py b/tests/python_client/testcases/test_query.py
index 67e7b1b26..8e17b0582 100644
--- a/tests/python_client/testcases/test_query.py
+++ b/tests/python_client/testcases/test_query.py
@@ -18,7 +18,7 @@ prefix = "query"
 exp_res = "exp_res"
 default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
 default_mix_expr = "int64 >= 0 && varchar >= \"0\""
-default_invaild_expr = "varchar >= 0"
+default_invalid_expr = "varchar >= 0"
 default_string_term_expr = f'{ct.default_string_field_name} in [\"0\", \"1\"]'
 default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
 binary_index_params = {"index_type": "BIN_IVF_FLAT", "metric_type": "JACCARD", "params": {"nlist": 64}}
@@ -880,7 +880,7 @@ class TestQueryParams(TestcaseBase):
         collection_w.create_index(ct.default_float_vec_field_name, index_params=ct.default_flat_index)
         collection_w.load()
         partition_names = cf.gen_unique_str()
-        error = {ct.err_code: 1, ct.err_msg: f'PartitonName: {partition_names} not found'}
+        error = {ct.err_code: 1, ct.err_msg: f'PartitionName: {partition_names} not found'}
         collection_w.query(default_term_expr, partition_names=[partition_names],
                            check_task=CheckTasks.err_res, check_items=error)
 
@@ -1538,7 +1538,7 @@ class TestqueryString(TestcaseBase):
                            check_task=CheckTasks.check_query_results, check_items={exp_res: res})
 
     @pytest.mark.tags(CaseLabel.L1)
-    @pytest.mark.parametrize("expression", cf.gen_invaild_string_expressions())
+    @pytest.mark.parametrize("expression", cf.gen_invalid_string_expressions())
     def test_query_with_invalid_string_expr(self, expression):
         """
         target: test query data
@@ -1579,10 +1579,10 @@ class TestqueryString(TestcaseBase):
                            check_task=CheckTasks.check_query_results, check_items={exp_res: res})
 
     @pytest.mark.tags(CaseLabel.L1)
-    def test_query_string_with_invaild_prefix_expr(self):
+    def test_query_string_with_invalid_prefix_expr(self):
         """
         target: test query with invalid prefix string expression
-        method: specify string primary field, use invaild prefix string expr
+        method: specify string primary field, use invalid prefix string expr
         expected: raise error
         """
         collection_w = self.init_collection_general(prefix, insert_data=True)[0]
diff --git a/tests/python_client/testcases/test_search.py b/tests/python_client/testcases/test_search.py
index dd421d6e8..ad5f0a2f9 100644
--- a/tests/python_client/testcases/test_search.py
+++ b/tests/python_client/testcases/test_search.py
@@ -29,8 +29,8 @@ default_limit = ct.default_limit
 default_search_exp = "int64 >= 0"
 default_search_string_exp = "varchar >= \"0\""
 default_search_mix_exp = "int64 >= 0 && varchar >= \"0\""
-default_invaild_string_exp = "varchar >= 0"
-perfix_expr = 'varchar like "0%"'
+default_invalid_string_exp = "varchar >= 0"
+prefix_expr = 'varchar like "0%"'
 default_search_field = ct.default_float_vec_field_name
 default_search_params = ct.default_search_params
 default_int64_field_name = ct.default_int64_field_name
@@ -318,7 +318,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
         collection_w.create_index("float_vector", default_index)
         collection_w.load()
         # 3. search
-        invalid_search_params = cf.gen_invaild_search_params_type()
+        invalid_search_params = cf.gen_invalid_search_params_type()
         message = "Search params check failed"
         for invalid_search_param in invalid_search_params:
             if index == invalid_search_param["index_type"]:
@@ -681,7 +681,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
                             [deleted_par_name],
                             check_task=CheckTasks.err_res,
                             check_items={"err_code": 1,
-                                         "err_msg": "PartitonName: %s not found" % deleted_par_name})
+                                         "err_msg": "PartitionName: %s not found" % deleted_par_name})
 
     @pytest.mark.tags(CaseLabel.L3)
     @pytest.mark.parametrize("index, params",
@@ -734,7 +734,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
                             default_limit, default_search_exp, [partition_name],
                             check_task=CheckTasks.err_res,
                             check_items={"err_code": 1,
-                                         "err_msg": "PartitonName: %s not found" % partition_name})
+                                         "err_msg": "PartitionName: %s not found" % partition_name})
 
     @pytest.mark.tags(CaseLabel.L2)
     @pytest.mark.xfail(reason="issue 15407")
@@ -774,7 +774,7 @@ class TestCollectionSearchInvalid(TestcaseBase):
                             search_params, default_limit, "int64 >= 0",
                             check_task=CheckTasks.err_res,
                             check_items={"err_code": 1,
-                                         "err_msg": "Data type and metric type mis-match"})
+                                         "err_msg": "Data type and metric type miss-match"})
 
     @pytest.mark.tags(CaseLabel.L2)
     def test_search_with_output_fields_not_exist(self):
@@ -3170,7 +3170,7 @@ class TestSearchBase(TestcaseBase):
                                       partition_num=1,
                                       dim=dim, is_index=True)[0:5]
         vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
-        # 2. create patition
+        # 2. create partition
         partition_name = "search_partition_empty"
         collection_w.create_partition(partition_name=partition_name, description="search partition empty")
         par = collection_w.partitions
@@ -3219,7 +3219,7 @@ class TestSearchBase(TestcaseBase):
                                       partition_num=1,
                                       dim=dim, is_index=True)[0:5]
         vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
-        # 2. create patition
+        # 2. create partition
         partition_name = ct.default_partition_name
         par = collection_w.partitions
         # collection_w.load()
@@ -3336,7 +3336,7 @@ class TestSearchBase(TestcaseBase):
                                       partition_num=1,
                                       dim=dim, is_index=True)[0:5]
         vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
-        # 2. create patition
+        # 2. create partition
         partition_name = "search_partition_empty"
         collection_w.create_partition(partition_name=partition_name, description="search partition empty")
         par = collection_w.partitions
@@ -3379,7 +3379,7 @@ class TestSearchBase(TestcaseBase):
                                       partition_num=1,
                                       dim=dim, is_index=True)[0:5]
         vectors = [[random.random() for _ in range(dim)] for _ in range(nq)]
-        # 2. create patition
+        # 2. create partition
         par_name = collection_w.partitions[0].name
         # collection_w.load()
         # 3. create different index
@@ -3631,7 +3631,7 @@ class TestsearchString(TestcaseBase):
         vectors = [[random.random() for _ in range(default_dim)] for _ in range(default_nq)]
         collection_w.search(vectors[:default_nq], default_search_field,
                             default_search_params, default_limit,
-                            default_invaild_string_exp,
+                            default_invalid_string_exp,
                             check_task=CheckTasks.err_res,
                             check_items={"err_code": 1,
                                          "err_msg": "failed to create query plan: type mismatch"}
                             )
@@ -3812,7 +3812,7 @@ class TestsearchString(TestcaseBase):
         collection_w.search(vectors[:default_nq], default_search_field,
                             # search all buckets
                             {"metric_type": "L2", "params": {"nprobe": 100}}, default_limit,
-                            perfix_expr,
+                            prefix_expr,
                             output_fields=output_fields,
                             _async=_async,
                             travel_timestamp=0,
-- 
GitLab