Unverified commit ee5da73f authored by zhuwenxing, committed by GitHub

[test]Add bulk insert for test and refactoring the checker function (#25997)

Signed-off-by: zhuwenxing <wenxing.zhu@zilliz.com>
Parent c0d24335
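At a high level, this change makes the chaos checkers schema-driven: every checker now accepts an optional schema, derives the dim, int64 field, and float-vector field from it through new common_func helpers, and gains a bulk-insert path for data preparation. A minimal sketch of the pattern, illustrative only and reusing the helper names added further down in this diff:

from common import common_func as cf

schema = cf.gen_default_collection_schema(dim=128)
dim = cf.get_dim_by_schema(schema=schema)               # 128
int64_field = cf.get_int64_field_name(schema=schema)    # e.g. "int64" in the default schema
vec_field = cf.get_float_vec_field_name(schema=schema)  # e.g. "float_vector" in the default schema
# one column (list) per non-auto_id field, ready to pass to insert()
data = cf.get_column_data_by_schema(nb=10, schema=schema, start=0)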
......@@ -95,8 +95,7 @@ class ApiUtilityWrapper:
unknown, "unknown")
def wait_for_bulk_insert_tasks_completed(self, task_ids, target_state=BulkInsertState.ImportCompleted,
                                         timeout=None, using="default", **kwargs):
    start = time.time()
tasks_state_distribution = {
"success": set(),
"failed": set(),
......
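For context on what wait_for_bulk_insert_tasks_completed is doing: it polls each bulk-insert task until it reaches the target state or the timeout expires, while bucketing tasks into success/failed/in-progress sets. A hedged, stripped-down sketch of that loop (the real method lives in ApiUtilityWrapper and carries more bookkeeping), using pymilvus calls I believe match this shape:

import time
from pymilvus import utility, BulkInsertState

def wait_for_tasks(task_ids, timeout=720, interval=5):
    # return True only if every task reached a terminal state before the timeout
    start = time.time()
    pending = set(task_ids)
    while pending and time.time() - start < timeout:
        for task_id in list(pending):
            state = utility.get_bulk_insert_state(task_id=task_id)
            if state.state in (BulkInsertState.ImportCompleted, BulkInsertState.ImportFailed):
                pending.discard(task_id)
        time.sleep(interval)
    return len(pending) == 0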
......@@ -106,7 +106,8 @@ class TestChaos(TestChaosBase):
minio_endpoint = f"{minio_ip}:{minio_port}"
bucket_name = ms.index_nodes[0]["infos"]["system_configurations"]["minio_bucket_name"]
schema = cf.gen_bulk_insert_collection_schema(dim=dim, with_varchar_field=with_varchar_field)
data = cf.gen_default_list_data_for_bulk_insert(nb=nb, varchar_len=varchar_len, with_varchar_field=with_varchar_field)
data = cf.gen_default_list_data_for_bulk_insert(nb=nb, varchar_len=varchar_len,
with_varchar_field=with_varchar_field)
data_dir = "/tmp/bulk_insert_data"
Path(data_dir).mkdir(parents=True, exist_ok=True)
files = []
......
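The files generated under /tmp/bulk_insert_data have to land in the MinIO bucket before the bulk insert can reference them. A small upload sketch, assuming the default minioadmin credentials used elsewhere in this diff and a placeholder file list:

import os
from minio import Minio

data_dir = "/tmp/bulk_insert_data"
files = ["example/float_vector.npy"]  # placeholder; normally returned by cf.prepare_bulk_insert_data(...)
client = Minio("127.0.0.1:9000", access_key="minioadmin", secret_key="minioadmin", secure=False)
for file_name in files:
    # keep the relative path as the object name so Milvus can resolve it inside the bucket
    client.fput_object("milvus-bucket", file_name, os.path.join(data_dir, file_name))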
......@@ -35,7 +35,7 @@ enable_traceback = False
DEFAULT_FMT = '[start time:{start_time}][time cost:{elapsed:0.8f}s][operation_name:{operation_name}][collection name:{collection_name}] -> {result!r}'
def trace(fmt=DEFAULT_FMT, prefix='chaos-test', flag=True):
def trace(fmt=DEFAULT_FMT, prefix='test', flag=True):
def decorate(func):
@functools.wraps(func)
def inner_wrapper(self, *args, **kwargs):
......@@ -102,20 +102,30 @@ class Checker:
self._keep_running = True
self.rsp_times = []
self.average_time = 0
self.files = []
self.c_wrap = ApiCollectionWrapper()
self.utility_wrap = ApiUtilityWrapper()
c_name = collection_name if collection_name is not None else cf.gen_unique_str(
'Checker_')
self.c_name = c_name
schema = cf.gen_default_collection_schema(dim=dim) if schema is None else schema
self.schema = schema
self.dim = cf.get_dim_by_schema(schema=schema)
self.int64_field_name = cf.get_int64_field_name(schema=schema)
self.float_vector_field_name = cf.get_float_vec_field_name(schema=schema)
self.c_wrap.init_collection(name=c_name,
schema=schema,
shards_num=shards_num,
timeout=timeout,
# active_trace=True,
enable_traceback=enable_traceback)
if insert_data:
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH, dim=dim),
timeout=timeout,
enable_traceback=enable_traceback)
log.info(f"collection {c_name} created, start to insert data")
t0 = time.perf_counter()
self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=schema, start=0),
timeout=timeout,
enable_traceback=enable_traceback)
log.info(f"insert data for collection {c_name} cost {time.perf_counter() - t0}s")
self.initial_entities = self.c_wrap.num_entities # do as a flush
def total(self):
......@@ -166,15 +176,38 @@ class Checker:
f"end at {self.fail_records[-1][2]}")
return recovery_time
def prepare_bulk_insert_data(self,
nb=constants.ENTITIES_FOR_BULKINSERT,
file_type="npy",
minio_endpoint="127.0.0.1:9000",
bucket_name="milvus-bucket"):
schema = self.schema
log.info(f"prepare data for bulk insert")
files = cf.prepare_bulk_insert_data(schema=schema,
nb=nb,
file_type=file_type,
minio_endpoint=minio_endpoint,
bucket_name=bucket_name)
self.files = files
return files
def do_bulk_insert(self):
log.info(f"bulk insert collection name: {self.c_name}")
task_ids, result = self.utility_wrap.do_bulk_insert(collection_name=self.c_name,
files=self.files)
log.info(f"task ids {task_ids}")
completed, result = self.utility_wrap.wait_for_bulk_insert_tasks_completed(task_ids=[task_ids], timeout=720)
return task_ids, completed
class SearchChecker(Checker):
"""check search operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1):
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None,):
if collection_name is None:
collection_name = cf.gen_unique_str("SearchChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
self.c_wrap.create_index(ct.default_float_vec_field_name,
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
timeout=timeout,
......@@ -186,8 +219,8 @@ class SearchChecker(Checker):
@trace()
def search(self):
res, result = self.c_wrap.search(
data=cf.gen_vectors(5, ct.default_dim),
anns_field=ct.default_float_vec_field_name,
data=cf.gen_vectors(5, self.dim),
anns_field=self.float_vector_field_name,
param=constants.DEFAULT_SEARCH_PARAM,
limit=1,
timeout=timeout,
......@@ -209,8 +242,8 @@ class SearchChecker(Checker):
class InsertFlushChecker(Checker):
"""check Insert and flush operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2):
super().__init__(collection_name=collection_name, shards_num=shards_num)
def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self._flush = flush
self.initial_entities = self.c_wrap.num_entities
......@@ -218,7 +251,7 @@ class InsertFlushChecker(Checker):
while True:
t0 = time.time()
_, insert_result = \
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
......@@ -250,10 +283,10 @@ class InsertFlushChecker(Checker):
class FlushChecker(Checker):
"""check flush operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2):
def __init__(self, collection_name=None, shards_num=2, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("FlushChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self.initial_entities = self.c_wrap.num_entities
@trace()
......@@ -268,7 +301,7 @@ class FlushChecker(Checker):
@exception_handler()
def run_task(self):
_, result = self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.DELTA_PER_INS),
_, result = self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
timeout=timeout,
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
......@@ -284,20 +317,20 @@ class FlushChecker(Checker):
class InsertChecker(Checker):
"""check flush operations in a dependent thread"""
def __init__(self, collection_name=None, flush=False, shards_num=2):
def __init__(self, collection_name=None, flush=False, shards_num=2, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("InsertChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
self._flush = flush
self.initial_entities = self.c_wrap.num_entities
self.inserted_data = []
self.scale = 1*10**6
self.start_time_stamp = int(time.time()*self.scale) # us
self.term_expr = f'{ct.default_int64_field_name} >= {self.start_time_stamp}'
self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp}'
@trace()
def insert(self):
data = cf.gen_default_list_data(nb=constants.DELTA_PER_INS)
data = cf.get_column_data_by_schema(nb=constants.DELTA_PER_INS, schema=self.schema)
ts_data = []
for i in range(constants.DELTA_PER_INS):
time.sleep(0.001)
......@@ -326,7 +359,7 @@ class InsertChecker(Checker):
def verify_data_completeness(self):
try:
self.c_wrap.create_index(ct.default_float_vec_field_name,
self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str('index_'),
timeout=timeout,
......@@ -336,14 +369,14 @@ class InsertChecker(Checker):
log.error(f"create index error: {e}")
self.c_wrap.load()
end_time_stamp = int(time.time()*self.scale)
self.term_expr = f'{ct.default_int64_field_name} >= {self.start_time_stamp} and ' \
f'{ct.default_int64_field_name} <= {end_time_stamp}'
self.term_expr = f'{self.int64_field_name} >= {self.start_time_stamp} and ' \
f'{self.int64_field_name} <= {end_time_stamp}'
data_in_client = []
for d in self.inserted_data:
if self.start_time_stamp <= d <= end_time_stamp:
data_in_client.append(d)
res, result = self.c_wrap.query(self.term_expr, timeout=timeout,
output_fields=[f'{ct.default_int64_field_name}'],
output_fields=[f'{self.int64_field_name}'],
limit=len(data_in_client) * 2,
check_task=CheckTasks.check_nothing)
......@@ -357,10 +390,10 @@ class InsertChecker(Checker):
class CreateChecker(Checker):
"""check create operations in a dependent thread"""
def __init__(self, collection_name=None):
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("CreateChecker_")
super().__init__(collection_name=collection_name)
super().__init__(collection_name=collection_name, schema=schema)
@trace()
def init_collection(self):
......@@ -388,20 +421,20 @@ class CreateChecker(Checker):
class IndexChecker(Checker):
"""check Insert operations in a dependent thread"""
def __init__(self, collection_name=None):
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("IndexChecker_")
super().__init__(collection_name=collection_name)
super().__init__(collection_name=collection_name, schema=schema)
self.index_name = cf.gen_unique_str('index_')
for i in range(5):
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
self.c_wrap.insert(data=cf.get_column_data_by_schema(nb=constants.ENTITIES_FOR_SEARCH, schema=self.schema),
timeout=timeout, enable_traceback=enable_traceback)
# do as a flush before indexing
log.debug(f"Index ready entities: {self.c_wrap.num_entities}")
@trace()
def create_index(self):
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=self.index_name,
timeout=timeout,
......@@ -425,11 +458,11 @@ class IndexChecker(Checker):
class QueryChecker(Checker):
"""check query operations in a dependent thread"""
def __init__(self, collection_name=None, shards_num=2, replica_number=1):
def __init__(self, collection_name=None, shards_num=2, replica_number=1, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("QueryChecker_")
super().__init__(collection_name=collection_name, shards_num=shards_num)
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
super().__init__(collection_name=collection_name, shards_num=shards_num, schema=schema)
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
......@@ -450,8 +483,8 @@ class QueryChecker(Checker):
int_values = []
for _ in range(5):
int_values.append(randint(0, constants.ENTITIES_FOR_SEARCH))
self.term_expr = f'{ct.default_int64_field_name} in {int_values}'
res, result= self.query()
self.term_expr = f'{self.int64_field_name} in {int_values}'
res, result = self.query()
return res, result
def keep_running(self):
......@@ -463,12 +496,12 @@ class QueryChecker(Checker):
class LoadChecker(Checker):
"""check load operations in a dependent thread"""
def __init__(self, collection_name=None, replica_number=1):
def __init__(self, collection_name=None, replica_number=1, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DeleteChecker_")
super().__init__(collection_name=collection_name)
collection_name = cf.gen_unique_str("LoadChecker_")
super().__init__(collection_name=collection_name, schema=schema)
self.replica_number = replica_number
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
......@@ -497,11 +530,11 @@ class LoadChecker(Checker):
class DeleteChecker(Checker):
"""check delete operations in a dependent thread"""
def __init__(self, collection_name=None):
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DeleteChecker_")
super().__init__(collection_name=collection_name)
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
super().__init__(collection_name=collection_name, schema=schema)
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
......@@ -509,10 +542,10 @@ class DeleteChecker(Checker):
enable_traceback=enable_traceback,
check_task=CheckTasks.check_nothing)
self.c_wrap.load() # load before query
term_expr = f'{ct.default_int64_field_name} > 0'
term_expr = f'{self.int64_field_name} > 0'
res, _ = self.c_wrap.query(term_expr, output_fields=[
ct.default_int64_field_name])
self.ids = [r[ct.default_int64_field_name] for r in res]
self.int64_field_name])
self.ids = [r[self.int64_field_name] for r in res]
self.expr = None
@trace()
......@@ -523,7 +556,7 @@ class DeleteChecker(Checker):
@exception_handler()
def run_task(self):
delete_ids = self.ids.pop()
self.expr = f'{ct.default_int64_field_name} in {[delete_ids]}'
self.expr = f'{self.int64_field_name} in {[delete_ids]}'
res, result = self.delete()
return res, result
......@@ -536,12 +569,12 @@ class DeleteChecker(Checker):
class CompactChecker(Checker):
"""check compact operations in a dependent thread"""
def __init__(self, collection_name=None):
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("CompactChecker_")
super().__init__(collection_name=collection_name)
super().__init__(collection_name=collection_name, schema=schema)
self.ut = ApiUtilityWrapper()
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
......@@ -571,10 +604,10 @@ class CompactChecker(Checker):
class DropChecker(Checker):
"""check drop operations in a dependent thread"""
def __init__(self, collection_name=None):
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("DropChecker_")
super().__init__(collection_name=collection_name)
super().__init__(collection_name=collection_name, schema=schema)
@trace()
def drop(self):
......@@ -600,12 +633,12 @@ class DropChecker(Checker):
class LoadBalanceChecker(Checker):
"""check loadbalance operations in a dependent thread"""
def __init__(self, collection_name=None):
def __init__(self, collection_name=None, schema=None):
if collection_name is None:
collection_name = cf.gen_unique_str("LoadBalanceChecker_")
super().__init__(collection_name=collection_name)
super().__init__(collection_name=collection_name, schema=schema)
self.utility_wrap = ApiUtilityWrapper()
res, result = self.c_wrap.create_index(ct.default_float_vec_field_name,
res, result = self.c_wrap.create_index(self.float_vector_field_name,
constants.DEFAULT_INDEX_PARAM,
index_name=cf.gen_unique_str(
'index_'),
......
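Put together, prepare_bulk_insert_data and do_bulk_insert give every checker a bulk-load path; the chaos test further down drives it roughly as follows (an illustrative sketch that assumes an open Milvus connection and reuses only names from this diff):

from chaos.checker import SearchChecker
from common import common_func as cf

schema = cf.gen_default_collection_schema(auto_id=False)
checker = SearchChecker(schema=schema)
files = checker.prepare_bulk_insert_data(minio_endpoint="127.0.0.1:9000",
                                         bucket_name="milvus-bucket")
task_ids, completed = checker.do_bulk_insert()
assert completed, f"bulk insert tasks {task_ids} did not complete within the timeout"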
......@@ -55,4 +55,4 @@ def request_duration(request):
@pytest.fixture
def is_check(request):
return request.config.getoption("--is_check")
\ No newline at end of file
return request.config.getoption("--is_check")
......@@ -8,14 +8,14 @@ DEFAULT_DEPLOY_MODE = "single"
CHAOS_NAMESPACE = "chaos-testing" # namespace of chaos
CHAOS_API_VERSION = 'chaos-mesh.org/v1alpha1' # chaos mesh api version
CHAOS_GROUP = 'chaos-mesh.org' # chao mesh group
CHAOS_VERSION = 'v1alpha1' # chao mesh version
CHAOS_GROUP = 'chaos-mesh.org' # chaos mesh group
CHAOS_VERSION = 'v1alpha1' # chaos mesh version
SUCC = 'succ'
FAIL = 'fail'
DELTA_PER_INS = 10 # entities per insert
ENTITIES_FOR_SEARCH = 3000 # entities for search_collection
CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variables for chao path
ENTITIES_FOR_BULKINSERT = 1000000 # entities for bulk insert
CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variables for chaos path
TESTS_CONFIG_LOCATION = 'chaos_objects/pod_kill/'
ALL_CHAOS_YAMLS = 'chaos_allstandalone_pod_kill.yaml'
RELEASE_NAME = 'test-allstandalone-pod-kill-19-25-26'
......
......@@ -34,6 +34,12 @@ class TestAllCollection(TestcaseBase):
collection_w = self.init_collection_wrap(name=name, schema=schema)
tt = time.time() - t0
assert collection_w.name == name
# get collection info
schema = collection_w.schema
dim = cf.get_dim_by_schema(schema=schema)
int64_field_name = cf.get_int64_field_name(schema=schema)
float_vector_field_name = cf.get_float_vec_field_name(schema=schema)
# compact collection before getting num_entities
collection_w.flush(timeout=180)
collection_w.compact()
......@@ -48,7 +54,7 @@ class TestAllCollection(TestcaseBase):
for field in collection_w.schema.fields:
if field.dtype.name == "JSON":
with_json = True
data = cf.gen_default_list_data(start=offset, with_json=with_json)
data = cf.get_column_data_by_schema(nb=ct.default_nb, schema=schema, start=offset)
t0 = time.time()
_, res = collection_w.insert(data)
tt = time.time() - t0
......@@ -68,9 +74,9 @@ class TestAllCollection(TestcaseBase):
index_infos = [index.to_dict() for index in collection_w.indexes]
index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}
if len(index_infos) == 0:
log.info("collection {name} does not have index, create index for it")
log.info(f"collection {name} does not have index, create index for it")
t0 = time.time()
index, _ = collection_w.create_index(field_name=ct.default_float_vec_field_name,
index, _ = collection_w.create_index(field_name=float_vector_field_name,
index_params=index_params,
index_name=cf.gen_unique_str())
tt = time.time() - t0
......@@ -84,17 +90,17 @@ class TestAllCollection(TestcaseBase):
collection_w.load()
# search
search_vectors = cf.gen_vectors(1, ct.default_dim)
search_vectors = cf.gen_vectors(1, dim)
search_params = {"metric_type": "L2", "params": {"ef": 64}}
t0 = time.time()
res_1, _ = collection_w.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
anns_field=float_vector_field_name,
param=search_params, limit=1)
tt = time.time() - t0
log.info(f"assert search: {tt}")
assert len(res_1) == 1
# query
term_expr = f'{ct.default_int64_field_name} in {[i for i in range(offset, 0)]}'
term_expr = f'{int64_field_name} in {[i for i in range(offset, 0)]}'
t0 = time.time()
res, _ = collection_w.query(term_expr)
tt = time.time() - t0
......@@ -103,7 +109,7 @@ class TestAllCollection(TestcaseBase):
collection_w.release()
# insert data
d = cf.gen_default_list_data(with_json=with_json)
d = cf.get_column_data_by_schema(nb=ct.default_nb, schema=schema)
collection_w.insert(d)
# load
......@@ -115,10 +121,10 @@ class TestAllCollection(TestcaseBase):
# search
nq = 5
topk = 5
search_vectors = cf.gen_vectors(nq, ct.default_dim)
search_vectors = cf.gen_vectors(nq, dim)
t0 = time.time()
res, _ = collection_w.search(data=search_vectors,
anns_field=ct.default_float_vec_field_name,
anns_field=float_vector_field_name,
param=search_params, limit=topk)
tt = time.time() - t0
log.info(f"assert search: {tt}")
......@@ -126,7 +132,7 @@ class TestAllCollection(TestcaseBase):
assert len(res[0]) <= topk
# query
term_expr = f'{ct.default_int64_field_name} in [1, 2, 3, 4]'
term_expr = f'{int64_field_name} in [1, 2, 3, 4]'
t0 = time.time()
res, _ = collection_w.query(term_expr)
tt = time.time() - t0
......
......@@ -13,6 +13,7 @@ from chaos.checker import (CreateChecker,
from utils.util_log import test_log as log
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common import common_func as cf
from chaos.chaos_commons import assert_statistic
from chaos import constants
from delayed_assert import assert_expectations
......@@ -34,7 +35,7 @@ class TestBase:
class TestOperations(TestBase):
@pytest.fixture(scope="function", autouse=True)
def connection(self, host, port, user, password):
def connection(self, host, port, user, password, minio_host):
if user and password:
# log.info(f"connect to {host}:{port} with user {user} and password {password}")
connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
......@@ -47,18 +48,21 @@ class TestOperations(TestBase):
self.port = port
self.user = user
self.password = password
self.minio_endpoint = f"{minio_host}:9000"
def init_health_checkers(self, collection_name=None):
c_name = collection_name
schema = cf.gen_default_collection_schema(auto_id=False)
checkers = {
Op.create: CreateChecker(collection_name=c_name),
Op.insert: InsertChecker(collection_name=c_name),
Op.flush: FlushChecker(collection_name=c_name),
Op.index: IndexChecker(collection_name=c_name),
Op.search: SearchChecker(collection_name=c_name),
Op.query: QueryChecker(collection_name=c_name),
Op.delete: DeleteChecker(collection_name=c_name),
Op.drop: DropChecker(collection_name=c_name)
Op.create: CreateChecker(collection_name=None, schema=schema),
Op.insert: InsertChecker(collection_name=c_name, schema=schema),
Op.flush: FlushChecker(collection_name=c_name, schema=schema),
Op.index: IndexChecker(collection_name=None, schema=schema),
Op.search: SearchChecker(collection_name=c_name, schema=schema),
Op.query: QueryChecker(collection_name=c_name, schema=schema),
Op.delete: DeleteChecker(collection_name=c_name, schema=schema),
Op.drop: DropChecker(collection_name=None, schema=schema)
}
self.health_checkers = checkers
......@@ -69,7 +73,14 @@ class TestOperations(TestBase):
log.info(connections.get_connection_addr('default'))
c_name = None
self.init_health_checkers(collection_name=c_name)
# prepare data by bulk insert
log.info("*********************Prepare Data by bulk insert**********************")
cc.start_monitor_threads(self.health_checkers)
for k, v in self.health_checkers.items():
log.info(f"prepare bulk insert data for {k}")
v.prepare_bulk_insert_data(minio_endpoint=self.minio_endpoint)
v.do_bulk_insert()
log.info("*********************Load Start**********************")
# wait request_duration
request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
......@@ -93,7 +104,7 @@ class TestOperations(TestBase):
for k, v in self.health_checkers.items():
log.info(f"{k} rto: {v.get_rto()}")
rto = v.get_rto()
pytest.assume(rto < 30, f"{k} rto expect 30s but get {rto}s") # rto should be less than 30s
pytest.assume(rto < 30, f"{k} rto expect 30s but get {rto}s") # rto should be less than 30s
if Op.insert in self.health_checkers:
# verify that no inserted data was lost
......
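The request_duration string accepted by these tests (e.g. "1h30m") is converted to seconds through the replace-and-eval trick seen above; a worked example of the transformation:

request_duration = "1h30m"
# "1h30m" -> "1*3600+30*60+" -> strip the trailing "+" -> eval -> 5400 seconds
request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
if request_duration[-1] == "+":
    request_duration = request_duration[:-1]
seconds = eval(request_duration)  # 5400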
......@@ -82,7 +82,7 @@ class TestOperations(TestBase):
self.health_checkers = checkers
@pytest.mark.tags(CaseLabel.L3)
def test_operations(self,collection_name, request_duration):
def test_operations(self, collection_name, request_duration):
# start the monitor threads to check the milvus ops
log.info("*********************Test Start**********************")
log.info(connections.get_connection_addr('default'))
......@@ -91,13 +91,13 @@ class TestOperations(TestBase):
cc.start_monitor_threads(self.health_checkers)
log.info("*********************Request Load Start**********************")
# wait request_duration for the load request to be finished
request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","")
request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
if request_duration[-1] == "+":
request_duration = request_duration[:-1]
request_duration = eval(request_duration)
for i in range(10):
sleep(request_duration//10)
for k,v in self.health_checkers.items():
for k, v in self.health_checkers.items():
v.check_result()
log.info("******assert after chaos deleted: ")
assert_statistic(self.health_checkers)
......
......@@ -3,17 +3,22 @@ import random
import math
import string
import json
import time
import uuid
from functools import singledispatch
import numpy as np
import pandas as pd
from sklearn import preprocessing
from npy_append_array import NpyAppendArray
from faker import Faker
from pathlib import Path
from minio import Minio
from pymilvus import DataType
from base.schema_wrapper import ApiCollectionSchemaWrapper, ApiFieldSchemaWrapper
from common import common_type as ct
from utils.util_log import test_log as log
from customize.milvus_operator import MilvusOperator
fake = Faker()
"""" Methods of processing data """
......@@ -492,14 +497,65 @@ def gen_default_list_data_for_bulk_insert(nb=ct.default_nb, varchar_len=2000, wi
int_values = [i for i in range(nb)]
float_values = [np.float32(i) for i in range(nb)]
string_values = [f"{str(i)}_{str_value}" for i in range(nb)]
# For large nb, float_vec_values would not fit comfortably in memory,
# so vectors are generated on the fly while writing the npy or json files instead of all at once
float_vec_values = [] # placeholder for float_vec
data = [int_values, float_values, string_values, float_vec_values]
if with_varchar_field is False:
data = [int_values, float_values, float_vec_values]
return data
def get_list_data_by_schema(nb=ct.default_nb, schema=None):
def prepare_bulk_insert_data(schema=None,
nb=ct.default_nb,
file_type="npy",
minio_endpoint="127.0.0.1:9000",
bucket_name="milvus-bucket"):
schema = gen_default_collection_schema() if schema is None else schema
dim = get_dim_by_schema(schema=schema)
log.info(f"start to generate raw data for bulk insert")
t0 = time.time()
data = get_column_data_by_schema(schema=schema, nb=nb, skip_vectors=True)
log.info(f"generate raw data for bulk insert cost {time.time() - t0} s")
data_dir = "/tmp/bulk_insert_data"
Path(data_dir).mkdir(parents=True, exist_ok=True)
log.info(f"schema:{schema}, nb:{nb}, file_type:{file_type}, minio_endpoint:{minio_endpoint}, bucket_name:{bucket_name}")
files = []
log.info(f"generate {file_type} files for bulk insert")
if file_type == "json":
files = gen_json_files_for_bulk_insert(data, schema, data_dir)
if file_type == "npy":
files = gen_npy_files_for_bulk_insert(data, schema, data_dir)
log.info(f"generated {len(files)} {file_type} files for bulk insert, cost {time.time() - t0} s")
log.info("upload file to minio")
client = Minio(minio_endpoint, access_key="minioadmin", secret_key="minioadmin", secure=False)
for file_name in files:
file_size = os.path.getsize(os.path.join(data_dir, file_name)) / 1024 / 1024
t0 = time.time()
client.fput_object(bucket_name, file_name, os.path.join(data_dir, file_name))
log.info(f"upload file {file_name} to minio, size: {file_size:.2f} MB, cost {time.time() - t0:.2f} s")
return files
def get_column_data_by_schema(nb=ct.default_nb, schema=None, skip_vectors=False, start=None):
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
fields_not_auto_id = []
for field in fields:
if not field.auto_id:
fields_not_auto_id.append(field)
data = []
for field in fields_not_auto_id:
if field.dtype == DataType.FLOAT_VECTOR and skip_vectors is True:
tmp = []
else:
tmp = gen_data_by_type(field, nb=nb, start=start)
data.append(tmp)
return data
def get_row_data_by_schema(nb=ct.default_nb, schema=None):
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
......@@ -515,48 +571,149 @@ def get_list_data_by_schema(nb=ct.default_nb, schema=None):
data.append(tmp)
return data
def gen_data_by_type(field):
def get_fields_map(schema=None):
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
fields_map = {}
for field in fields:
fields_map[field.name] = field.dtype
return fields_map
def get_int64_field_name(schema=None):
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
for field in fields:
if field.dtype == DataType.INT64:
return field.name
return None
def get_float_field_name(schema=None):
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
for field in fields:
if field.dtype == DataType.FLOAT or field.dtype == DataType.DOUBLE:
return field.name
return None
def get_float_vec_field_name(schema=None):
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
for field in fields:
if field.dtype == DataType.FLOAT_VECTOR:
return field.name
return None
def get_binary_vec_field_name(schema=None):
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
for field in fields:
if field.dtype == DataType.BINARY_VECTOR:
return field.name
return None
def get_dim_by_schema(schema=None):
if schema is None:
schema = gen_default_collection_schema()
fields = schema.fields
for field in fields:
if field.dtype == DataType.FLOAT_VECTOR or field.dtype == DataType.BINARY_VECTOR:
dim = field.params['dim']
return dim
return None
def gen_data_by_type(field, nb=None, start=None):
# if nb is None, return a single value; otherwise return a list of nb values
data_type = field.dtype
if data_type == DataType.BOOL:
return random.choice([True, False])
if nb is None:
return random.choice([True, False])
return [random.choice([True, False]) for _ in range(nb)]
if data_type == DataType.INT8:
return random.randint(-128, 127)
if nb is None:
return random.randint(-128, 127)
return [random.randint(-128, 127) for _ in range(nb)]
if data_type == DataType.INT16:
return random.randint(-32768, 32767)
if nb is None:
return random.randint(-32768, 32767)
return [random.randint(-32768, 32767) for _ in range(nb)]
if data_type == DataType.INT32:
return random.randint(-2147483648, 2147483647)
if nb is None:
return random.randint(-2147483648, 2147483647)
return [random.randint(-2147483648, 2147483647) for _ in range(nb)]
if data_type == DataType.INT64:
return random.randint(-9223372036854775808, 9223372036854775807)
if nb is None:
return random.randint(-9223372036854775808, 9223372036854775807)
if start is not None:
return [i for i in range(start, start+nb)]
return [random.randint(-9223372036854775808, 9223372036854775807) for _ in range(nb)]
if data_type == DataType.FLOAT:
return np.float64(random.random()) # Object of type float32 is not JSON serializable, so set it as float64
if nb is None:
return np.float32(random.random())
return [np.float32(random.random()) for _ in range(nb)]
if data_type == DataType.DOUBLE:
return np.float64(random.random())
if nb is None:
return np.float64(random.random())
return [np.float64(random.random()) for _ in range(nb)]
if data_type == DataType.VARCHAR:
max_length = field.params['max_length']
max_length = min(20, max_length-1)
length = random.randint(0, max_length)
return "".join([chr(random.randint(97, 122)) for _ in range(length)])
if nb is None:
return "".join([chr(random.randint(97, 122)) for _ in range(length)])
return ["".join([chr(random.randint(97, 122)) for _ in range(length)]) for _ in range(nb)]
if data_type == DataType.JSON:
if nb is None:
return {"name": fake.name(), "address": fake.address()}
data = [{"name": str(i), "address": i} for i in range(nb)]
return data
if data_type == DataType.FLOAT_VECTOR:
dim = field.params['dim']
return preprocessing.normalize([np.array([random.random() for i in range(dim)])])[0].tolist()
if nb is None:
return [random.random() for i in range(dim)]
return [[random.random() for i in range(dim)] for _ in range(nb)]
return None
def gen_json_files_for_bulk_insert(data, schema, data_dir, **kwargs):
nb = kwargs.get("nb", ct.default_nb)
dim = kwargs.get("dim", ct.default_dim)
def gen_json_files_for_bulk_insert(data, schema, data_dir):
for d in data:
if len(d) > 0:
nb = len(d)
dim = get_dim_by_schema(schema)
vec_field_name = get_float_vec_field_name(schema)
fields_name = [field.name for field in schema.fields]
file_name = f"bulk_insert_data_source_dim_{dim}_nb_{nb}.json"
# get vec field index
vec_field_index = fields_name.index(vec_field_name)
uuid_str = str(uuid.uuid4())
log.info(f"file dir name: {uuid_str}")
file_name = f"{uuid_str}/bulk_insert_data_source_dim_{dim}_nb_{nb}.json"
files = [file_name]
data_source = os.path.join(data_dir, file_name)
Path(data_source).parent.mkdir(parents=True, exist_ok=True)
log.info(f"file name: {data_source}")
with open(data_source, "w") as f:
f.write("{")
f.write("\n")
f.write('"rows":[')
f.write("\n")
for i in range(nb):
entity_value = [field_values[i] for field_values in data[:-1]]
vector = [random.random() for _ in range(dim)]
entity_value.append(vector)
entity_value = [None for _ in range(len(fields_name))]
for j in range(len(data)):
if j == vec_field_index:
entity_value[j] = [random.random() for _ in range(dim)]
else:
entity_value[j] = data[j][i]
entity = dict(zip(fields_name, entity_value))
f.write(json.dumps(entity, indent=4, default=to_serializable))
if i != nb - 1:
......@@ -568,22 +725,35 @@ def gen_json_files_for_bulk_insert(data, schema, data_dir, **kwargs):
return files
def gen_npy_files_for_bulk_insert(data, schema, data_dir, **kwargs):
nb = kwargs.get("nb", ct.default_nb)
dim = kwargs.get("dim", ct.default_dim)
def gen_npy_files_for_bulk_insert(data, schema, data_dir):
for d in data:
if len(d) > 0:
nb = len(d)
dim = get_dim_by_schema(schema)
vec_field_name = get_float_vec_field_name(schema)
fields_name = [field.name for field in schema.fields]
files = []
uuid_str = uuid.uuid4()
for field in fields_name:
files.append(f"{field}.npy")
files.append(f"{uuid_str}/{field}.npy")
for i, file in enumerate(files):
data_source = os.path.join(data_dir, file)
if "vector" in file:
# mkdir for npy file
Path(data_source).parent.mkdir(parents=True, exist_ok=True)
log.info(f"save file {data_source}")
if vec_field_name in file:
log.info(f"generate {nb} vectors with dim {dim} for {data_source}")
with NpyAppendArray(data_source, "wb") as npaa:
for j in range(nb):
vector = np.array([[random.random() for _ in range(dim)]])
npaa.append(vector)
elif isinstance(data[i][0], dict):
tmp = []
for d in data[i]:
tmp.append(json.dumps(d))
data[i] = tmp
np.save(data_source, np.array(data[i]))
else:
np.save(data_source, np.array(data[i]))
return files
......
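One behavior of the refactored generators that is easy to miss: gen_data_by_type returns a single value when nb is None and a list of nb values otherwise, and an INT64 field with a start offset yields a deterministic range, which is what get_column_data_by_schema relies on. A short illustration against the default schema (the output comments are my reading of the code above, not captured output):

from common import common_func as cf

schema = cf.gen_default_collection_schema()
int64_name = cf.get_int64_field_name(schema=schema)
int64_field = [f for f in schema.fields if f.name == int64_name][0]
print(cf.gen_data_by_type(int64_field))                 # a single random int64
print(cf.gen_data_by_type(int64_field, nb=3, start=7))  # [7, 8, 9]
columns = cf.get_column_data_by_schema(nb=5, schema=schema, start=0)
print(len(columns))  # one column list per non-auto_id field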
......@@ -33,6 +33,7 @@ timeout-decorator==0.5.0
# for bulk insert test
minio==7.1.5
npy-append-array==0.9.15
Faker==19.2.0
# for benchmark
h5py==3.8.0
......
......@@ -742,4 +742,3 @@ class TestBulkInsert(TestcaseBaseBulkInsert):
check_task=CheckTasks.check_search_results,
check_items={"nq": 1, "limit": 1},
)