未验证 提交 c052892e 编写于 作者: Z zhuwenxing 提交者: GitHub

[skip e2e]Update multi replicas chaos test (#16981)

Signed-off-by: Nzhuwenxing <wenxing.zhu@zilliz.com>
上级 6090a4e7
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
name: test-querynode-pod-kill
namespace: chaos-testing
spec:
selector:
pods:
chaos-testing:
- milvus-multi-querynode-querynode-bcdc595d9-7vmcj
- milvus-multi-querynode-querynode-bcdc595d9-ccxls
- milvus-multi-querynode-querynode-bcdc595d9-dpwgp
mode: all
action: pod-kill
duration: 2m
gracePeriod: 0
\ No newline at end of file
......@@ -4,6 +4,7 @@ import time
from time import sleep
from delayed_assert import expect
from base.collection_wrapper import ApiCollectionWrapper
from base.utility_wrapper import ApiUtilityWrapper
from common import common_func as cf
from common import common_type as ct
from chaos import constants
......@@ -34,14 +35,16 @@ class Checker:
b. count operations and success rate
"""
def __init__(self):
def __init__(self, collection_name=None, shards_num=2):
self._succ = 0
self._fail = 0
self.rsp_times = []
self.average_time = 0
self.c_wrap = ApiCollectionWrapper()
self.c_wrap.init_collection(name=cf.gen_unique_str('Checker_'),
c_name = collection_name if collection_name is not None else cf.gen_unique_str('Checker_')
self.c_wrap.init_collection(name=c_name,
schema=cf.gen_default_collection_schema(),
shards_num=shards_num,
timeout=timeout,
enable_traceback=enable_traceback)
self.c_wrap.insert(data=cf.gen_default_list_data(nb=constants.ENTITIES_FOR_SEARCH),
......@@ -75,9 +78,9 @@ class Checker:
class SearchChecker(Checker):
"""check search operations in a dependent thread"""
def __init__(self):
super().__init__()
self.c_wrap.load(enable_traceback=enable_traceback) # do load before search
def __init__(self, collection_name=None, shards_num=2, replica_number=1):
super().__init__(collection_name=collection_name, shards_num=shards_num)
self.c_wrap.load(replica_number=replica_number) # do load before search
def keep_running(self):
while True:
......@@ -105,8 +108,8 @@ class SearchChecker(Checker):
class InsertFlushChecker(Checker):
"""check Insert and flush operations in a dependent thread"""
def __init__(self, flush=False):
super().__init__()
def __init__(self, collection_name=None, flush=False, shards_num=2):
super().__init__(collection_name=collection_name, shards_num=shards_num)
self._flush = flush
self.initial_entities = self.c_wrap.num_entities
......@@ -203,9 +206,9 @@ class IndexChecker(Checker):
class QueryChecker(Checker):
"""check query operations in a dependent thread"""
def __init__(self):
super().__init__()
self.c_wrap.load(enable_traceback=enable_traceback) # load before query
def __init__(self, collection_name=None, shards_num=2, replica_number=1):
super().__init__(collection_name=collection_name, shards_num=shards_num)
self.c_wrap.load(replica_number=replica_number) # do load before query
def keep_running(self):
while True:
......@@ -228,6 +231,49 @@ class QueryChecker(Checker):
sleep(constants.WAIT_PER_OP / 10)
class BulkLoadChecker(Checker):
"""check bulk load operations in a dependent thread"""
def __init__(self,):
super().__init__()
self.utility_wrap = ApiUtilityWrapper()
self.schema = cf.gen_default_collection_schema()
self.files = ["/tmp/test_data.json"]
self.failed_tasks = []
def update(self, files=None, schema=None):
if files:
self.files = files
if schema:
self.schema = schema
def keep_running(self):
while True:
c_name = cf.gen_unique_str("BulkLoadChecker_")
self.c_wrap.init_collection(name=c_name, schema=self.schema)
# import data
t0 = time.time()
task_ids, res_1 = self.utility_wrap.bulk_load(collection_name=c_name,
partition_name='',
row_based=True,
files=self.files)
log.info(f"bulk load task ids:{task_ids}")
completed, res_2 = self.utility_wrap.wait_for_bulk_load_tasks_completed(task_ids=task_ids,
timeout=30)
t1 = time.time() - t0
if completed and res_1 and res_2:
self.rsp_times.append(t1 - t0)
self.average_time = ((t1 - t0) + self.average_time * self._succ) / (self._succ + 1)
self._succ += 1
log.debug(f"bulk load success, time: {t1 - t0:.4f}, average_time: {self.average_time:4f}")
else:
self._fail += 1
self.failed_tasks.append(c_name)
sleep(constants.WAIT_PER_OP / 10)
def assert_statistic(checkers, expectations={}):
for k in checkers.keys():
# expect succ if no expectations
......@@ -242,4 +288,4 @@ def assert_statistic(checkers, expectations={}):
else:
log.info(f"Expect Succ: {str(k)} {checker_result}")
expect(succ_rate > 0.90 or total > 2,
f"Expect Succ: {str(k)} {checker_result}")
\ No newline at end of file
f"Expect Succ: {str(k)} {checker_result}")
......@@ -8,8 +8,7 @@ import json
from time import sleep
from pymilvus import connections
from chaos.checker import (CreateChecker, InsertFlushChecker,
SearchChecker, QueryChecker, IndexChecker, Op)
from chaos.checker import (InsertFlushChecker, SearchChecker, QueryChecker, Op)
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
from utils.util_log import test_log as log
......@@ -17,6 +16,7 @@ from utils.util_k8s import wait_pods_ready, get_pod_list, get_pod_ip_name_pairs
from utils.util_common import findkeys
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common import common_func as cf
from chaos import constants
from delayed_assert import expect, assert_expectations
......@@ -70,6 +70,17 @@ def record_results(checkers):
return res
def get_querynode_info(release_name):
querynode_id_pod_pair = {}
querynode_ip_pod_pair = get_pod_ip_name_pairs(
"chaos-testing", f"app.kubernetes.io/instance={release_name}, component=querynode")
ms = MilvusSys()
for node in ms.query_nodes:
ip = node["infos"]['hardware_infos']["ip"].split(":")[0]
querynode_id_pod_pair[node["identifier"]] = querynode_ip_pod_pair[ip]
return querynode_id_pod_pair
class TestChaosBase:
expect_create = constants.SUCC
expect_insert = constants.SUCC
......@@ -82,37 +93,6 @@ class TestChaosBase:
_chaos_config = None
health_checkers = {}
def parser_testcase_config(self, chaos_yaml, chaos_config):
cluster_nodes = check_cluster_nodes(chaos_config)
tests_yaml = constants.TESTS_CONFIG_LOCATION + 'testcases.yaml'
tests_config = cc.gen_experiment_config(tests_yaml)
test_collections = tests_config.get('Collections', None)
for t in test_collections:
test_chaos = t.get('testcase', {}).get('chaos', {})
if test_chaos in chaos_yaml:
expects = t.get('testcase', {}).get(
'expectation', {}).get('cluster_1_node', {})
# for the cluster_n_node
if cluster_nodes > 1:
expects = t.get('testcase', {}).get(
'expectation', {}).get('cluster_n_node', {})
log.info(f"yaml.expects: {expects}")
self.expect_create = expects.get(
Op.create.value, constants.SUCC)
self.expect_insert = expects.get(
Op.insert.value, constants.SUCC)
self.expect_flush = expects.get(Op.flush.value, constants.SUCC)
self.expect_index = expects.get(Op.index.value, constants.SUCC)
self.expect_search = expects.get(
Op.search.value, constants.SUCC)
self.expect_query = expects.get(Op.query.value, constants.SUCC)
log.info(f"self.expects: create:{self.expect_create}, insert:{self.expect_insert}, "
f"flush:{self.expect_flush}, index:{self.expect_index}, "
f"search:{self.expect_search}, query:{self.expect_query}")
return True
return False
class TestChaos(TestChaosBase):
......@@ -128,9 +108,13 @@ class TestChaos(TestChaosBase):
@pytest.fixture(scope="function", autouse=True)
def init_health_checkers(self):
c_name = cf.gen_unique_str('Checker_')
replicas_num = 2
shards_num = 2
checkers = {
Op.search: SearchChecker(replica_number=2),
Op.query: QueryChecker(replica_number=2)
Op.insert: InsertFlushChecker(collection_name=c_name, shards_num=shards_num),
Op.search: SearchChecker(collection_name=c_name, shards_num=shards_num, replica_number=replicas_num),
Op.query: QueryChecker(collection_name=c_name, shards_num=shards_num, replica_number=replicas_num)
}
self.health_checkers = checkers
......@@ -145,37 +129,49 @@ class TestChaos(TestChaosBase):
log.info(f'Alive threads: {threading.enumerate()}')
@pytest.mark.tags(CaseLabel.L3)
# @pytest.mark.parametrize('chaos_yaml', "chaos/chaos_objects/template/pod-failure-by-pod-list.yaml")
def test_chaos(self):
@pytest.mark.parametrize("is_streaming", [False]) # [False, True]
@pytest.mark.parametrize("failed_group_scope", ["one"]) # ["one", "all"]
@pytest.mark.parametrize("failed_node_type", ["shard_leader"]) # ["non_shard_leader", "shard_leader"]
@pytest.mark.parametrize("chaos_type", ["pod-failure"]) # ["pod-failure", "pod-kill"]
def test_multi_replicas_with_only_one_group_available(self, chaos_type, failed_node_type, failed_group_scope, is_streaming):
# start the monitor threads to check the milvus ops
log.info("*********************Chaos Test Start**********************")
# log.info(f"chaos_yaml: {chaos_yaml}")
log.info(connections.get_connection_addr('default'))
if is_streaming is False:
del self.health_checkers[Op.insert]
cc.start_monitor_threads(self.health_checkers)
# get replicas info
release_name = "milvus-multi-querynode"
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
querynode_ip_pod_pair = get_pod_ip_name_pairs(
"chaos-testing", "app.kubernetes.io/instance=milvus-multi-querynode, component=querynode")
querynode_id_pod_pair = {}
ms = MilvusSys()
for node in ms.query_nodes:
ip = node["infos"]['hardware_infos']["ip"].split(":")[0]
querynode_id_pod_pair[node["identifier"]
] = querynode_ip_pod_pair[ip]
querynode_id_pod_pair = get_querynode_info(release_name)
log.info(querynode_id_pod_pair)
group_list = []
shard_leader_list = []
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
for g in replicas_info.groups:
group_list.append(list(g.group_nodes))
# keep only one group in healthy status, other groups all are unhealthy by injecting chaos,
for shard in g.shards:
shard_leader_list.append(shard.shard_leader)
# keep only one group in healthy status, other groups will be unhealthy by injecting pod failure chaos,
# In the effected groups, each group has one pod is in pod failure status
target_pod_list = []
for g in group_list[1:]:
pod = querynode_id_pod_pair[g[0]]
target_pod_list.append(pod)
chaos_config = cc.gen_experiment_config("chaos/chaos_objects/template/pod-failure-by-pod-list.yaml")
target_group = []
group_list = sorted(group_list, key=lambda x: -len(x))
if failed_group_scope == "one":
target_group = group_list[:1]
if failed_group_scope == "all":
target_group = group_list[:]
for g in target_group:
target_nodes = []
if failed_node_type == "shard_leader":
target_nodes = list(set(g) & set(shard_leader_list))
if failed_node_type == "non_shard_leader":
target_nodes = list(set(g) - set(shard_leader_list))
for target_node in target_nodes:
pod = querynode_id_pod_pair[target_node]
target_pod_list.append(pod)
chaos_config = cc.gen_experiment_config(f"chaos/chaos_objects/template/{chaos_type}-by-pod-list.yaml")
chaos_config['metadata']['name'] = f"test-multi-replicase-{int(time.time())}"
kind = chaos_config['kind']
meta_name = chaos_config.get('metadata', None).get('name', None)
chaos_config['spec']['selector']['pods']['chaos-testing'] = target_pod_list
self._chaos_config = chaos_config # cache the chaos config for tear down
......@@ -185,10 +181,7 @@ class TestChaos(TestChaosBase):
sleep(constants.WAIT_PER_OP * 2)
# replicas info
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
log.info(f"replicas_info for search collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
# assert statistic:all ops 100% succ
log.info("******1st assert before chaos: ")
......@@ -207,19 +200,27 @@ class TestChaos(TestChaosBase):
# wait 120s
sleep(constants.CHAOS_DURATION)
log.info(f'Alive threads: {threading.enumerate()}')
# node info
querynode_id_pod_pair = get_querynode_info(release_name)
log.info(querynode_id_pod_pair)
# replicas info
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
log.info(f"replicas_info for search collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
log.info(f"replicas_info for query collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
# assert statistic
log.info("******2nd assert after chaos injected: ")
assert_statistic(self.health_checkers,
expectations={
Op.search: constants.SUCC,
Op.query: constants.SUCC
})
expectations = {
Op.search: constants.SUCC,
Op.query: constants.SUCC
}
if failed_group_scope == "all":
expectations = {
Op.search: constants.FAIL,
Op.query: constants.FAIL
}
assert_statistic(self.health_checkers, expectations=expectations)
# delete chaos
chaos_res.delete(meta_name)
log.info("chaos deleted")
......@@ -232,15 +233,18 @@ class TestChaos(TestChaosBase):
log.info("all pods are ready")
# reconnect if needed
sleep(constants.WAIT_PER_OP * 2)
cc.reconnect(connections, alias='default')
# cc.reconnect(connections, alias='default')
# reset counting again
cc.reset_counting(self.health_checkers)
# wait 50s (varies by feature)
sleep(constants.WAIT_PER_OP * 5)
# node info
querynode_id_pod_pair = get_querynode_info(release_name)
log.info(querynode_id_pod_pair)
sleep(120)
# replicas info
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
# assert statistic: all ops success again
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册