diff --git a/tests/python_client/deploy/scripts/action_after_reinstall.py b/tests/python_client/deploy/scripts/action_after_reinstall.py
index 8a040e3cb9cedd796cb434581ec14551b22fb07b..cec0876991a87eb5907bf45e4cc22c90b3997266 100644
--- a/tests/python_client/deploy/scripts/action_after_reinstall.py
+++ b/tests/python_client/deploy/scripts/action_after_reinstall.py
@@ -35,12 +35,18 @@ def task_2(data_zise, host):
 
 if __name__ == '__main__':
     import argparse
+    import threading
     parser = argparse.ArgumentParser(description='config for deploy test')
     parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
     parser.add_argument('--data_size', type=int, default=3000, help='data size')
     args = parser.parse_args()
     host = args.host
     data_size = args.data_size
-    print(f"data size: {data_size}")
-    task_1(data_size, host)
-    task_2(data_size, host)
\ No newline at end of file
+    logger.info(f"data size: {data_size}")
+    tasks = []
+    tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+    tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
+    for task in tasks:
+        task.start()
+    for task in tasks:
+        task.join()
\ No newline at end of file
diff --git a/tests/python_client/deploy/scripts/action_after_upgrade.py b/tests/python_client/deploy/scripts/action_after_upgrade.py
index c08812bba26737c187e72ad3d18581cdce545818..32521cb4e7e755448625f2d380e665328895fe36 100644
--- a/tests/python_client/deploy/scripts/action_after_upgrade.py
+++ b/tests/python_client/deploy/scripts/action_after_upgrade.py
@@ -91,22 +91,28 @@ def task_5(data_size, host):
 
 if __name__ == '__main__':
     import argparse
+    import threading
     parser = argparse.ArgumentParser(description='config for deploy test')
     parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
     parser.add_argument('--data_size', type=int, default=3000, help='data size')
     args = parser.parse_args()
     data_size = args.data_size
     host = args.host
-    print(f"data size: {data_size}")
+    logger.info(f"data size: {data_size}")
     connections.connect(host=host, port=19530, timeout=60)
     ms = MilvusSys()
     # create index for flat
-    print("create index for flat start")
+    logger.info("create index for flat start")
     create_index_flat()
-    print("create index for flat done")
-    task_1(data_size, host)
-    task_2(data_size, host)
+    logger.info("create index for flat done")
+    tasks = []
+    tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
+    tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
     if len(ms.query_nodes) >= NUM_REPLICAS:
-        task_3(data_size, host)
-        task_4(data_size, host)
-        task_5(data_size, host)
+        tasks.append(threading.Thread(target=task_3, args=(data_size, host)))
+        tasks.append(threading.Thread(target=task_4, args=(data_size, host)))
+        tasks.append(threading.Thread(target=task_5, args=(data_size, host)))
+    for task in tasks:
+        task.start()
+    for task in tasks:
+        task.join()
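Note: both scripts above switch from running the tasks sequentially to the start-all-then-join-all shape; starting and joining inside a single loop would serialize the tasks again. A minimal sketch of the shared pattern, where run_tasks_concurrently is a hypothetical helper (the scripts inline it rather than sharing it):

import threading

def run_tasks_concurrently(task_fns, data_size, host):
    # Naming each thread after its task lets the {thread.name} field in the
    # loguru format (configured in utils.py below) identify interleaved lines.
    threads = [threading.Thread(target=fn, args=(data_size, host), name=fn.__name__)
               for fn in task_fns]
    for t in threads:
        t.start()   # start everything first so the tasks overlap
    for t in threads:
        t.join()    # then wait for all of them to finish

# e.g. run_tasks_concurrently([task_1, task_2], data_size, host)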
default="127.0.0.1", help='milvus server ip') parser.add_argument('--data_size', type=int, default=3000, help='data size') args = parser.parse_args() data_size = args.data_size host = args.host - print(f"data_size: {data_size}") - task_1(data_size, host) - task_2(data_size, host) + logger.info(f"data_size: {data_size}") + tasks = [] + tasks.append(threading.Thread(target=task_1, args=(data_size, host))) + tasks.append(threading.Thread(target=task_2, args=(data_size, host))) + for task in tasks: + task.start() + for task in tasks: + task.join() \ No newline at end of file diff --git a/tests/python_client/deploy/scripts/action_before_upgrade.py b/tests/python_client/deploy/scripts/action_before_upgrade.py index 3d98d06a3d9caa2016fda0a5000df60b9939582c..da0baa48956cfb33abea7d69c6dc70883e0bdc30 100644 --- a/tests/python_client/deploy/scripts/action_before_upgrade.py +++ b/tests/python_client/deploy/scripts/action_before_upgrade.py @@ -73,18 +73,25 @@ def task_5(data_size, host): if __name__ == '__main__': import argparse + import threading parser = argparse.ArgumentParser(description='config for deploy test') parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip') parser.add_argument('--data_size', type=int, default=3000, help='data size') args = parser.parse_args() data_size = args.data_size host = args.host - print(f"data size: {data_size}") + logger.info(f"data size: {data_size}") connections.connect(host=host, port=19530, timeout=60) ms = MilvusSys() - task_1(data_size, host) - task_2(data_size, host) + tasks = [] + tasks.append(threading.Thread(target=task_1, args=(data_size, host))) + tasks.append(threading.Thread(target=task_2, args=(data_size, host))) if len(ms.query_nodes) >= NUM_REPLICAS: - task_3(data_size, host) - task_4(data_size, host) - task_5(data_size, host) \ No newline at end of file + tasks.append(threading.Thread(target=task_3, args=(data_size, host))) + tasks.append(threading.Thread(target=task_4, args=(data_size, host))) + tasks.append(threading.Thread(target=task_5, args=(data_size, host))) + for task in tasks: + task.start() + for task in tasks: + task.join() + \ No newline at end of file diff --git a/tests/python_client/deploy/scripts/utils.py b/tests/python_client/deploy/scripts/utils.py index a2f4d0d3c1803f3697d7dda8c368f645634917a7..1370a93739a5c3bb838f8fd568de8611cc450419 100644 --- a/tests/python_client/deploy/scripts/utils.py +++ b/tests/python_client/deploy/scripts/utils.py @@ -1,10 +1,18 @@ +import sys import copy import time +from loguru import logger import pymilvus from pymilvus import ( FieldSchema, CollectionSchema, DataType, Collection, list_collections, ) +logger.remove() +logger.add(sys.stderr, format= "{time:YYYY-MM-DD HH:mm:ss.SSS} | " + "{level: <8} | " + "{thread.name} |" + "{name}:{function}:{line} - {message}", + level="INFO") pymilvus_version = pymilvus.__version__ @@ -46,15 +54,15 @@ def gen_search_param(index_type, metric_type="L2"): annoy_search_param = {"metric_type": metric_type, "params": {"search_k": search_k}} search_params.append(annoy_search_param) else: - print("Invalid index_type.") + logger.info("Invalid index_type.") raise Exception("Invalid index_type.") return search_params def get_collections(prefix, check=False): - print("\nList collections...") + logger.info("\nList collections...") col_list = filter_collections_by_prefix(prefix) - print(f"collections_nums: {len(col_list)}") + logger.info(f"collections_nums: {len(col_list)}") # list entities if collections for name in col_list: c = 
diff --git a/tests/python_client/deploy/scripts/utils.py b/tests/python_client/deploy/scripts/utils.py
index a2f4d0d3c1803f3697d7dda8c368f645634917a7..1370a93739a5c3bb838f8fd568de8611cc450419 100644
--- a/tests/python_client/deploy/scripts/utils.py
+++ b/tests/python_client/deploy/scripts/utils.py
@@ -1,10 +1,18 @@
+import sys
 import copy
 import time
+from loguru import logger
 import pymilvus
 from pymilvus import (
     FieldSchema, CollectionSchema, DataType,
     Collection, list_collections,
 )
 
+logger.remove()
+logger.add(sys.stderr, format="{time:YYYY-MM-DD HH:mm:ss.SSS} | "
+                              "{level: <8} | "
+                              "{thread.name} |"
+                              "{name}:{function}:{line} - {message}",
+           level="INFO")
 pymilvus_version = pymilvus.__version__
 
@@ -46,15 +54,15 @@ def gen_search_param(index_type, metric_type="L2"):
         annoy_search_param = {"metric_type": metric_type, "params": {"search_k": search_k}}
         search_params.append(annoy_search_param)
     else:
-        print("Invalid index_type.")
+        logger.info("Invalid index_type.")
         raise Exception("Invalid index_type.")
     return search_params
 
 
 def get_collections(prefix, check=False):
-    print("\nList collections...")
+    logger.info("\nList collections...")
     col_list = filter_collections_by_prefix(prefix)
-    print(f"collections_nums: {len(col_list)}")
+    logger.info(f"collections_nums: {len(col_list)}")
     # list entities if collections
     for name in col_list:
         c = Collection(name=name)
@@ -63,7 +71,7 @@ def get_collections(prefix, check=False):
         else:
             c.num_entities
         num_entities = c.num_entities
-        print(f"{name}: {num_entities}")
+        logger.info(f"{name}: {num_entities}")
         if check:
             assert num_entities >= 3000
     return col_list
@@ -80,11 +88,11 @@ def create_collections_and_insert_data(prefix, flush=True, count=3000, collectio
     ]
     default_schema = CollectionSchema(fields=default_fields, description="test collection")
     for index_name in all_index_types[:collection_cnt]:
-        print("\nCreate collection...")
+        logger.info("\nCreate collection...")
         col_name = prefix + index_name
         collection = Collection(name=col_name, schema=default_schema)
-        print(f"collection name: {col_name}")
-        print(f"begin insert, count: {count} nb: {nb}")
+        logger.info(f"collection name: {col_name}")
+        logger.info(f"begin insert, count: {count} nb: {nb}")
         times = int(count // nb)
         total_time = 0.0
         vectors = [[random.random() for _ in range(dim)] for _ in range(count)]
@@ -98,22 +106,22 @@ def create_collections_and_insert_data(prefix, flush=True, count=3000, collectio
             ]
         )
         end_time = time.time()
-        print(f"[{j+1}/{times}] insert {nb} data, time: {end_time - start_time:.4f}")
+        logger.info(f"[{j+1}/{times}] insert {nb} data, time: {end_time - start_time:.4f}")
         total_time += end_time - start_time
 
-    print(f"end insert, time: {total_time:.4f}")
+    logger.info(f"end insert, time: {total_time:.4f}")
     if flush:
-        print("Get collection entities")
+        logger.info("Get collection entities")
         start_time = time.time()
         if pymilvus_version >= "2.2.0":
             collection.flush()
         else:
             collection.num_entities
-        print(f"collection entities: {collection.num_entities}")
+        logger.info(f"collection entities: {collection.num_entities}")
         end_time = time.time()
-        print("Get collection entities time = %.4fs" % (end_time - start_time))
-    print("\nList collections...")
-    print(get_collections(prefix))
+        logger.info("Get collection entities time = %.4fs" % (end_time - start_time))
+    logger.info("\nList collections...")
+    logger.info(get_collections(prefix))
 
 
 def create_index_flat():
@@ -124,25 +132,41 @@ def create_index_flat():
     for col_name in all_col_list:
         if "FLAT" in col_name and "task" in col_name and "IVF" not in col_name:
             col_list.append(col_name)
-    print("\nCreate index for FLAT...")
+    logger.info("\nCreate index for FLAT...")
     for col_name in col_list:
         c = Collection(name=col_name)
-        print(c)
+        logger.info(c)
+        try:
+            replicas = c.get_replicas()
+            replica_number = len(replicas.groups)
+            c.release()
+        except Exception as e:
+            replica_number = 0
+            logger.info(e)
         t0 = time.time()
         c.create_index(field_name="float_vector", index_params=default_flat_index)
-        print(f"create index time: {time.time() - t0:.4f}")
+        logger.info(f"create index time: {time.time() - t0:.4f}")
+        if replica_number > 0:
+            c.load(replica_number=replica_number)
 
 
 def create_index(prefix):
     # create index
     default_index = {"index_type": "IVF_FLAT", "params": {"nlist": 128}, "metric_type": "L2"}
     col_list = get_collections(prefix)
-    print("\nCreate index...")
+    logger.info("\nCreate index...")
     for col_name in col_list:
         c = Collection(name=col_name)
+        try:
+            replicas = c.get_replicas()
+            replica_number = len(replicas.groups)
+            c.release()
+        except Exception as e:
+            replica_number = 0
+            logger.info(e)
         index_name = col_name.replace(prefix, "")
-        print(index_name)
-        print(c)
+        logger.info(index_name)
+        logger.info(c)
         index = copy.deepcopy(default_index)
         index["index_type"] = index_name
         index["params"] = index_params_map[index_name]
@@ -150,45 +174,47 @@ def create_index(prefix):
             index["metric_type"] = "HAMMING"
         t0 = time.time()
         c.create_index(field_name="float_vector", index_params=index)
-        print(f"create index time: {time.time() - t0:.4f}")
+        logger.info(f"create index time: {time.time() - t0:.4f}")
+        if replica_number > 0:
+            c.load(replica_number=replica_number)
 
 
 def release_collection(prefix):
     col_list = get_collections(prefix)
-    print("release collection")
+    logger.info("release collection")
     for col_name in col_list:
         c = Collection(name=col_name)
         c.release()
 
 
 def load_and_search(prefix, replicas=1):
-    print("search data starts")
+    logger.info("search data starts")
    col_list = get_collections(prefix)
     for col_name in col_list:
         c = Collection(name=col_name)
-        print(f"collection name: {col_name}")
-        print("load collection")
+        logger.info(f"collection name: {col_name}")
+        logger.info("load collection")
         if replicas == 1:
             t0 = time.time()
             c.load()
-            print(f"load time: {time.time() - t0:.4f}")
+            logger.info(f"load time: {time.time() - t0:.4f}")
         if replicas > 1:
-            print("release collection before load if replicas > 1")
+            logger.info("release collection before load if replicas > 1")
             t0 = time.time()
             c.release()
-            print(f"release time: {time.time() - t0:.4f}")
+            logger.info(f"release time: {time.time() - t0:.4f}")
             t0 = time.time()
             c.load(replica_number=replicas)
-            print(f"load time: {time.time() - t0:.4f}")
-            print(c.get_replicas())
+            logger.info(f"load time: {time.time() - t0:.4f}")
+            logger.info(c.get_replicas())
         topK = 5
         vectors = [[1.0 for _ in range(128)] for _ in range(3000)]
         index_name = col_name.replace(prefix, "")
         search_params = gen_search_param(index_name)[0]
-        print(search_params)
+        logger.info(search_params)
         # search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
         start_time = time.time()
-        print(f"\nSearch...")
+        logger.info(f"\nSearch...")
         # define output_fields of search result
         v_search = vectors[:1]
         res = c.search(
@@ -200,22 +226,22 @@ def load_and_search(prefix, replicas=1):
         for hits in res:
             for hit in hits:
                 # Get value of the random value field for search result
-                print(hit, hit.entity.get("random_value"))
+                logger.info(f"{hit} {hit.entity.get('random_value')}")
             ids = hits.ids
             assert len(ids) == topK, f"get {len(ids)} results, but topK is {topK}"
-            print(ids)
+            logger.info(ids)
         assert len(res) == len(v_search), f"get {len(res)} results, but search num is {len(v_search)}"
-        print("search latency: %.4fs" % (end_time - start_time))
+        logger.info("search latency: %.4fs" % (end_time - start_time))
         t0 = time.time()
         expr = "count in [2,4,6,8]"
         output_fields = ["count", "random_value"]
-        res = c.query(expr, output_fields, timeout=20)
+        res = c.query(expr, output_fields, timeout=120)
         sorted_res = sorted(res, key=lambda k: k['count'])
         for r in sorted_res:
-            print(r)
+            logger.info(r)
         t1 = time.time()
         assert len(res) == 4
-        print("query latency: %.4fs" % (t1 - t0))
+        logger.info("query latency: %.4fs" % (t1 - t0))
         # c.release()
-        print("###########")
-        print("search data ends")
+        logger.info("###########")
+        logger.info("search data ends")
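Two details in utils.py are worth flagging. The get_replicas()/release()/create_index()/load(replica_number=...) sequence exists because, judging from this change, an index cannot be rebuilt while the collection is loaded, so the code captures the current replica count and restores the load state afterwards. Separately, the search-result logging line is rewritten above because loguru's logger.info() is not print(): extra positional arguments are fed to str.format() on the message, so logger.info(str(hits), value) silently drops value when the message has no {} placeholder (and the original print logged each hit, not hits). Either form below keeps the value; the variables are illustrative stand-ins:

from loguru import logger

hit, random_value = "(distance: 0.0, id: 0)", 0.42  # hypothetical stand-ins

logger.info(f"{hit} random_value: {random_value}")     # eager f-string
logger.info("{} random_value: {}", hit, random_value)  # loguru's str.format style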
diff --git a/tests/python_client/deploy/testcases/test_action_first_deployment.py b/tests/python_client/deploy/testcases/test_action_first_deployment.py
index 3150b90cc3afe041881bbe6e3520d60b4e35f004..96deef4cff59f9f1e0fe5cba4a5dd96d24780b2e 100644
--- a/tests/python_client/deploy/testcases/test_action_first_deployment.py
+++ b/tests/python_client/deploy/testcases/test_action_first_deployment.py
@@ -21,13 +21,13 @@ default_int64_field_name = ct.default_int64_field_name
 default_float_field_name = ct.default_float_field_name
 default_bool_field_name = ct.default_bool_field_name
 default_string_field_name = ct.default_string_field_name
-binary_field_name = default_binary_vec_field_name
+binary_field_name = ct.default_binary_vec_field_name
 default_search_exp = "int64 >= 0"
 default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
 
 prefix = "deploy_test"
-TIMEOUT = 60
+TIMEOUT = 120
 
 
 class TestActionFirstDeployment(TestDeployBase):
     """ Test case of action before reinstall """
@@ -39,13 +39,14 @@ class TestActionFirstDeployment(TestDeployBase):
         log.info("skip drop collection")
 
     @pytest.mark.tags(CaseLabel.L3)
-    @pytest.mark.parametrize("index_type", ["HNSW","BIN_IVF_FLAT"])
-    def test_task_all_empty(self,index_type):
+    @pytest.mark.parametrize("replica_number", [0])
+    @pytest.mark.parametrize("index_type", ["HNSW", "BIN_IVF_FLAT"])
+    def test_task_all_empty(self, index_type, replica_number):
         """
         before reinstall: create collection
         """
         name = ""
-        for k,v in locals().items():
+        for k, v in locals().items():
             if k in ["self", "name"]:
                 continue
             name += f"_{k}_{v}"
@@ -59,9 +60,6 @@ class TestActionFirstDeployment(TestDeployBase):
         for index_name in index_names:
             collection_w.drop_index(index_name=index_name)
 
-
-
-
     @pytest.mark.tags(CaseLabel.L3)
     @pytest.mark.parametrize("replica_number", [0, 1, 2])
     @pytest.mark.parametrize("is_compacted", ["is_compacted", "not_compacted"])
@@ -131,6 +129,7 @@ class TestActionFirstDeployment(TestDeployBase):
             log.error(
                 f"release collection failed: {e} maybe the collection is not loaded")
             collection_w.load(replica_number=replica_number, timeout=TIMEOUT)
+            self.utility_wrap.wait_for_loading_complete(name)
 
         # delete data for growing segment
         delete_expr = f"{ct.default_int64_field_name} in {[i for i in range(0,10)]}"
@@ -181,6 +180,7 @@ class TestActionFirstDeployment(TestDeployBase):
         if replica_number > 0:
             collection_w.release()
             collection_w.load(replica_number=replica_number, timeout=TIMEOUT)
+            self.utility_wrap.wait_for_loading_complete(name)
 
         # insert data to get growing segment after reload
         if segment_status == "all":
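The wait_for_loading_complete() calls added after load() close a race in this test: load() can return before every sealed segment is serviceable, so an immediate search may transiently miss data. Outside the test wrappers, the raw pymilvus equivalent is roughly the following (host and collection name are placeholders):

from pymilvus import Collection, connections, utility

connections.connect(host="127.0.0.1", port=19530)
c = Collection("deploy_test_HNSW")  # placeholder collection name
c.load(replica_number=1)
# Block until the collection reports fully loaded instead of searching
# immediately and hoping the segments are ready.
utility.wait_for_loading_complete("deploy_test_HNSW", timeout=120)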
diff --git a/tests/python_client/deploy/testcases/test_action_second_deployment.py b/tests/python_client/deploy/testcases/test_action_second_deployment.py
index f1caa21d6d5a0b78b7bbfd55859ebcb72599cbd0..63288ed955e72e295c01e98e2df20bb02ff50ab5 100644
--- a/tests/python_client/deploy/testcases/test_action_second_deployment.py
+++ b/tests/python_client/deploy/testcases/test_action_second_deployment.py
@@ -1,4 +1,6 @@
 import pytest
+import re
+import time
 import pymilvus
 from common import common_func as cf
 from common import common_type as ct
@@ -19,7 +21,7 @@ default_int64_field_name = ct.default_int64_field_name
 default_float_field_name = ct.default_float_field_name
 default_bool_field_name = ct.default_bool_field_name
 default_string_field_name = ct.default_string_field_name
-binary_field_name = default_binary_vec_field_name
+binary_field_name = ct.default_binary_vec_field_name
 default_search_exp = "int64 >= 0"
 default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'
 
@@ -41,28 +43,19 @@ class TestActionSecondDeployment(TestDeployBase):
                  method.__name__)
         log.info("show collection info")
         log.info(f"collection {self.collection_w.name} has entities: {self.collection_w.num_entities}")
-        try:
-            replicas = self.collection_w.get_replicas(enable_traceback=False)
-            replicas_loaded = len(replicas.groups)
-        except Exception as e:
-            log.info("get replicas failed with error {str(e)}")
-            replicas_loaded = 0
-        log.info(f"collection {self.collection_w.name} has {replicas_loaded} replicas")
+
+        res, _ = self.utility_wrap.get_query_segment_info(self.collection_w.name)
+        log.info(f"The segment info of collection {self.collection_w.name} is {res}")
+
         index_infos = [index.to_dict() for index in self.collection_w.indexes]
         log.info(f"collection {self.collection_w.name} index infos {index_infos}")
         log.info("skip drop collection")
 
     def create_index(self, collection_w, default_index_field, default_index_param):
-        try:
-            replicas = collection_w.get_replicas(enable_traceback=False)
-            replicas_loaded = len(replicas.groups)
-        except Exception as e:
-            log.info("get replicas failed")
-            replicas_loaded = 0
-        log.info(f"before create index, collection {collection_w.name} has {replicas_loaded} replicas")
+
         index_field_map = dict([(index.field_name, index.index_name) for index in collection_w.indexes])
         index_infos = [index.to_dict() for index in collection_w.indexes]
-        log.info(index_infos)
+        log.info(f"index info: {index_infos}")
         # log.info(f"{default_index_field:} {default_index_param:}")
         if len(index_infos) > 0:
             log.info(
has {replicas_loaded} replicas") + + res, _ = self.utility_wrap.get_query_segment_info(self.collection_w.name) + log.info(f"The segment info of collection {self.collection_w.name} is {res}") + index_infos = [index.to_dict() for index in self.collection_w.indexes] log.info(f"collection {self.collection_w.name} index infos {index_infos}") log.info("skip drop collection") def create_index(self, collection_w, default_index_field, default_index_param): - try: - replicas = collection_w.get_replicas(enable_traceback=False) - replicas_loaded = len(replicas.groups) - except Exception as e: - log.info("get replicas failed") - replicas_loaded = 0 - log.info(f"before create index, collection {collection_w.name} has {replicas_loaded} replicas") + index_field_map = dict([(index.field_name, index.index_name) for index in collection_w.indexes]) index_infos = [index.to_dict() for index in collection_w.indexes] - log.info(index_infos) + log.info(f"index info: {index_infos}") # log.info(f"{default_index_field:} {default_index_param:}") if len(index_infos) > 0: log.info( @@ -107,14 +100,23 @@ class TestActionSecondDeployment(TestDeployBase): vector_index_types = binary_vector_index_types + float_vector_index_types if len(vector_index_types) > 0: vector_index_type = vector_index_types[0] + try: + t0 = time.time() + self.utility_wrap.wait_for_loading_complete(name) + log.info(f"wait for {name} loading complete cost {time.time() - t0}") + except Exception as e: + log.error(e) # get replicas loaded try: replicas = collection_w.get_replicas(enable_traceback=False) replicas_loaded = len(replicas.groups) except Exception as e: - log.info(f"get replicas failed with error {str(e)}") + log.error(e) replicas_loaded = 0 + log.info(f"collection {name} has {replicas_loaded} replicas") + actual_replicas = re.search(r'replica_number_(.*?)_', name).group(1) + assert replicas_loaded == int(actual_replicas) # params for search and query if is_binary: _, vectors_to_search = cf.gen_binary_vectors( diff --git a/tests/python_client/requirements.txt b/tests/python_client/requirements.txt index 3e37346e189c942bfd997dd52c9ff1bc7e86b938..996be583806ce190a64d423bce0d62a567d434bc 100644 --- a/tests/python_client/requirements.txt +++ b/tests/python_client/requirements.txt @@ -39,3 +39,6 @@ minio==7.1.5 # for benchmark h5py==3.1.0 + +# for log +loguru==0.5.3 \ No newline at end of file