diff --git a/tests/python_client/chaos/conftest.py b/tests/python_client/chaos/conftest.py
index 275412de8db3a5f89e61ab3d69ef882281031168..9e1a5572888fb7bb5416d508dd46d217a1a63019 100644
--- a/tests/python_client/chaos/conftest.py
+++ b/tests/python_client/chaos/conftest.py
@@ -4,6 +4,7 @@ import pytest
 def pytest_addoption(parser):
     parser.addoption("--chaos_type", action="store", default="pod_kill", help="chaos_type")
     parser.addoption("--target_component", action="store", default="querynode", help="target_component")
+    parser.addoption("--target_pod", action="store", default="etcd_leader", help="target_pod")
     parser.addoption("--target_number", action="store", default="1", help="target_number")
     parser.addoption("--chaos_duration", action="store", default="1m", help="chaos_duration")
     parser.addoption("--chaos_interval", action="store", default="10s", help="chaos_interval")
@@ -21,6 +22,11 @@ def target_component(request):
     return request.config.getoption("--target_component")
 
 
+@pytest.fixture
+def target_pod(request):
+    return request.config.getoption("--target_pod")
+
+
 @pytest.fixture
 def target_number(request):
     return request.config.getoption("--target_number")
diff --git a/tests/python_client/chaos/test_chaos_apply.py b/tests/python_client/chaos/test_chaos_apply.py
index 23d7f2c6c83123d35a80fbb201af7425599642cb..87218ab4c3dee97ca9944ac1beb83535dabe6230 100644
--- a/tests/python_client/chaos/test_chaos_apply.py
+++ b/tests/python_client/chaos/test_chaos_apply.py
@@ -107,10 +107,10 @@ class TestChaosApply:
         assert meta_name not in chaos_list
         # wait all pods ready
         t0 = time.time()
-        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={meta_name}")
-        wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={meta_name}")
-        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={meta_name}")
-        wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={meta_name}")
+        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={release_name}")
+        wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={release_name}")
+        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={release_name}")
+        wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={release_name}")
         log.info("all pods are ready")
         pods_ready_time = time.time() - t0
         log.info(f"pods ready time: {pods_ready_time}")
diff --git a/tests/python_client/chaos/test_chaos_apply_to_determined_pod.py b/tests/python_client/chaos/test_chaos_apply_to_determined_pod.py
new file mode 100644
index 0000000000000000000000000000000000000000..4f133d1dc8ab620bbf398b5f59c0e9163186e537
--- /dev/null
+++ b/tests/python_client/chaos/test_chaos_apply_to_determined_pod.py
@@ -0,0 +1,128 @@
+import pytest
+import time
+from time import sleep
+from pathlib import Path
+from pymilvus import connections
+from common.cus_resource_opts import CustomResourceOperations as CusResource
+from common.milvus_sys import MilvusSys
+from chaos import chaos_commons as cc
+import logging as log
+from utils.util_k8s import (wait_pods_ready, get_milvus_instance_name,
+                            get_milvus_deploy_tool, get_etcd_leader, get_etcd_followers)
+import constants
+
+
+class TestChaosApply:
+
+    @pytest.fixture(scope="function", autouse=True)
+    def init_env(self, host, port, user, password, milvus_ns):
+        if user and password:
+            # log.info(f"connect to {host}:{port} with user {user} and password {password}")
+            connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
+        else:
+            connections.connect('default', host=host, port=port)
+        if connections.has_connection("default") is False:
+            raise Exception("no connections")
+        #
+        self.host = host
+        self.port = port
+        self.user = user
+        self.password = password
+        self.milvus_sys = MilvusSys(alias='default')
+        self.chaos_ns = constants.CHAOS_NAMESPACE
+        self.milvus_ns = milvus_ns
+        self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
+        self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)
+
+    def reconnect(self):
+        if self.user and self.password:
+            connections.connect('default', host=self.host, port=self.port,
+                                user=self.user,
+                                password=self.password,
+                                secure=True)
+        else:
+            connections.connect('default', host=self.host, port=self.port)
+        if connections.has_connection("default") is False:
+            raise Exception("no connections")
+
+    def teardown(self):
+        chaos_res = CusResource(kind=self.chaos_config['kind'],
+                                group=constants.CHAOS_GROUP,
+                                version=constants.CHAOS_VERSION,
+                                namespace=constants.CHAOS_NAMESPACE)
+        meta_name = self.chaos_config.get('metadata', None).get('name', None)
+        chaos_res.delete(meta_name, raise_ex=False)
+        sleep(2)
+
+    def test_chaos_apply(self, chaos_type, target_pod, chaos_duration, chaos_interval):
+        # start the monitor threads to check the milvus ops
+        log.info("*********************Chaos Test Start**********************")
+        log.info(connections.get_connection_addr('default'))
+        release_name = self.release_name
+        deploy_tool = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)
+        target_pod_list = []
+        if target_pod == 'etcd_leader':
+            etcd_leader = get_etcd_leader(release_name, deploy_tool)
+            if etcd_leader is None:
+                raise Exception("no etcd leader")
+            target_pod_list.append(etcd_leader)
+        if target_pod == 'etcd_followers':
+            etcd_followers = get_etcd_followers(release_name, deploy_tool)
+            if etcd_followers is None:
+                raise Exception("no etcd followers")
+            target_pod_list.extend(etcd_followers)
+        log.info(f"target_pod_list: {target_pod_list}")
+        chaos_type = chaos_type.replace('_', '-')
+        chaos_config = cc.gen_experiment_config(f"{str(Path(__file__).absolute().parent)}/chaos_objects/template/{chaos_type}-by-pod-list.yaml")
+        chaos_config['metadata']['name'] = f"test-{target_pod.replace('_', '-')}-{chaos_type}-{int(time.time())}"
+        meta_name = chaos_config.get('metadata', None).get('name', None)
+        chaos_config['spec']['selector']['pods']['chaos-testing'] = target_pod_list
+        self.chaos_config = chaos_config  # cache the chaos config for teardown
+        log.info(f"chaos_config: {chaos_config}")
+        # apply chaos object
+        chaos_res = CusResource(kind=chaos_config['kind'],
+                                group=constants.CHAOS_GROUP,
+                                version=constants.CHAOS_VERSION,
+                                namespace=constants.CHAOS_NAMESPACE)
+        chaos_res.create(chaos_config)
+        log.info("chaos injected")
+        res = chaos_res.list_all()
+        chaos_list = [r['metadata']['name'] for r in res['items']]
+        assert meta_name in chaos_list
+        res = chaos_res.get(meta_name)
+        log.info(f"chaos inject result: {res['kind']}, {res['metadata']['name']}")
+        chaos_duration = chaos_duration.replace('h', '*3600+').replace('m', '*60+').replace('s', '*1+') + '+0'
+        chaos_duration = eval(chaos_duration)
+        sleep(chaos_duration)
+        # delete chaos
+        chaos_res.delete(meta_name)
+        log.info("chaos deleted")
+        res = chaos_res.list_all()
+        chaos_list = [r['metadata']['name'] for r in res['items']]
+        # verify the chaos is deleted
+        sleep(10)
+        res = chaos_res.list_all()
+        chaos_list = [r['metadata']['name'] for r in res['items']]
+        assert meta_name not in chaos_list
+        # wait all pods ready
+        t0 = time.time()
+        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={release_name}")
+        wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={release_name}")
+        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={release_name}")
+        wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={release_name}")
+        log.info("all pods are ready")
+        pods_ready_time = time.time() - t0
+        log.info(f"pods ready time: {pods_ready_time}")
+        # reconnect to test the service healthy
+        start_time = time.time()
+        end_time = start_time + 120
+        while time.time() < end_time:
+            try:
+                self.reconnect()
+                break
+            except Exception as e:
+                log.error(e)
+                sleep(2)
+        recovery_time = time.time() - start_time
+        log.info(f"recovery time: {recovery_time}")
+        log.info("*********************Chaos Test Completed**********************")
diff --git a/tests/python_client/utils/util_k8s.py b/tests/python_client/utils/util_k8s.py
index 4a651a9693e00bf52304149eae42a35cb9164242..1fe8b564621130598e1b5412a39c56a86038e286 100644
--- a/tests/python_client/utils/util_k8s.py
+++ b/tests/python_client/utils/util_k8s.py
@@ -315,6 +315,83 @@ def get_metrics_querynode_sq_req_count():
     else:
         raise Exception(-1, f"Failed to get metrics with status code {response.status_code}")
 
+
+def get_svc_ip(namespace, label_selector):
+    """ get svc ip from svc list """
+    init_k8s_client_config()
+    api_instance = client.CoreV1Api()
+    try:
+        api_response = api_instance.list_namespaced_service(namespace=namespace, label_selector=label_selector)
+    except ApiException as e:
+        log.error("Exception when calling CoreV1Api->list_namespaced_service: %s\n" % e)
+        raise Exception(str(e))
+    svc_ip = api_response.items[0].spec.cluster_ip
+    return svc_ip
+
+
+def parse_etcdctl_table_output(output):
+    """ parse etcdctl table output """
+    output = output.split("\n")
+    title = []
+    data = []
+    for line in output:
+        if "ENDPOINT" in line:
+            title = [x.strip(" ") for x in line.strip("|").split("|")]
+        if ":" in line:
+            data.append([x.strip(" ") for x in line.strip("|").split("|")])
+    return title, data
+
+
+def get_etcd_leader(release_name, deploy_tool="helm"):
+    """ get etcd leader by etcdctl """
+    pod_list = []
+    if deploy_tool == "helm":
+        label_selector = f"app.kubernetes.io/instance={release_name}-etcd, app.kubernetes.io/name=etcd"
+        pod_list = get_pod_list("chaos-testing", label_selector)
+        if len(pod_list) == 0:
+            label_selector = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/name=etcd"
+            pod_list = get_pod_list("chaos-testing", label_selector)
+    if deploy_tool == "operator":
+        label_selector = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/name=etcd"
+        pod_list = get_pod_list("chaos-testing", label_selector)
+    leader = None
+    for pod in pod_list:
+        endpoint = f"{pod.status.pod_ip}:2379"
+        cmd = f"etcdctl --endpoints={endpoint} endpoint status -w table"
+        output = os.popen(cmd).read()
+        log.info(f"etcdctl output: {output}")
+        title, data = parse_etcdctl_table_output(output)
+        idx = title.index("IS LEADER")
+        if data[0][idx] == "true":
+            leader = pod.metadata.name
+    log.info(f"etcd leader is {leader}")
+    return leader
+
+
+def get_etcd_followers(release_name, deploy_tool="helm"):
+    """ get etcd followers by etcdctl """
+    pod_list = []
+    if deploy_tool == "helm":
+        label_selector = f"app.kubernetes.io/instance={release_name}-etcd, app.kubernetes.io/name=etcd"
+        pod_list = get_pod_list("chaos-testing", label_selector)
+        if len(pod_list) == 0:
+            label_selector = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/name=etcd"
+            pod_list = get_pod_list("chaos-testing", label_selector)
+    if deploy_tool == "operator":
+        label_selector = f"app.kubernetes.io/instance={release_name}, app.kubernetes.io/name=etcd"
+        pod_list = get_pod_list("chaos-testing", label_selector)
+    followers = []
+    for pod in pod_list:
+        endpoint = f"{pod.status.pod_ip}:2379"
+        cmd = f"etcdctl --endpoints={endpoint} endpoint status -w table"
+        output = os.popen(cmd).read()
+        log.info(f"etcdctl output: {output}")
+        title, data = parse_etcdctl_table_output(output)
+        idx = title.index("IS LEADER")
+        if data[0][idx] == "false":
+            followers.append(pod.metadata.name)
+    log.info(f"etcd followers are {followers}")
+    return followers
+
 
 if __name__ == '__main__':
     label = "app.kubernetes.io/name=milvus, component=querynode"