From 3ac87fe0259cb47dfecd7de2131ca4b2b186638f Mon Sep 17 00:00:00 2001
From: yanliang567 <82361606+yanliang567@users.noreply.github.com>
Date: Wed, 2 Nov 2022 20:35:35 +0800
Subject: [PATCH] Update bulk_insert to do_bulk_insert (#20277)

Signed-off-by: yanliang567
Signed-off-by: yanliang567
---
 tests/python_client/base/utility_wrapper.py |   6 +-
 .../bulk_insert/test_bulk_insert.py         | 298 +++++++++---------
 .../test_bulk_insert_task_clean.py          |  15 +-
 3 files changed, 162 insertions(+), 157 deletions(-)

diff --git a/tests/python_client/base/utility_wrapper.py b/tests/python_client/base/utility_wrapper.py
index f4af17785..e0c9665c1 100644
--- a/tests/python_client/base/utility_wrapper.py
+++ b/tests/python_client/base/utility_wrapper.py
@@ -18,13 +18,13 @@ class ApiUtilityWrapper:
     ut = utility
     role = None
 
-    def bulk_insert(self, collection_name, files="", partition_name=None, timeout=None,
-                    using="default", check_task=None, check_items=None, **kwargs):
+    def do_bulk_insert(self, collection_name, files="", partition_name=None, timeout=None,
+                       using="default", check_task=None, check_items=None, **kwargs):
         working_tasks = self.get_bulk_insert_working_list()
         log.info(f"before bulk load, there are {len(working_tasks)} working tasks")
         log.info(f"files to load: {files}")
         func_name = sys._getframe().f_code.co_name
-        res, is_succ = api_request([self.ut.bulk_insert, collection_name,
+        res, is_succ = api_request([self.ut.do_bulk_insert, collection_name,
                                     files, partition_name, timeout, using], **kwargs)
         check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
                                        collection_name=collection_name, using=using).run()
diff --git a/tests/python_client/bulk_insert/test_bulk_insert.py b/tests/python_client/bulk_insert/test_bulk_insert.py
index 40dea2263..1fafcffc0 100644
--- a/tests/python_client/bulk_insert/test_bulk_insert.py
+++ b/tests/python_client/bulk_insert/test_bulk_insert.py
@@ -116,14 +116,14 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task id:{task_id}")
         success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -198,12 +198,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         completed, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{completed} in {tt}")
@@ -294,14 +294,14 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
 
         # import data into the partition
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=p_name,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, state = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -386,11 +386,11 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         self.collection_wrap.load()
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(collection_name=c_name,
-                                                    files=files)
-        logging.info(f"bulk insert task ids:{task_ids}")
+        task_id, _ = self.utility_wrap.do_bulk_insert(collection_name=c_name,
+                                                      files=files)
+        logging.info(f"bulk insert task ids:{task_id}")
         success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -478,12 +478,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         self.collection_wrap.load()
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -590,12 +590,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         self.collection_wrap.num_entities
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -679,12 +679,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         self.collection_wrap.load()
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -781,12 +781,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         self.collection_wrap.load()
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -885,51 +885,54 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         self.collection_wrap.load()
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
-            collection_name=c_name, files=files
-        )
-        logging.info(f"bulk insert task ids:{task_ids}")
-        success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
-        )
-        tt = time.time() - t0
-        log.info(f"bulk insert state:{success} in {tt}")
{tt}") - if not is_row_based: - assert not success - failed_reason = "is duplicated" # "the field xxx is duplicated" - for state in states.values(): - assert state.state_name in ["Failed", "Failed and cleaned"] - assert failed_reason in state.infos.get("failed_reason", "") - else: - assert success - num_entities = self.collection_wrap.num_entities - log.info(f" collection entities: {num_entities}") - assert num_entities == entities * file_nums - - # verify index built - res, _ = self.utility_wrap.index_building_progress(c_name) - exp_res = {'total_rows': entities * file_nums, 'indexed_rows': entities * file_nums} - assert res == exp_res - - # verify search and query - log.info(f"wait for load finished and be ready for search") - time.sleep(5) - nq = 5 - topk = 1 - search_data = cf.gen_vectors(nq, dim) - search_params = ct.default_search_params - res, _ = self.collection_wrap.search( - search_data, - df.vec_field, - param=search_params, - limit=topk, - check_task=CheckTasks.check_search_results, - check_items={"nq": nq, "limit": topk}, - ) - for hits in res: - ids = hits.ids - results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") - assert len(results) == len(ids) + err_msg = "row-based import, only allow one JSON file each time" + task_id, _ = self.utility_wrap.do_bulk_insert( + collection_name=c_name, files=files, + check_task=CheckTasks.err_res, check_items={"err_code": 1, "err_msg": err_msg}, + ) + + # logging.info(f"bulk insert task ids:{task_id}") + # success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( + # task_ids=[task_id], timeout=90 + # ) + # tt = time.time() - t0 + # log.info(f"bulk insert state:{success} in {tt}") + # if not is_row_based: + # assert not success + # failed_reason = "is duplicated" # "the field xxx is duplicated" + # for state in states.values(): + # assert state.state_name in ["Failed", "Failed and cleaned"] + # assert failed_reason in state.infos.get("failed_reason", "") + # else: + # assert success + # num_entities = self.collection_wrap.num_entities + # log.info(f" collection entities: {num_entities}") + # assert num_entities == entities * file_nums + # + # # verify index built + # res, _ = self.utility_wrap.index_building_progress(c_name) + # exp_res = {'total_rows': entities * file_nums, 'indexed_rows': entities * file_nums} + # assert res == exp_res + # + # # verify search and query + # log.info(f"wait for load finished and be ready for search") + # time.sleep(5) + # nq = 5 + # topk = 1 + # search_data = cf.gen_vectors(nq, dim) + # search_params = ct.default_search_params + # res, _ = self.collection_wrap.search( + # search_data, + # df.vec_field, + # param=search_params, + # limit=topk, + # check_task=CheckTasks.check_search_results, + # check_items={"nq": nq, "limit": topk}, + # ) + # for hits in res: + # ids = hits.ids + # results, _ = self.collection_wrap.query(expr=f"{df.pk_field} in {ids}") + # assert len(results) == len(ids) @pytest.mark.tags(CaseLabel.L3) @pytest.mark.parametrize("is_row_based", [True]) @@ -1016,12 +1019,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert): # import data t0 = time.time() - task_ids, _ = self.utility_wrap.bulk_insert( + task_id, _ = self.utility_wrap.do_bulk_insert( collection_name=c_name, files=files ) - logging.info(f"bulk insert task ids:{task_ids}") + logging.info(f"bulk insert task ids:{task_id}") success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed( - task_ids=task_ids, timeout=90 + task_ids=[task_id], timeout=90 ) tt = time.time() - t0 log.info(f"bulk 
@@ -1110,12 +1113,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         schema = cf.gen_collection_schema(fields=fields, auto_id=False)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
         assert success
@@ -1174,12 +1177,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
         schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
         assert success
@@ -1245,12 +1248,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
 
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -1324,12 +1327,12 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
                 file_nums=1,
                 force=True,
             )
-            task_id, _ = self.utility_wrap.bulk_insert(
+            task_id, _ = self.utility_wrap.do_bulk_insert(
                 collection_name=c_name, files=files
             )
-            task_ids.append(task_id[0])
+            task_ids.append(task_id)
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
 
@@ -1380,14 +1383,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
 
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         assert not success
         failed_reason = f"failed to get file size of '{files[0]}'"
@@ -1429,14 +1432,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
 
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         assert not success
         failed_reason = "JSON parser: row count is 0"
@@ -1449,7 +1452,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
     @pytest.mark.parametrize("auto_id", [True, False])
     @pytest.mark.parametrize("dim", [8])  # 8
     @pytest.mark.parametrize("entities", [100])  # 100
-    @pytest.mark.xfail(reason="issue https://github.com/milvus-io/milvus/issues/19658")
+    # @pytest.mark.xfail(reason="issue https://github.com/milvus-io/milvus/issues/19658")
     def test_wrong_file_type(self, is_row_based, auto_id, dim, entities):
         """
         collection schema: [pk, float_vector]
@@ -1491,22 +1494,24 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         log.info(schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
         assert not success
-        failed_reason = "unsupported file type"
+        failed_reason = f"the file '{files[0]}' has no corresponding field in collection"
+        failed_reason2 = "unsupportted file type"
         for state in states.values():
             assert state.state_name in ["Failed", "Failed and cleaned"]
-            assert failed_reason in state.infos.get("failed_reason", "")
+            assert failed_reason in state.infos.get("failed_reason", "") or \
+                   failed_reason2 in state.infos.get("failed_reason", "")
 
     @pytest.mark.tags(CaseLabel.L3)
     @pytest.mark.parametrize("is_row_based", [True])
@@ -1543,14 +1548,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
 
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         assert not success
         if is_row_based:
@@ -1597,14 +1602,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -1654,14 +1659,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -1713,14 +1718,14 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
 
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name="",
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -1765,12 +1770,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
         assert not success
@@ -1802,7 +1807,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         c_name = cf.gen_unique_str("bulk_insert")
         # import data into a non existing collection
         err_msg = f"can't find collection: {c_name}"
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             files=files,
             check_task=CheckTasks.err_res,
@@ -1841,7 +1846,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         # import data into a non existing partition
         p_name = "non_existing"
         err_msg = f" partition {p_name} does not exist"
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=p_name,
             files=files,
@@ -1886,12 +1891,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
         assert not success
@@ -1909,6 +1914,7 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
     @pytest.mark.parametrize("dim", [16])
     @pytest.mark.parametrize("entities", [300])
     @pytest.mark.parametrize("file_nums", [10])  # max task nums 32? need improve
+    @pytest.mark.skip(reason="not support multiple files now")
     def test_float_vector_one_of_files_fail(
         self, is_row_based, auto_id, dim, entities, file_nums
     ):
@@ -1960,12 +1966,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
 
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -2012,12 +2018,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
 
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -2068,12 +2074,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         log.info(schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -2119,12 +2125,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -2174,12 +2180,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         schema = cf.gen_collection_schema(fields=fields, auto_id=False)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
         assert not success
@@ -2229,12 +2235,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
         assert not success
@@ -2295,11 +2301,11 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
 
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -2350,12 +2356,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
         assert not success
@@ -2412,12 +2418,12 @@ class TestBulkInsertInvalidParams(TestcaseBaseBulkInsert):
         schema = cf.gen_collection_schema(fields=fields, auto_id=auto_id)
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         log.info(f"bulk insert state:{success}")
         assert not success
@@ -2488,12 +2494,12 @@ class TestBulkInsertAdvanced(TestcaseBaseBulkInsert):
                 check_flag = False
                 break
 
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name, files=files
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=180
+            task_ids=[task_id], timeout=180
         )
         tt = time.time() - t0
         log.info(
diff --git a/tests/python_client/bulk_insert/test_bulk_insert_task_clean.py b/tests/python_client/bulk_insert/test_bulk_insert_task_clean.py
index 7eb09e853..026c552f6 100644
--- a/tests/python_client/bulk_insert/test_bulk_insert_task_clean.py
+++ b/tests/python_client/bulk_insert/test_bulk_insert_task_clean.py
@@ -118,15 +118,14 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
-            # is_row_based=is_row_based,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, _ = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -220,15 +219,15 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
         self.collection_wrap.init_collection(c_name, schema=schema)
         # import data
         t0 = time.time()
-        task_ids, _ = self.utility_wrap.bulk_insert(
+        task_id, _ = self.utility_wrap.do_bulk_insert(
             collection_name=c_name,
             partition_name=None,
             is_row_based=is_row_based,
             files=files,
         )
-        logging.info(f"bulk insert task ids:{task_ids}")
+        logging.info(f"bulk insert task ids:{task_id}")
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         tt = time.time() - t0
         log.info(f"bulk insert state:{success} in {tt}")
@@ -245,7 +244,7 @@ class TestBulkInsertTaskClean(TestcaseBaseBulkInsert):
         log.info(f" collection entities: {num_entities}")
         assert num_entities == 0
         success, states = self.utility_wrap.wait_for_bulk_insert_tasks_completed(
-            task_ids=task_ids, timeout=90
+            task_ids=[task_id], timeout=90
         )
         assert not success
         for state in states.values():
-- 
GitLab
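
For reference, a minimal migration sketch for callers of the renamed API. This
assumes pymilvus >= 2.2.0, where utility.bulk_insert became utility.do_bulk_insert
and a call returns a single task id rather than a list; the host, port, collection
name, and file name below are illustrative, not taken from this patch.

    # Migration sketch (assumption: pymilvus >= 2.2.0; "demo" and "data.json"
    # are placeholder names for illustration only).
    from pymilvus import connections, utility

    connections.connect(alias="default", host="localhost", port="19530")

    # Before the rename:
    #   task_ids = utility.bulk_insert(collection_name="demo", files=["data.json"])
    # After the rename, each call yields exactly one task id:
    task_id = utility.do_bulk_insert(collection_name="demo", files=["data.json"])

    # Helpers that still accept a list of ids (such as the test wrapper's
    # wait_for_bulk_insert_tasks_completed) now wrap the single id, e.g.
    # task_ids=[task_id], which is the pattern applied throughout this patch.
    state = utility.get_bulk_insert_state(task_id)
    print(state.state_name)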