未验证 提交 33733868 编写于 作者: Z zhuwenxing 提交者: GitHub

[test]Add test case for multi replicase (#16668)

Signed-off-by: Nzhuwenxing <wenxing.zhu@zilliz.com>
上级 60f7fef3
......@@ -110,12 +110,12 @@ class ApiUtilityWrapper:
timeout=timeout, using=using).run()
return res, check_result
def load_balance(self, src_node_id, dst_node_ids, sealed_segment_ids, timeout=None,
def load_balance(self, collection_name, src_node_id, dst_node_ids, sealed_segment_ids, timeout=None,
using="default", check_task=None, check_items=None):
timeout = TIMEOUT if timeout is None else timeout
func_name = sys._getframe().f_code.co_name
res, is_succ = api_request([self.ut.load_balance, src_node_id, dst_node_ids,
res, is_succ = api_request([self.ut.load_balance, collection_name, src_node_id, dst_node_ids,
sealed_segment_ids, timeout, using])
check_result = ResponseChecker(res, func_name, check_task, check_items, is_succ,
timeout=timeout, using=using).run()
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
name: test-querynode-pod-failure
namespace: chaos-testing
- milvus-multi-querynode-querynode-bcdc595d9-7vmcj
- milvus-multi-querynode-querynode-bcdc595d9-ccxls
- milvus-multi-querynode-querynode-bcdc595d9-dpwgp
mode: all
action: pod-failure
duration: 2m
gracePeriod: 0
\ No newline at end of file
import threading
import pytest
import os
import time
import json
from time import sleep
from pymilvus import connections
from chaos.checker import (CreateChecker, InsertFlushChecker,
SearchChecker, QueryChecker, IndexChecker, Op)
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready, get_pod_list, get_pod_ip_name_pairs
from utils.util_common import findkeys
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from chaos import constants
from delayed_assert import expect, assert_expectations
def assert_statistic(checkers, expectations={}):
for k in checkers.keys():
# expect succ if no expectations
succ_rate = checkers[k].succ_rate()
total = checkers[k].total()
average_time = checkers[k].average_time
if expectations.get(k, '') == constants.FAIL:
f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate < 0.49 or total < 2,
f"Expect Fail: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
expect(succ_rate > 0.90 and total > 2,
f"Expect Succ: {str(k)} succ rate {succ_rate}, total: {total}, average time: {average_time:.4f}")
def check_cluster_nodes(chaos_config):
# if all pods will be effected, the expect is all fail.
# Even though the replicas is greater than 1, it can not provide HA, so cluster_nodes is set as 1 for this situation.
if "all" in chaos_config["metadata"]["name"]:
return 1
selector = findkeys(chaos_config, "selector")
selector = list(selector)
log.info(f"chaos target selector: {selector}")
# assert len(selector) == 1
# chaos yaml file must place the effected pod selector in the first position
selector = selector[0]
namespace = selector["namespaces"][0]
labels_dict = selector["labelSelectors"]
labels_list = []
for k, v in labels_dict.items():
labels_str = ",".join(labels_list)
pods = get_pod_list(namespace, labels_str)
return len(pods)
def record_results(checkers):
res = ""
for k in checkers.keys():
check_result = checkers[k].check_result()
res += f"{str(k):10} {check_result}\n"
return res
class TestChaosBase:
expect_create = constants.SUCC
expect_insert = constants.SUCC
expect_flush = constants.SUCC
expect_index = constants.SUCC
expect_search = constants.SUCC
expect_query = constants.SUCC
host = ''
port = 19530
_chaos_config = None
health_checkers = {}
def parser_testcase_config(self, chaos_yaml, chaos_config):
cluster_nodes = check_cluster_nodes(chaos_config)
tests_yaml = constants.TESTS_CONFIG_LOCATION + 'testcases.yaml'
tests_config = cc.gen_experiment_config(tests_yaml)
test_collections = tests_config.get('Collections', None)
for t in test_collections:
test_chaos = t.get('testcase', {}).get('chaos', {})
if test_chaos in chaos_yaml:
expects = t.get('testcase', {}).get(
'expectation', {}).get('cluster_1_node', {})
# for the cluster_n_node
if cluster_nodes > 1:
expects = t.get('testcase', {}).get(
'expectation', {}).get('cluster_n_node', {})
log.info(f"yaml.expects: {expects}")
self.expect_create = expects.get(
Op.create.value, constants.SUCC)
self.expect_insert = expects.get(
Op.insert.value, constants.SUCC)
self.expect_flush = expects.get(Op.flush.value, constants.SUCC)
self.expect_index = expects.get(Op.index.value, constants.SUCC)
self.expect_search = expects.get(
Op.search.value, constants.SUCC)
self.expect_query = expects.get(Op.query.value, constants.SUCC)
log.info(f"self.expects: create:{self.expect_create}, insert:{self.expect_insert}, "
f"flush:{self.expect_flush}, index:{self.expect_index}, "
f"search:{self.expect_search}, query:{self.expect_query}")
return True
return False
class TestChaos(TestChaosBase):
@pytest.fixture(scope="function", autouse=True)
def connection(self, host, port):
connections.add_connection(default={"host": host, "port": port})
if connections.has_connection("default") is False:
raise Exception("no connections")
self.host = host
self.port = port
@pytest.fixture(scope="function", autouse=True)
def init_health_checkers(self):
checkers = {
Op.search: SearchChecker(replica_number=2),
Op.query: QueryChecker(replica_number=2)
self.health_checkers = checkers
def teardown(self):
chaos_res = CusResource(kind=self._chaos_config['kind'],
meta_name = self._chaos_config.get('metadata', None).get('name', None)
chaos_res.delete(meta_name, raise_ex=False)
log.info(f'Alive threads: {threading.enumerate()}')
# @pytest.mark.parametrize('chaos_yaml', "chaos/chaos_objects/template/pod-failure-by-pod-list.yaml")
def test_chaos(self):
# start the monitor threads to check the milvus ops
log.info("*********************Chaos Test Start**********************")
# log.info(f"chaos_yaml: {chaos_yaml}")
# get replicas info
release_name = "milvus-multi-querynode"
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
querynode_ip_pod_pair = get_pod_ip_name_pairs(
"chaos-testing", "app.kubernetes.io/instance=milvus-multi-querynode, component=querynode")
querynode_id_pod_pair = {}
ms = MilvusSys()
for node in ms.query_nodes:
ip = node["infos"]['hardware_infos']["ip"].split(":")[0]
] = querynode_ip_pod_pair[ip]
group_list = []
for g in replicas_info.groups:
# keep only one group in healthy status, other groups all are unhealthy by injecting chaos,
# In the effected groups, each group has one pod is in pod failure status
target_pod_list = []
for g in group_list[1:]:
pod = querynode_id_pod_pair[g[0]]
chaos_config = cc.gen_experiment_config("chaos/chaos_objects/template/pod-failure-by-pod-list.yaml")
chaos_config['metadata']['name'] = f"test-multi-replicase-{int(time.time())}"
kind = chaos_config['kind']
meta_name = chaos_config.get('metadata', None).get('name', None)
chaos_config['spec']['selector']['pods']['chaos-testing'] = target_pod_list
self._chaos_config = chaos_config # cache the chaos config for tear down
log.info(f"chaos_config: {chaos_config}")
# wait 20s
sleep(constants.WAIT_PER_OP * 2)
# replicas info
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
# assert statistic:all ops 100% succ
log.info("******1st assert before chaos: ")
# apply chaos object
chaos_res = CusResource(kind=chaos_config['kind'],
log.info("chaos injected")
sleep(constants.WAIT_PER_OP * 2)
# reset counting
# wait 120s
log.info(f'Alive threads: {threading.enumerate()}')
# replicas info
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
# assert statistic
log.info("******2nd assert after chaos injected: ")
Op.search: constants.SUCC,
Op.query: constants.SUCC
# delete chaos
log.info("chaos deleted")
# wait all pods ready
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={release_name}")
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={release_name}")
wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={release_name}")
log.info("all pods are ready")
# reconnect if needed
sleep(constants.WAIT_PER_OP * 2)
cc.reconnect(connections, alias='default')
# reset counting again
# wait 50s (varies by feature)
sleep(constants.WAIT_PER_OP * 5)
# replicas info
replicas_info, _ = self.health_checkers[Op.search].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.search].c_wrap.name}: {replicas_info}")
replicas_info, _ = self.health_checkers[Op.query].c_wrap.get_replicas()
log.info(f"replicas_info for collection {self.health_checkers[Op.query].c_wrap.name}: {replicas_info}")
# assert statistic: all ops success again
log.info("******3rd assert after chaos deleted: ")
# assert all expectations
log.info("*********************Chaos Test Completed**********************")
......@@ -1458,8 +1458,7 @@ class TestUtilityAdvanced(TestcaseBase):
cnt += r.num_rows
assert cnt == nb
@pytest.mark.skip(reason="wait for zhuwenxing to update")
def test_load_balance_normal(self):
target: test load balance of collection
......@@ -1490,7 +1489,7 @@ class TestUtilityAdvanced(TestcaseBase):
des_node_ids = all_querynodes[1:]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(src_node_id, des_node_ids, sealed_segment_ids)
self.utility_wrap.load_balance(collection_w.name, src_node_id, des_node_ids, sealed_segment_ids)
# get segments distribution after load balance
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
......@@ -1500,7 +1499,6 @@ class TestUtilityAdvanced(TestcaseBase):
# assert sealed_segment_ids is subset of des_sealed_segment_ids
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
@pytest.mark.skip(reason="wait for zhuwenxing to update")
def test_load_balance_with_src_node_not_exist(self):
......@@ -1533,11 +1531,10 @@ class TestUtilityAdvanced(TestcaseBase):
dst_node_ids = all_querynodes[1:]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(invalid_src_node_id, dst_node_ids, sealed_segment_ids,
self.utility_wrap.load_balance(collection_w.name, invalid_src_node_id, dst_node_ids, sealed_segment_ids,
check_items={ct.err_code: 1, ct.err_msg: "is not exist to balance"})
@pytest.mark.skip(reason="wait for zhuwenxing to update")
def test_load_balance_with_all_dst_node_not_exist(self):
......@@ -1569,11 +1566,10 @@ class TestUtilityAdvanced(TestcaseBase):
dst_node_ids = [node["identifier"] for node in ms.index_nodes]
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(src_node_id, dst_node_ids, sealed_segment_ids,
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
check_items={ct.err_code: 1, ct.err_msg: "no available queryNode to allocate"})
@pytest.mark.skip(reason="wait for zhuwenxing to update")
def test_load_balance_with_one_sealed_segment_id_not_exist(self):
......@@ -1610,6 +1606,92 @@ class TestUtilityAdvanced(TestcaseBase):
sealed_segment_ids.append(max(segment_distribution[src_node_id]["sealed"]) + 1)
# load balance
self.utility_wrap.load_balance(src_node_id, dst_node_ids, sealed_segment_ids,
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
check_items={ct.err_code: 1, ct.err_msg: "is not exist"})
def test_load_balance_in_one_group(self):
target: test load balance of collection in one group
method: init a collection, load with multi replicas and load balance among the querynodes in one group
expected: load balance successfully
# init a collection
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
ms = MilvusSys()
nb = 3000
df = cf.gen_default_dataframe_data(nb)
# get sealed segments
# get growing segments
# get replicas information
res, _ = collection_w.get_replicas()
# prepare load balance params
# find a group which has multi nodes
group_nodes = []
for g in res.groups:
if len(g.group_nodes) >= 2:
group_nodes = list(g.group_nodes)
src_node_id = group_nodes[0]
dst_node_ids = group_nodes[1:]
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids)
# get segments distribution after load balance
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids_after_load_banalce = segment_distribution[src_node_id]["sealed"]
# assert
assert sealed_segment_ids_after_load_banalce == []
des_sealed_segment_ids = []
for des_node_id in dst_node_ids:
des_sealed_segment_ids += segment_distribution[des_node_id]["sealed"]
# assert sealed_segment_ids is subset of des_sealed_segment_ids
assert set(sealed_segment_ids).issubset(des_sealed_segment_ids)
def test_load_balance_not_in_one_group(self):
target: test load balance of collection in one group
method: init a collection, load with multi replicas and load balance among the querynodes in different group
expected: load balance failed
# init a collection
c_name = cf.gen_unique_str(prefix)
collection_w = self.init_collection_wrap(name=c_name)
ms = MilvusSys()
nb = 3000
df = cf.gen_default_dataframe_data(nb)
# get sealed segments
# get growing segments
# get replicas information
res, _ = collection_w.get_replicas()
# prepare load balance params
all_querynodes = [node["identifier"] for node in ms.query_nodes]
# find a group which has multi nodes
group_nodes = []
for g in res.groups:
if len(g.group_nodes) >= 2:
group_nodes = list(g.group_nodes)
src_node_id = group_nodes[0]
dst_node_ids = list(set(all_querynodes) - set(group_nodes))
res, _ = self.utility_wrap.get_query_segment_info(c_name)
segment_distribution = cf.get_segment_distribution(res)
sealed_segment_ids = segment_distribution[src_node_id]["sealed"]
# load balance
self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
check_items={ct.err_code: 1, ct.err_msg: "must be in the same replica group"})
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册