From ee5da73fae1f41c88853940a517f27f84cdbd16a Mon Sep 17 00:00:00 2001 From: zhuwenxing Date: Mon, 31 Jul 2023 12:45:03 +0800 Subject: [PATCH] [test]Add bulk insert for test and refactoring the checker function (#25997) Signed-off-by: zhuwenxing --- tests/python_client/base/utility_wrapper.py | 3 +- .../bulk_insert/test_bulk_insert_perf.py | 3 +- tests/python_client/chaos/checker.py | 141 ++++++----- tests/python_client/chaos/conftest.py | 2 +- tests/python_client/chaos/constants.py | 8 +- .../test_all_collections_after_chaos.py | 26 ++- ...le_request_operation_for_rolling_update.py | 31 ++- .../testcases/test_verify_all_collections.py | 6 +- tests/python_client/common/common_func.py | 220 ++++++++++++++++-- tests/python_client/requirements.txt | 1 + .../testcases/test_bulk_insert.py | 1 - 11 files changed, 331 insertions(+), 111 deletions(-) diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py index de8680e6c..fcd71f314 100644 --- a/tests/python_client/base/utility_wrapper.py +++ b/tests/python_client/base/utility_wrapper.py @@ -95,8 +95,7 @@ class ApiUtilityWrapper: unknown, "unknown") def wait_for_bulk_insert_tasks_completed(self, task_ids, target_state=BulkInsertState.ImportCompleted, - timeout=None, using="default", **kwargs): - start = time.time() + timeout=None, using="default", **kwargs): tasks_state_distribution = { "success": set(), "failed": set(), diff --git a/tests/python_client/bulk_insert/test_bulk_insert_perf.py b/tests/python_client/bulk_insert/test_bulk_insert_perf.py index 00055c65f..f9c62d7e8 100644 --- a/tests/python_client/bulk_insert/test_bulk_insert_perf.py +++ b/tests/python_client/bulk_insert/test_bulk_insert_perf.py @@ -106,7 +106,8 @@ class TestChaos(TestChaosBase): minio_endpoint = f"{minio_ip}:{minio_port}" bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"] schema = cf.gen_bulk_insert_collection_schema(dim=dim, with_varchar_field=with_varchar_field) - data = cf.gen_default_list_data_for_bulk_insert(nb=nb, varchar_len=varchar_len, with_varchar_field=with_varchar_field) + data = cf.gen_default_list_data_for_bulk_insert(nb=nb, varchar_len=varchar_len, + with_varchar_field=with_varchar_field) data_dir = "/tmp/bulk_insert_data" Path(data_dir).mkdir(parents=True, exist_ok=True) files = [] diff --git a/tests/python_client/chaos/checker.py b/tests/python_client/chaos/checker.py index 66810057c..f8246b4c7 100644 --- a/tests/python_client/chaos/checker.py +++ b/tests/python_client/chaos/checker.py @@ -35,7 +35,7 @@ enable_traceback = False DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}' -def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True): +def trace(fmt=DEFAULT_FMT, prefix='test', flag=True): def decorate(func): @functools.wraps(func) def inner_wrapper(self, *args, **kwargs): @@ -102,20 +102,30 @@ class Checker: self._keep_running = True self.rsp_times = [] self.average_time = 0 + self.files = [] self.c_wrap = ApiCollectionWrapper() + self.utility_wrap = ApiUtilityWrapper() c_name = collection_name if collection_name is not None else cf.gen_unique_str( 'Checker_') + self.c_name = c_name schema = cf.gen_default_collection_schema(dim=dim) if schema is None else schema + self.schema = schema + self.dim = cf.get_dim_by_schema(schema=schema) + self.int64_field_name = cf.get_int64_field_name(schema=schema) + self.float_vector_field_name = cf.get_float_vec_field_name(schema=schema) 
self.c_wrap.init_collection(name=c_name, schema=schema, shards_num=shards_num, timeout=timeout, - # active_trace=True, enable_traceback=enable_traceback) if insert_data: - self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH, dim=dim), - timeout=timeout, - enable_traceback=enable_traceback) + log.info(f"collection {c_name} created, start to insert data") + t0 = time.perf_counter() + self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0), + timeout=timeout, + enable_traceback=enable_traceback) + log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s") + self.initial_entities = self.c_wrap.num_entities # do as a flush def total(self): @@ -166,15 +176,38 @@ class Checker: f"end at {self.fail_records[-1][2]}") return recovery_time + def prepare_bulk_insert_data(self, + nb=constants.ENTITIES_FOR_BULKINSERT, + file_type="npy", + minio_endpoint="127.0.0.1:9000", + bucket_name="milvus-bucket"): + schema = self.schema + log.info(f"prepare data for bulk insert") + files = cf.prepare_bulk_insert_data(schema=schema, + nb=nb, + file_type=file_type, + minio_endpoint=minio_endpoint, + bucket_name=bucket_name) + self.files = files + return files + + def do_bulk_insert(self): + log.info(f"bulk insert collection name: {self.c_name}") + task_ids, result = self.utility_wrap.do_bulk_insert(collection_name=self.c_name, + files=self.files) + log.info(f"task ids {task_ids}") + completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=720) + return task_ids, completed + class SearchChecker(Checker): """check search operations in a dependent thread""" - def __init__(self, collection_name=None, shards_num=2, replica_number=1): + def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None,): if collection_name is None: collection_name = cf.gen_unique_str("SearchChecker_") - super().__init__(collection_name=collection_name, shards_num=shards_num) - self.c_wrap.create_index(ct.default_float_vec_field_name, + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) + self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=cf.gen_unique_str('index_'), timeout=timeout, @@ -186,8 +219,8 @@ class SearchChecker(Checker): @trace() def search(self): res, result = self.c_wrap.search( - data=cf.gen_vectors(5, ct.default_dim), - anns_field=ct.default_float_vec_field_name, + data=cf.gen_vectors(5, self.dim), + anns_field=self.float_vector_field_name, param=constants.DEFAULT_SEARCH_PARAM, limit=1, timeout=timeout, @@ -209,8 +242,8 @@ class SearchChecker(Checker): class InsertFlushChecker(Checker): """check Insert and flush operations in a dependent thread""" - def __init__(self, collection_name=None, flush=False, shards_num=2): - super().__init__(collection_name=collection_name, shards_num=shards_num) + def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None): + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) self._flush = flush self.initial_entities = self.c_wrap.num_entities @@ -218,7 +251,7 @@ class InsertFlushChecker(Checker): while True: t0 = time.time() _, insert_result = \ - self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), + self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), timeout=timeout, enable_traceback=enable_traceback, 
check_task=CheckTasks.check_nothing) @@ -250,10 +283,10 @@ class InsertFlushChecker(Checker): class FlushChecker(Checker): """check flush operations in a dependent thread""" - def __init__(self, collection_name=None, shards_num=2): + def __init__(self, collection_name=None, shards_num=2, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("FlushChecker_") - super().__init__(collection_name=collection_name, shards_num=shards_num) + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) self.initial_entities = self.c_wrap.num_entities @trace() @@ -268,7 +301,7 @@ class FlushChecker(Checker): @exception_handler() def run_task(self): - _, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS), + _, result = self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), timeout=timeout, enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) @@ -284,20 +317,20 @@ class FlushChecker(Checker): class InsertChecker(Checker): """check flush operations in a dependent thread""" - def __init__(self, collection_name=None, flush=False, shards_num=2): + def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("InsertChecker_") - super().__init__(collection_name=collection_name, shards_num=shards_num) + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) self._flush = flush self.initial_entities = self.c_wrap.num_entities self.inserted_data = [] self.scale = 1*10**6 self.start_time_stamp = int(time.time()*self.scale) # us - self.term_expr = f'{ct.default_int64_field_name} >= {self.start_time_stamp}' + self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}' @trace() def insert(self): - data = cf.gen_default_list_data(nb=constants.DELTA_PER_INS) + data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema) ts_data = [] for i in range(constants.DELTA_PER_INS): time.sleep(0.001) @@ -326,7 +359,7 @@ class InsertChecker(Checker): def verify_data_completeness(self): try: - self.c_wrap.create_index(ct.default_float_vec_field_name, + self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=cf.gen_unique_str('index_'), timeout=timeout, @@ -336,14 +369,14 @@ class InsertChecker(Checker): log.error(f"create index error: {e}") self.c_wrap.load() end_time_stamp = int(time.time()*self.scale) - self.term_expr = f'{ct.default_int64_field_name} >= {self.start_time_stamp} and ' \ - f'{ct.default_int64_field_name} <= {end_time_stamp}' + self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp} and ' \ + f'{self.int64_field_name} <= {end_time_stamp}' data_in_client = [] for d in self.inserted_data: if self.start_time_stamp <= d <= end_time_stamp: data_in_client.append(d) res, result = self.c_wrap.query(self.term_expr, timeout=timeout, - output_fields=[f'{ct.default_int64_field_name}'], + output_fields=[f'{self.int64_field_name}'], limit=len(data_in_client) * 2, check_task=CheckTasks.check_nothing) @@ -357,10 +390,10 @@ class InsertChecker(Checker): class CreateChecker(Checker): """check create operations in a dependent thread""" - def __init__(self, collection_name=None): + def __init__(self, collection_name=None, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("CreateChecker_") - super().__init__(collection_name=collection_name) + 
super().__init__(collection_name=collection_name, schema=schema) @trace() def init_collection(self): @@ -388,20 +421,20 @@ class CreateChecker(Checker): class IndexChecker(Checker): """check Insert operations in a dependent thread""" - def __init__(self, collection_name=None): + def __init__(self, collection_name=None, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("IndexChecker_") - super().__init__(collection_name=collection_name) + super().__init__(collection_name=collection_name, schema=schema) self.index_name = cf.gen_unique_str('index_') for i in range(5): - self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH), + self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema), timeout=timeout, enable_traceback=enable_traceback) # do as a flush before indexing log.debug(f"Index ready entities: {self.c_wrap.num_entities}") @trace() def create_index(self): - res, result = self.c_wrap.create_index(ct.default_float_vec_field_name, + res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=self.index_name, timeout=timeout, @@ -425,11 +458,11 @@ class IndexChecker(Checker): class QueryChecker(Checker): """check query operations in a dependent thread""" - def __init__(self, collection_name=None, shards_num=2, replica_number=1): + def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("QueryChecker_") - super().__init__(collection_name=collection_name, shards_num=shards_num) - res, result = self.c_wrap.create_index(ct.default_float_vec_field_name, + super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema) + res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=cf.gen_unique_str( 'index_'), @@ -450,8 +483,8 @@ class QueryChecker(Checker): int_values = [] for _ in range(5): int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH)) - self.term_expr = f'{ct.default_int64_field_name} in {int_values}' - res, result= self.query() + self.term_expr = f'{self.int64_field_name} in {int_values}' + res, result = self.query() return res, result def keep_running(self): @@ -463,12 +496,12 @@ class QueryChecker(Checker): class LoadChecker(Checker): """check load operations in a dependent thread""" - def __init__(self, collection_name=None, replica_number=1): + def __init__(self, collection_name=None, replica_number=1, schema=None): if collection_name is None: - collection_name = cf.gen_unique_str("DeleteChecker_") - super().__init__(collection_name=collection_name) + collection_name = cf.gen_unique_str("LoadChecker_") + super().__init__(collection_name=collection_name, schema=schema) self.replica_number = replica_number - res, result = self.c_wrap.create_index(ct.default_float_vec_field_name, + res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=cf.gen_unique_str( 'index_'), @@ -497,11 +530,11 @@ class LoadChecker(Checker): class DeleteChecker(Checker): """check delete operations in a dependent thread""" - def __init__(self, collection_name=None): + def __init__(self, collection_name=None, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("DeleteChecker_") - super().__init__(collection_name=collection_name) - res, result = self.c_wrap.create_index(ct.default_float_vec_field_name, + 
super().__init__(collection_name=collection_name, schema=schema) + res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=cf.gen_unique_str( 'index_'), @@ -509,10 +542,10 @@ class DeleteChecker(Checker): enable_traceback=enable_traceback, check_task=CheckTasks.check_nothing) self.c_wrap.load() # load before query - term_expr = f'{ct.default_int64_field_name} > 0' + term_expr = f'{self.int64_field_name} > 0' res, _ = self.c_wrap.query(term_expr, output_fields=[ - ct.default_int64_field_name]) - self.ids = [r[ct.default_int64_field_name] for r in res] + self.int64_field_name]) + self.ids = [r[self.int64_field_name] for r in res] self.expr = None @trace() @@ -523,7 +556,7 @@ class DeleteChecker(Checker): @exception_handler() def run_task(self): delete_ids = self.ids.pop() - self.expr = f'{ct.default_int64_field_name} in {[delete_ids]}' + self.expr = f'{self.int64_field_name} in {[delete_ids]}' res, result = self.delete() return res, result @@ -536,12 +569,12 @@ class DeleteChecker(Checker): class CompactChecker(Checker): """check compact operations in a dependent thread""" - def __init__(self, collection_name=None): + def __init__(self, collection_name=None, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("CompactChecker_") - super().__init__(collection_name=collection_name) + super().__init__(collection_name=collection_name, schema=schema) self.ut = ApiUtilityWrapper() - res, result = self.c_wrap.create_index(ct.default_float_vec_field_name, + res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=cf.gen_unique_str( 'index_'), @@ -571,10 +604,10 @@ class CompactChecker(Checker): class DropChecker(Checker): """check drop operations in a dependent thread""" - def __init__(self, collection_name=None): + def __init__(self, collection_name=None, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("DropChecker_") - super().__init__(collection_name=collection_name) + super().__init__(collection_name=collection_name, schema=schema) @trace() def drop(self): @@ -600,12 +633,12 @@ class DropChecker(Checker): class LoadBalanceChecker(Checker): """check loadbalance operations in a dependent thread""" - def __init__(self, collection_name=None): + def __init__(self, collection_name=None, schema=None): if collection_name is None: collection_name = cf.gen_unique_str("LoadBalanceChecker_") - super().__init__(collection_name=collection_name) + super().__init__(collection_name=collection_name, schema=schema) self.utility_wrap = ApiUtilityWrapper() - res, result = self.c_wrap.create_index(ct.default_float_vec_field_name, + res, result = self.c_wrap.create_index(self.float_vector_field_name, constants.DEFAULT_INDEX_PARAM, index_name=cf.gen_unique_str( 'index_'), diff --git a/tests/python_client/chaos/conftest.py b/tests/python_client/chaos/conftest.py index 2e09bf423..d41950076 100644 --- a/tests/python_client/chaos/conftest.py +++ b/tests/python_client/chaos/conftest.py @@ -55,4 +55,4 @@ def request_duration(request): @pytest.fixture def is_check(request): - return request.config.getoption("--is_check") \ No newline at end of file + return request.config.getoption("--is_check") diff --git a/tests/python_client/chaos/constants.py b/tests/python_client/chaos/constants.py index 11322d08b..951d8a322 100644 --- a/tests/python_client/chaos/constants.py +++ b/tests/python_client/chaos/constants.py @@ -8,14 +8,14 @@ DEFAULT_DEPLOY_MODE = "single" 
CHAOS_NAMESPACE = "chaos-testing" # namespace of chaos CHAOS_API_VERSION = 'chaos-mesh.org/v1alpha1' # chaos mesh api version -CHAOS_GROUP = 'chaos-mesh.org' # chao mesh group -CHAOS_VERSION = 'v1alpha1' # chao mesh version +CHAOS_GROUP = 'chaos-mesh.org' # chaos mesh group +CHAOS_VERSION = 'v1alpha1' # chaos mesh version SUCC = 'succ' FAIL = 'fail' DELTA_PER_INS = 10 # entities per insert ENTITIES_FOR_SEARCH = 3000 # entities for search_collection - -CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variables for chao path +ENTITIES_FOR_BULKINSERT = 1000000 # entities for bulk insert +CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variables for chaos path TESTS_CONFIG_LOCATION = 'chaos_objects/pod_kill/' ALL_CHAOS_YAMLS = 'chaos_allstandalone_pod_kill.yaml' RELEASE_NAME = 'test-allstandalone-pod-kill-19-25-26' diff --git a/tests/python_client/chaos/testcases/test_all_collections_after_chaos.py b/tests/python_client/chaos/testcases/test_all_collections_after_chaos.py index c08918eb5..951c60d2d 100644 --- a/tests/python_client/chaos/testcases/test_all_collections_after_chaos.py +++ b/tests/python_client/chaos/testcases/test_all_collections_after_chaos.py @@ -34,6 +34,12 @@ class TestAllCollection(TestcaseBase): collection_w = self.init_collection_wrap(name=name, schema=schema) tt = time.time() - t0 assert collection_w.name == name + # get collection info + schema = collection_w.schema + dim = cf.get_dim_by_schema(schema=schema) + int64_field_name = cf.get_int64_field_name(schema=schema) + float_vector_field_name = cf.get_float_vec_field_name(schema=schema) + # compact collection before getting num_entities collection_w.flush(timeout=180) collection_w.compact() @@ -48,7 +54,7 @@ class TestAllCollection(TestcaseBase): for field in collection_w.schema.fields: if field.dtype.name == "JSON": with_json = True - data = cf.gen_default_list_data(start=offset, with_json=with_json) + data = cf.get_column_data_by_schema(nb=ct.default_nb, schema=schema, start=offset) t0 = time.time() _, res = collection_w.insert(data) tt = time.time() - t0 @@ -68,9 +74,9 @@ class TestAllCollection(TestcaseBase): index_infos = [index.to_dict() for index in collection_w.indexes] index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}} if len(index_infos) == 0: - log.info("collection {name} does not have index, create index for it") + log.info(f"collection {name} does not have index, create index for it") t0 = time.time() - index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name, + index, _ = collection_w.create_index(field_name=float_vector_field_name, index_params=index_params, index_name=cf.gen_unique_str()) tt = time.time() - t0 @@ -84,17 +90,17 @@ class TestAllCollection(TestcaseBase): collection_w.load() # search - search_vectors = cf.gen_vectors(1, ct.default_dim) + search_vectors = cf.gen_vectors(1, dim) search_params = {"metric_type": "L2", "params": {"ef": 64}} t0 = time.time() res_1, _ = collection_w.search(data=search_vectors, - anns_field=ct.default_float_vec_field_name, + anns_field=float_vector_field_name, param=search_params, limit=1) tt = time.time() - t0 log.info(f"assert search: {tt}") assert len(res_1) == 1 # query - term_expr = f'{ct.default_int64_field_name} in {[i for i in range(offset, 0)]}' + term_expr = f'{int64_field_name} in {[i for i in range(offset, 0)]}' t0 = time.time() res, _ = collection_w.query(term_expr) tt = time.time() - t0 @@ -103,7 +109,7 @@ class TestAllCollection(TestcaseBase): collection_w.release() # insert data - d = 
cf.gen_default_list_data(with_json=with_json) + d = cf.get_column_data_by_schema(nb=ct.default_nb, schema=schema) collection_w.insert(d) # load @@ -115,10 +121,10 @@ class TestAllCollection(TestcaseBase): # search nq = 5 topk = 5 - search_vectors = cf.gen_vectors(nq, ct.default_dim) + search_vectors = cf.gen_vectors(nq, dim) t0 = time.time() res, _ = collection_w.search(data=search_vectors, - anns_field=ct.default_float_vec_field_name, + anns_field=float_vector_field_name, param=search_params, limit=topk) tt = time.time() - t0 log.info(f"assert search: {tt}") @@ -126,7 +132,7 @@ class TestAllCollection(TestcaseBase): assert len(res[0]) <= topk # query - term_expr = f'{ct.default_int64_field_name} in [1, 2, 3, 4]' + term_expr = f'{int64_field_name} in [1, 2, 3, 4]' t0 = time.time() res, _ = collection_w.query(term_expr) tt = time.time() - t0 diff --git a/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py b/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py index 6927c265a..6b0a0fcc1 100644 --- a/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py +++ b/tests/python_client/chaos/testcases/test_single_request_operation_for_rolling_update.py @@ -13,6 +13,7 @@ from chaos.checker import (CreateChecker, from utils.util_log import test_log as log from chaos import chaos_commons as cc from common.common_type import CaseLabel +from common import common_func as cf from chaos.chaos_commons import assert_statistic from chaos import constants from delayed_assert import assert_expectations @@ -34,7 +35,7 @@ class TestBase: class TestOperations(TestBase): @pytest.fixture(scope="function", autouse=True) - def connection(self, host, port, user, password): + def connection(self, host, port, user, password, minio_host): if user and password: # log.info(f"connect to {host}:{port} with user {user} and password {password}") connections.connect('default', host=host, port=port, user=user, password=password, secure=True) @@ -47,18 +48,21 @@ class TestOperations(TestBase): self.port = port self.user = user self.password = password + self.minio_endpoint = f"{minio_host}:9000" def init_health_checkers(self, collection_name=None): c_name = collection_name + schema = cf.gen_default_collection_schema(auto_id=False) + checkers = { - Op.create: CreateChecker(collection_name=c_name), - Op.insert: InsertChecker(collection_name=c_name), - Op.flush: FlushChecker(collection_name=c_name), - Op.index: IndexChecker(collection_name=c_name), - Op.search: SearchChecker(collection_name=c_name), - Op.query: QueryChecker(collection_name=c_name), - Op.delete: DeleteChecker(collection_name=c_name), - Op.drop: DropChecker(collection_name=c_name) + Op.create: CreateChecker(collection_name=None, schema=schema), + Op.insert: InsertChecker(collection_name=c_name, schema=schema), + Op.flush: FlushChecker(collection_name=c_name, schema=schema), + Op.index: IndexChecker(collection_name=None, schema=schema), + Op.search: SearchChecker(collection_name=c_name, schema=schema), + Op.query: QueryChecker(collection_name=c_name, schema=schema), + Op.delete: DeleteChecker(collection_name=c_name, schema=schema), + Op.drop: DropChecker(collection_name=None, schema=schema) } self.health_checkers = checkers @@ -69,7 +73,14 @@ class TestOperations(TestBase): log.info(connections.get_connection_addr('default')) c_name = None self.init_health_checkers(collection_name=c_name) + # prepare data by bulk insert + log.info("*********************Prepare Data by 
bulk insert**********************") + cc.start_monitor_threads(self.health_checkers) + for k, v in self.health_checkers.items(): + log.info(f"prepare bulk insert data for {k}") + v.prepare_bulk_insert_data(minio_endpoint=self.minio_endpoint) + v.do_bulk_insert() log.info("*********************Load Start**********************") # wait request_duration request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "") @@ -93,7 +104,7 @@ class TestOperations(TestBase): for k, v in self.health_checkers.items(): log.info(f"{k} rto: {v.get_rto()}") rto = v.get_rto() - pytest.assume(rto < 30, f"{k} rto expect 30s but get {rto}s") # rto should be less than 30s + pytest.assume(rto < 30, f"{k} rto expect 30s but get {rto}s") # rto should be less than 30s if Op.insert in self.health_checkers: # verify the no insert data loss diff --git a/tests/python_client/chaos/testcases/test_verify_all_collections.py b/tests/python_client/chaos/testcases/test_verify_all_collections.py index d102d6cc2..3d1315f2f 100644 --- a/tests/python_client/chaos/testcases/test_verify_all_collections.py +++ b/tests/python_client/chaos/testcases/test_verify_all_collections.py @@ -82,7 +82,7 @@ class TestOperations(TestBase): self.health_checkers = checkers @pytest.mark.tags(CaseLabel.L3) - def test_operations(self,collection_name, request_duration): + def test_operations(self, collection_name, request_duration): # start the monitor threads to check the milvus ops log.info("*********************Test Start**********************") log.info(connections.get_connection_addr('default')) @@ -91,13 +91,13 @@ class TestOperations(TestBase): cc.start_monitor_threads(self.health_checkers) log.info("*********************Request Load Start**********************") # wait request_duration for the load request to be finished - request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","") + request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "") if request_duration[-1] == "+": request_duration = request_duration[:-1] request_duration = eval(request_duration) for i in range(10): sleep(request_duration//10) - for k,v in self.health_checkers.items(): + for k, v in self.health_checkers.items(): v.check_result() log.info("******assert after chaos deleted: ") assert_statistic(self.health_checkers) diff --git a/tests/python_client/common/common_func.py b/tests/python_client/common/common_func.py index c89133ff8..1d90029da 100644 --- a/tests/python_client/common/common_func.py +++ b/tests/python_client/common/common_func.py @@ -3,17 +3,22 @@ import random import math import string import json +import time +import uuid from functools import singledispatch import numpy as np import pandas as pd from sklearn import preprocessing from npy_append_array import NpyAppendArray +from faker import Faker +from pathlib import Path +from minio import Minio from pymilvus import DataType from base.schema_wrapper import ApiCollectionSchemaWrapper, ApiFieldSchemaWrapper from common import common_type as ct from utils.util_log import test_log as log from customize.milvus_operator import MilvusOperator - +fake = Faker() """" Methods of processing data """ @@ -492,14 +497,65 @@ def gen_default_list_data_for_bulk_insert(nb=ct.default_nb, varchar_len=2000, wi int_values = [i for i in range(nb)] float_values = [np.float32(i) for i in range(nb)] string_values = [f"{str(i)}_{str_value}" for i in range(nb)] + # in case of large nb, float_vec_values will be too large in memory + 
# then generate float_vec_values in each loop instead of generating all at once during generate npy or json file float_vec_values = [] # placeholder for float_vec data = [int_values, float_values, string_values, float_vec_values] if with_varchar_field is False: data = [int_values, float_values, float_vec_values] return data - -def get_list_data_by_schema(nb=ct.default_nb, schema=None): + +def prepare_bulk_insert_data(schema=None, + nb=ct.default_nb, + file_type="npy", + minio_endpoint="127.0.0.1:9000", + bucket_name="milvus-bucket"): + schema = gen_default_collection_schema() if schema is None else schema + dim = get_dim_by_schema(schema=schema) + log.info(f"start to generate raw data for bulk insert") + t0 = time.time() + data = get_column_data_by_schema(schema=schema, nb=nb, skip_vectors=True) + log.info(f"generate raw data for bulk insert cost {time.time() - t0} s") + data_dir = "/tmp/bulk_insert_data" + Path(data_dir).mkdir(parents=True, exist_ok=True) + log.info(f"schema:{schema}, nb:{nb}, file_type:{file_type}, minio_endpoint:{minio_endpoint}, bucket_name:{bucket_name}") + files = [] + log.info(f"generate {file_type} files for bulk insert") + if file_type == "json": + files = gen_json_files_for_bulk_insert(data, schema, data_dir) + if file_type == "npy": + files = gen_npy_files_for_bulk_insert(data, schema, data_dir) + log.info(f"generated {len(files)} {file_type} files for bulk insert, cost {time.time() - t0} s") + log.info("upload file to minio") + client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False) + for file_name in files: + file_size = os.path.getsize(os.path.join(data_dir, file_name)) / 1024 / 1024 + t0 = time.time() + client.fput_object(bucket_name, file_name, os.path.join(data_dir, file_name)) + log.info(f"upload file {file_name} to minio, size: {file_size:.2f} MB, cost {time.time() - t0:.2f} s") + return files + + +def get_column_data_by_schema(nb=ct.default_nb, schema=None, skip_vectors=False, start=None): + if schema is None: + schema = gen_default_collection_schema() + fields = schema.fields + fields_not_auto_id = [] + for field in fields: + if not field.auto_id: + fields_not_auto_id.append(field) + data = [] + for field in fields_not_auto_id: + if field.dtype == DataType.FLOAT_VECTOR and skip_vectors is True: + tmp = [] + else: + tmp = gen_data_by_type(field, nb=nb, start=start) + data.append(tmp) + return data + + +def get_row_data_by_schema(nb=ct.default_nb, schema=None): if schema is None: schema = gen_default_collection_schema() fields = schema.fields @@ -515,48 +571,149 @@ def get_list_data_by_schema(nb=ct.default_nb, schema=None): data.append(tmp) return data -def gen_data_by_type(field): + +def get_fields_map(schema=None): + if schema is None: + schema = gen_default_collection_schema() + fields = schema.fields + fields_map = {} + for field in fields: + fields_map[field.name] = field.dtype + return fields_map + + +def get_int64_field_name(schema=None): + if schema is None: + schema = gen_default_collection_schema() + fields = schema.fields + for field in fields: + if field.dtype == DataType.INT64: + return field.name + return None + + +def get_float_field_name(schema=None): + if schema is None: + schema = gen_default_collection_schema() + fields = schema.fields + for field in fields: + if field.dtype == DataType.FLOAT or field.dtype == DataType.DOUBLE: + return field.name + return None + + +def get_float_vec_field_name(schema=None): + if schema is None: + schema = gen_default_collection_schema() + fields = schema.fields + 
for field in fields: + if field.dtype == DataType.FLOAT_VECTOR: + return field.name + return None + + +def get_binary_vec_field_name(schema=None): + if schema is None: + schema = gen_default_collection_schema() + fields = schema.fields + for field in fields: + if field.dtype == DataType.BINARY_VECTOR: + return field.name + return None + + +def get_dim_by_schema(schema=None): + if schema is None: + schema = gen_default_collection_schema() + fields = schema.fields + for field in fields: + if field.dtype == DataType.FLOAT_VECTOR or field.dtype == DataType.BINARY_VECTOR: + dim = field.params['dim'] + return dim + return None + + +def gen_data_by_type(field, nb=None, start=None): + # if nb is None, return one data, else return a list of data data_type = field.dtype if data_type == DataType.BOOL: - return random.choice([True, False]) + if nb is None: + return random.choice([True, False]) + return [random.choice([True, False]) for _ in range(nb)] if data_type == DataType.INT8: - return random.randint(-128, 127) + if nb is None: + return random.randint(-128, 127) + return [random.randint(-128, 127) for _ in range(nb)] if data_type == DataType.INT16: - return random.randint(-32768, 32767) + if nb is None: + return random.randint(-32768, 32767) + return [random.randint(-32768, 32767) for _ in range(nb)] if data_type == DataType.INT32: - return random.randint(-2147483648, 2147483647) + if nb is None: + return random.randint(-2147483648, 2147483647) + return [random.randint(-2147483648, 2147483647) for _ in range(nb)] if data_type == DataType.INT64: - return random.randint(-9223372036854775808, 9223372036854775807) + if nb is None: + return random.randint(-9223372036854775808, 9223372036854775807) + if start is not None: + return [i for i in range(start, start+nb)] + return [random.randint(-9223372036854775808, 9223372036854775807) for _ in range(nb)] if data_type == DataType.FLOAT: - return np.float64(random.random()) # Object of type float32 is not JSON serializable, so set it as float64 + if nb is None: + return np.float32(random.random()) + return [np.float32(random.random()) for _ in range(nb)] if data_type == DataType.DOUBLE: - return np.float64(random.random()) + if nb is None: + return np.float64(random.random()) + return [np.float64(random.random()) for _ in range(nb)] if data_type == DataType.VARCHAR: max_length = field.params['max_length'] + max_length = min(20, max_length-1) length = random.randint(0, max_length) - return "".join([chr(random.randint(97, 122)) for _ in range(length)]) + if nb is None: + return "".join([chr(random.randint(97, 122)) for _ in range(length)]) + return ["".join([chr(random.randint(97, 122)) for _ in range(length)]) for _ in range(nb)] + if data_type == DataType.JSON: + if nb is None: + return {"name": fake.name(), "address": fake.address()} + data = [{"name": str(i), "address": i} for i in range(nb)] + return data if data_type == DataType.FLOAT_VECTOR: dim = field.params['dim'] - return preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist() + if nb is None: + return [random.random() for i in range(dim)] + return [[random.random() for i in range(dim)] for _ in range(nb)] return None -def gen_json_files_for_bulk_insert(data, schema, data_dir, **kwargs): - nb = kwargs.get("nb", ct.default_nb) - dim = kwargs.get("dim", ct.default_dim) +def gen_json_files_for_bulk_insert(data, schema, data_dir): + for d in data: + if len(d) > 0: + nb = len(d) + dim = get_dim_by_schema(schema) + vec_field_name = get_float_vec_field_name(schema) 
fields_name = [field.name for field in schema.fields] - file_name = f"bulk_insert_data_source_dim_{dim}_nb_{nb}.json" + # get vec field index + vec_field_index = fields_name.index(vec_field_name) + uuid_str = str(uuid.uuid4()) + log.info(f"file dir name: {uuid_str}") + file_name = f"{uuid_str}/bulk_insert_data_source_dim_{dim}_nb_{nb}.json" files = [file_name] data_source = os.path.join(data_dir, file_name) + Path(data_source).parent.mkdir(parents=True, exist_ok=True) + log.info(f"file name: {data_source}") with open(data_source, "w") as f: f.write("{") f.write("\n") f.write('"rows":[') f.write("\n") for i in range(nb): - entity_value = [field_values[i] for field_values in data[:-1]] - vector = [random.random() for _ in range(dim)] - entity_value.append(vector) + entity_value = [None for _ in range(len(fields_name))] + for j in range(len(data)): + if j == vec_field_index: + entity_value[j] = [random.random() for _ in range(dim)] + else: + entity_value[j] = data[j][i] entity = dict(zip(fields_name, entity_value)) f.write(json.dumps(entity, indent=4, default=to_serializable)) if i != nb - 1: @@ -568,22 +725,35 @@ def gen_json_files_for_bulk_insert(data, schema, data_dir, **kwargs): return files -def gen_npy_files_for_bulk_insert(data, schema, data_dir, **kwargs): - nb = kwargs.get("nb", ct.default_nb) - dim = kwargs.get("dim", ct.default_dim) +def gen_npy_files_for_bulk_insert(data, schema, data_dir): + for d in data: + if len(d) > 0: + nb = len(d) + dim = get_dim_by_schema(schema) + vec_filed_name = get_float_vec_field_name(schema) fields_name = [field.name for field in schema.fields] files = [] + uuid_str = uuid.uuid4() for field in fields_name: - files.append(f"{field}.npy") + files.append(f"{uuid_str}/{field}.npy") for i, file in enumerate(files): data_source = os.path.join(data_dir, file) - if "vector" in file: + # mkdir for npy file + Path(data_source).parent.mkdir(parents=True, exist_ok=True) + log.info(f"save file {data_source}") + if vec_filed_name in file: log.info(f"generate {nb} vectors with dim {dim} for {data_source}") with NpyAppendArray(data_source, "wb") as npaa: for j in range(nb): vector = np.array([[random.random() for _ in range(dim)]]) npaa.append(vector) + elif isinstance(data[i][0], dict): + tmp = [] + for d in data[i]: + tmp.append(json.dumps(d)) + data[i] = tmp + np.save(data_source, np.array(data[i])) else: np.save(data_source, np.array(data[i])) return files diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 4fb3e38a3..c14206f0f 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -33,6 +33,7 @@ timeout-decorator==0.5.0 # for bulk insert test minio==7.1.5 npy-append-array==0.9.15 +Faker==19.2.0 # for benchmark h5py==3.8.0 diff --git a/tests/python_client/testcases/test_bulk_insert.py b/tests/python_client/testcases/test_bulk_insert.py index b3f582b8c..ba3155157 100644 --- a/tests/python_client/testcases/test_bulk_insert.py +++ b/tests/python_client/testcases/test_bulk_insert.py @@ -742,4 +742,3 @@ class TestBulkInsert(TestcaseBaseBulkInsert): check_task=CheckTasks.check_search_results, check_items={"nq": 1, "limit": 1}, ) - -- GitLab
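
Editor's note on the data helpers: the core of this refactor is the set of schema-driven helpers added to common_func.py, which replace the hard-coded ct.default_* constants previously used by the checkers. A minimal sketch of how they fit together follows, assuming it runs inside the tests/python_client package; the nb value and variable names are illustrative only.

# Sketch only: exercises the helpers introduced in this patch.
from common import common_func as cf

schema = cf.gen_default_collection_schema()
# column-oriented data, one list per non-auto-id field; with start=0 the
# INT64 primary key values are 0..nb-1 instead of random integers
columns = cf.get_column_data_by_schema(nb=10, schema=schema, start=0)
# row-oriented counterpart of the column helper
rows = cf.get_row_data_by_schema(nb=10, schema=schema)
# schema introspection used by the refactored checkers
dim = cf.get_dim_by_schema(schema)
pk_field = cf.get_int64_field_name(schema)
vec_field = cf.get_float_vec_field_name(schema)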
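
Editor's note on the checkers: with those helpers in place, every checker now accepts an optional schema and derives its field names and dim from it. A usage sketch follows; it assumes a reachable Milvus instance on the default host/port and that Op is importable from chaos.checker as in the test cases above, neither of which is shown in this patch.

# Sketch only: the host/port and the Op import location are assumptions.
from pymilvus import connections
from chaos.checker import Op, SearchChecker, QueryChecker
from common import common_func as cf

connections.connect("default", host="127.0.0.1", port="19530")

schema = cf.gen_default_collection_schema(auto_id=False)
checkers = {
    Op.search: SearchChecker(schema=schema),   # index creation and load happen in __init__
    Op.query: QueryChecker(schema=schema),
}
for op, checker in checkers.items():
    checker.run_task()                         # one synchronous round of the operation
    print(op, checker.total())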
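
Editor's note on the bulk-insert path: the new prepare_bulk_insert_data/do_bulk_insert methods on the Checker base class can be exercised on their own. The sketch below assumes a running Milvus plus a MinIO service with the default minioadmin credentials; the endpoint and bucket are the helper's defaults and the nb value is illustrative.

# Sketch only: requires Milvus and MinIO to be reachable.
from chaos.checker import InsertChecker
from common import common_func as cf

checker = InsertChecker(schema=cf.gen_default_collection_schema(auto_id=False))
# generates column data (vectors are skipped in memory and written row by row),
# stores npy files under a per-call uuid directory in /tmp/bulk_insert_data,
# and uploads them to the given MinIO bucket
files = checker.prepare_bulk_insert_data(nb=100000,
                                         file_type="npy",
                                         minio_endpoint="127.0.0.1:9000",
                                         bucket_name="milvus-bucket")
# submits the uploaded files and waits up to 720s for the import tasks to finish
task_ids, completed = checker.do_bulk_insert()
assert completed, f"bulk insert tasks {task_ids} did not complete in time"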