未验证 提交 60e3e661 编写于 作者: Z zhuwenxing 提交者: GitHub

Add standby test (#21795)

Signed-off-by: Nzhuwenxing <wenxing.zhu@zilliz.com>
上级 cae6d48f
apiVersion: chaos-mesh.org/v1alpha1
kind: PodChaos
metadata:
name: test-querynode-pod-failure
name: test-datacoord-pod-failure
namespace: chaos-testing
spec:
selector:
pods:
chaos-testing:
- milvus-multi-querynode-querynode-bcdc595d9-7vmcj
- milvus-multi-querynode-querynode-bcdc595d9-ccxls
- milvus-multi-querynode-querynode-bcdc595d9-dpwgp
- datacoord-standby-test-milvus-datacoord-b664b98df-c42d4
mode: all
action: pod-failure
duration: 2m
duration: 3m
gracePeriod: 0
\ No newline at end of file
......@@ -75,7 +75,7 @@ def exception_handler():
e_str = str(e)
log_e = e_str[0:log_row_length] + \
'......' if len(e_str) > log_row_length else e_str
log.error(log_e)
log.error(f"class: {self.__class__.__name__}, func name: {func.__name__}, error: {log_e}")
return Error(e), False
return inner_wrapper
return wrapper
......
......@@ -6,6 +6,27 @@ image:
tag: master-latest
pullPolicy: IfNotPresent
rootCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for root coordinator
queryCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for query coordinator
dataCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for data coordinator
indexCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for index coordinator
etcd:
replicaCount: 3
image:
......
......@@ -3,6 +3,7 @@ import pytest
def pytest_addoption(parser):
parser.addoption("--chaos_type", action="store", default="pod_kill", help="chaos_type")
parser.addoption("--role_type", action="store", default="activated", help="role_type")
parser.addoption("--target_component", action="store", default="querynode", help="target_component")
parser.addoption("--chaos_duration", action="store", default="1m", help="chaos_duration")
parser.addoption("--chaos_interval", action="store", default="10s", help="chaos_interval")
......@@ -15,6 +16,11 @@ def chaos_type(request):
return request.config.getoption("--chaos_type")
@pytest.fixture
def role_type(request):
    """Expose the value of the ``--role_type`` command-line option to tests."""
    option_value = request.config.getoption("--role_type")
    return option_value
@pytest.fixture
def target_component(request):
    """Expose the value of the ``--target_component`` command-line option to tests."""
    option_value = request.config.getoption("--target_component")
    return option_value
......
......@@ -12,7 +12,7 @@ CHAOS_GROUP = 'chaos-mesh.org' # chao mesh group
CHAOS_VERSION = 'v1alpha1' # chaos mesh version
SUCC = 'succ'
FAIL = 'fail'
DELTA_PER_INS = 10 # entities per insert
DELTA_PER_INS = 300 # entities per insert
ENTITIES_FOR_SEARCH = 3000 # entities for search_collection
CHAOS_CONFIG_ENV = 'CHAOS_CONFIG_PATH' # env variable holding the chaos config path
......
import threading
import pytest
import time
from time import sleep
from pathlib import Path
from pymilvus import connections
from common.cus_resource_opts import CustomResourceOperations as CusResource
from common.milvus_sys import MilvusSys
import logging as log
from utils.util_k8s import wait_pods_ready, get_milvus_instance_name, get_milvus_deploy_tool, find_activate_standby_coord_pod
from utils.util_common import update_key_value, update_key_name, gen_experiment_config
import constants
class TestChaosApply:
    """Chaos test that targets the active or standby coordinator pods of a Milvus release.

    The test looks up which coordinator pod currently owns the session,
    injects a chaos experiment into the selected pods, removes the experiment
    after ``chaos_duration`` and finally checks how the active/standby roles
    changed.
    """

    @pytest.fixture(scope="function", autouse=True)
    def init_env(self, host, port, user, password, milvus_ns):
        """Connect to Milvus and cache connection and deployment details on ``self``.

        Raises:
            Exception: if the ``default`` connection does not exist after connecting.
        """
        if user and password:
            connections.connect('default', host=host, port=port, user=user, password=password, secure=True)
        else:
            connections.connect('default', host=host, port=port)
        if connections.has_connection("default") is False:
            raise Exception("no connections")
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.milvus_sys = MilvusSys(alias='default')
        self.chaos_ns = constants.CHAOS_NAMESPACE
        self.milvus_ns = milvus_ns
        self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
        self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)

    def reconnect(self):
        """Re-create the ``default`` connection with the parameters cached by ``init_env``.

        Raises:
            Exception: if the ``default`` connection does not exist after connecting.
        """
        if self.user and self.password:
            connections.connect('default', host=self.host, port=self.port,
                                user=self.user,
                                password=self.password,
                                secure=True)
        else:
            connections.connect('default', host=self.host, port=self.port)
        if connections.has_connection("default") is False:
            raise Exception("no connections")

    def teardown(self):
        """Best-effort removal of the chaos object created by the test."""
        # The test may fail before the chaos config is built; in that case
        # there is nothing to delete and teardown must not raise on its own.
        chaos_config = getattr(self, 'chaos_config', None)
        if not chaos_config:
            return
        chaos_res = CusResource(kind=chaos_config['kind'],
                                group=constants.CHAOS_GROUP,
                                version=constants.CHAOS_VERSION,
                                namespace=constants.CHAOS_NAMESPACE)
        meta_name = (chaos_config.get('metadata') or {}).get('name', None)
        chaos_res.delete(meta_name, raise_ex=False)
        sleep(2)

    def test_chaos_apply(self, chaos_type, role_type, target_component, chaos_duration, chaos_interval):
        """Inject chaos into the selected coordinator pods and verify the role switch.

        Args:
            chaos_type: chaos kind, e.g. ``pod_kill``; underscores are turned into dashes
                to select the ``<chaos-type>-by-pod-list.yaml`` template.
            role_type: ``"standby"`` or ``"activated"``; any other value targets all pods
                of the component.
            target_component: coordinator type whose pods are targeted.
            chaos_duration: duration string built from ``h``/``m``/``s`` units, e.g. ``"1m"``.
            chaos_interval: accepted for fixture compatibility; not used by this test.
        """
        log.info("*********************Chaos Test Start**********************")
        log.info(connections.get_connection_addr('default'))
        activate_pod_list, standby_pod_list = find_activate_standby_coord_pod(self.milvus_ns, self.release_name,
                                                                              target_component)
        log.info(f"activated pod list: {activate_pod_list}, standby pod list: {standby_pod_list}")
        # Target every pod of the component unless a specific role is requested.
        if role_type == "standby":
            target_pod_list = standby_pod_list
        elif role_type == "activated":
            target_pod_list = activate_pod_list
        else:
            target_pod_list = activate_pod_list + standby_pod_list
        chaos_type = chaos_type.replace("_", "-")
        chaos_config = gen_experiment_config(f"{str(Path(__file__).absolute().parent)}/"
                                             f"chaos_objects/template/{chaos_type}-by-pod-list.yaml")
        # A timestamp suffix keeps the chaos object name unique across runs.
        chaos_config['metadata']['name'] = f"test-{target_component}-standby-{int(time.time())}"
        meta_name = chaos_config.get('metadata', None).get('name', None)
        chaos_config['spec']['selector']['pods']['chaos-testing'] = target_pod_list
        self.chaos_config = chaos_config
        # Convert the duration string to seconds, e.g. "1m" -> "1*60++0" -> 60.
        # NOTE(review): eval() runs on a command-line supplied string; acceptable only
        # while the option comes from a trusted test invocation.
        chaos_duration = chaos_duration.replace('h', '*3600+').replace('m', '*60+').replace('s', '*1+') + '+0'
        chaos_duration = eval(chaos_duration)
        if self.deploy_by == "milvus-operator":
            update_key_name(chaos_config, "component", "app.kubernetes.io/component")
        self._chaos_config = chaos_config  # cache the chaos config for tear down
        log.info(f"chaos_config: {chaos_config}")
        # apply chaos object
        chaos_res = CusResource(kind=chaos_config['kind'],
                                group=constants.CHAOS_GROUP,
                                version=constants.CHAOS_VERSION,
                                namespace=constants.CHAOS_NAMESPACE)
        chaos_res.create(chaos_config)
        log.info("chaos injected")
        res = chaos_res.list_all()
        chaos_list = [r['metadata']['name'] for r in res['items']]
        assert meta_name in chaos_list
        res = chaos_res.get(meta_name)
        log.info(f"chaos inject result: {res['kind']}, {res['metadata']['name']}")
        sleep(chaos_duration)
        # delete chaos
        chaos_res.delete(meta_name)
        log.info("chaos deleted")
        # give the deletion some time before verifying the chaos object is gone
        sleep(10)
        res = chaos_res.list_all()
        chaos_list = [r['metadata']['name'] for r in res['items']]
        assert meta_name not in chaos_list
        # Wait for the pods of the Milvus release. The label value has to be the
        # release name: the chaos object name never labels any pod. Both label
        # styles are checked, as in the sibling chaos tests.
        t0 = time.time()
        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} "
                 f"with label app.kubernetes.io/instance={self.release_name}")
        wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={self.release_name}")
        log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={self.release_name}")
        wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={self.release_name}")
        log.info("all pods are ready")
        pods_ready_time = time.time() - t0
        log.info(f"pods ready time: {pods_ready_time}")
        # reconnect to test the service healthy; retry for at most 120 seconds
        start_time = time.time()
        end_time = start_time + 120
        while time.time() < end_time:
            try:
                self.reconnect()
                break
            except Exception as e:
                log.error(e)
                sleep(2)
        recovery_time = time.time() - start_time
        log.info(f"recovery time: {recovery_time}")
        time.sleep(30)
        activate_pod_list_after_chaos, standby_pod_list_after_chaos = find_activate_standby_coord_pod(
            self.milvus_ns, self.release_name, target_component)
        log.info(f"activated pod list: {activate_pod_list_after_chaos}, "
                 f"standby pod list: {standby_pod_list_after_chaos}")
        if role_type in ("standby", "activated"):
            # Fail with a readable message instead of an IndexError when no pod is active.
            assert activate_pod_list_after_chaos, "no activated pod found after chaos"
        if role_type == "standby":
            # if the standby pod is injected, the activated pod should not be changed
            assert activate_pod_list, "no activated pod found before chaos"
            assert activate_pod_list_after_chaos[0] == activate_pod_list[0]
        if role_type == "activated":
            # if the activated pod is injected, the one of standby pods should be changed to activated
            assert activate_pod_list_after_chaos[0] in standby_pod_list
        log.info("*********************Chaos Test Completed**********************")
......@@ -214,7 +214,7 @@ class TestChaos(TestChaosBase):
sleep(2)
# wait all pods ready
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label app.kubernetes.io/instance={release_name}")
ready_1 = wait_pods_ready(constants.CHAOS_NAMESPACE,f"app.kubernetes.io/instance={release_name}")
ready_1 = wait_pods_ready(constants.CHAOS_NAMESPACE, f"app.kubernetes.io/instance={release_name}")
log.info(f"wait for pods in namespace {constants.CHAOS_NAMESPACE} with label release={release_name}")
ready_2 = wait_pods_ready(constants.CHAOS_NAMESPACE, f"release={release_name}")
if ready_1 and ready_2:
......
......@@ -69,6 +69,7 @@ class TestChaos(TestChaosBase):
Op.load_balance: LoadBalanceChecker()
}
self.health_checkers = checkers
ms = MilvusSys()
self.prepare_bulk_insert()
def prepare_bulk_insert(self, nb=30000, row_based=True):
......
......@@ -13,12 +13,13 @@ from utils.util_log import test_log as log
class TestGetCollections(TestcaseBase):
""" Test case of getting all collections """
@pytest.mark.tags(CaseLabel.L1)
@pytest.mark.tags(CaseLabel.L3)
def test_get_collections_by_prefix(self,):
self._connect()
all_collections = self.utility_wrap.list_collections()[0]
all_collections = [c_name for c_name in all_collections if "Checker" in c_name]
log.info(f"all_collections: {all_collections}")
selected_collections_map = {}
for c_name in all_collections:
prefix = c_name.split("_")[0]
......@@ -31,6 +32,7 @@ class TestGetCollections(TestcaseBase):
selected_collections = []
for value in selected_collections_map.values():
selected_collections.extend(value)
assert len(selected_collections) > 0
log.info(f"find {len(selected_collections)} collections:")
log.info(selected_collections)
data = {
......
......@@ -67,13 +67,13 @@ class TestOperations(TestBase):
cc.start_monitor_threads(self.health_checkers)
log.info("*********************Load Start**********************")
# wait request_duration
request_duration = request_duration.replace("h","*3600+").replace("m","*60+").replace("s","")
request_duration = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
if request_duration[-1] == "+":
request_duration = request_duration[:-1]
request_duration = eval(request_duration)
for i in range(10):
sleep(request_duration//10)
for k,v in self.health_checkers.items():
for k, v in self.health_checkers.items():
v.check_result()
if is_check:
assert_statistic(self.health_checkers)
......
import pytest
import threading
from time import sleep
from pymilvus import connections
from chaos.checker import (CreateChecker,
InsertChecker,
FlushChecker,
SearchChecker,
QueryChecker,
IndexChecker,
DeleteChecker,
Op)
from utils.util_log import test_log as log
from chaos import chaos_commons as cc
from common.common_type import CaseLabel
from common.milvus_sys import MilvusSys
from chaos.chaos_commons import assert_statistic
from chaos import constants
from delayed_assert import assert_expectations
from utils.util_k8s import (get_milvus_instance_name,
get_milvus_deploy_tool,
reset_healthy_checker_after_standby_activated)
class TestBase:
    """Class-level defaults shared by the operation tests."""

    # expected result of each operation checker
    expect_create = expect_insert = expect_flush = constants.SUCC
    expect_index = expect_search = expect_query = constants.SUCC

    # default connection endpoint
    host, port = '127.0.0.1', 19530

    _chaos_config = None
    health_checkers = {}
class TestOperations(TestBase):
    """Runs the operation checkers for a fixed duration while a standby switch is watched."""

    @pytest.fixture(scope="function", autouse=True)
    def connection(self, host, port, user, password, milvus_ns):
        """Open the ``default`` connection and remember the deployment details on ``self``."""
        connect_args = {"host": host, "port": port}
        if user and password:
            # credentials are only sent together with a secured connection
            connect_args.update(user=user, password=password, secure=True)
        connections.connect('default', **connect_args)
        if connections.has_connection("default") is False:
            raise Exception("no connections")
        log.info("connect to milvus successfully")
        self.host, self.port = host, port
        self.user, self.password = user, password
        self.milvus_sys = MilvusSys(alias='default')
        self.chaos_ns = constants.CHAOS_NAMESPACE
        self.milvus_ns = milvus_ns
        self.release_name = get_milvus_instance_name(self.milvus_ns, milvus_sys=self.milvus_sys)
        self.deploy_by = get_milvus_deploy_tool(self.milvus_ns, self.milvus_sys)

    def init_health_checkers(self, collection_name=None):
        """Create one checker per operation, all bound to ``collection_name``."""
        checker_classes = (
            (Op.create, CreateChecker),
            (Op.insert, InsertChecker),
            (Op.flush, FlushChecker),
            (Op.index, IndexChecker),
            (Op.search, SearchChecker),
            (Op.query, QueryChecker),
            (Op.delete, DeleteChecker),
        )
        self.health_checkers = {
            op: checker_cls(collection_name=collection_name)
            for op, checker_cls in checker_classes
        }

    @pytest.mark.tags(CaseLabel.L3)
    def test_operations(self, request_duration, target_component, is_check):
        """Keep the checkers running for ``request_duration`` and report their results.

        ``request_duration`` is a string built from ``h``/``m``/``s`` units, e.g. ``"3m"``.
        """
        log.info("*********************Test Start**********************")
        log.info(connections.get_connection_addr('default'))
        self.init_health_checkers(collection_name=None)
        cc.start_monitor_threads(self.health_checkers)
        log.info("*********************Load Start**********************")
        # turn the duration string into an arithmetic expression in seconds
        duration_expr = request_duration.replace("h", "*3600+").replace("m", "*60+").replace("s", "")
        if duration_expr[-1] == "+":
            duration_expr = duration_expr[:-1]
        # NOTE(review): eval() on a command-line supplied string
        request_duration = eval(duration_expr)
        # background watcher that resets the checkers once a standby pod is activated
        watcher = threading.Thread(
            target=reset_healthy_checker_after_standby_activated,
            args=(self.milvus_ns, self.release_name, target_component, self.health_checkers),
            kwargs={"timeout": request_duration//2},
            daemon=True,
        )
        watcher.start()
        log.info('start a thread to reset health_checkers when standby is activated')
        step = 0
        while step < 10:
            sleep(request_duration//10)
            step += 1
        for checker in self.health_checkers.values():
            checker.check_result()
        if is_check:
            assert_statistic(self.health_checkers)
            assert_expectations()
        log.info("*********************Chaos Test Completed**********************")
......@@ -41,4 +41,7 @@ minio==7.1.5
h5py==3.1.0
# for log
loguru==0.5.3
\ No newline at end of file
loguru==0.5.3
# for standby test
etcd3==0.12.0
\ No newline at end of file
cluster:
enabled: true
image:
all:
repository: milvusdb/milvus
tag: 2.2.0-latest
pullPolicy: IfNotPresent
rootCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for root coordinator
queryCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for query coordinator
dataCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for data coordinator
indexCoordinator:
replicas: 2
activeStandby:
enabled: true # Enable active-standby when you set multiple replicas for index coordinator
etcd:
replicaCount: 3
image:
repository: milvusdb/etcd
tag: 3.5.0-r7
minio:
resources:
requests:
memory: 256Mi
kafka:
enabled: false
name: kafka
replicaCount: 3
defaultReplicationFactor: 2
pulsar:
enabled: true
extra:
bastion: no
wsproxy: no
autorecovery:
resources:
requests:
cpu: 0.1
memory: 256Mi
proxy:
replicaCount: 1
resources:
requests:
cpu: 0.1
memory: 256Mi
wsResources:
requests:
memory: 256Mi
cpu: 0.1
configData:
PULSAR_MEM: >
-Xms256m -Xmx256m
PULSAR_GC: >
-XX:MaxDirectMemorySize=512m
httpNumThreads: "50"
bookkeeper:
replicaCount: 2
resources:
requests:
cpu: 0.1
memory: 512Mi
configData:
PULSAR_MEM: >
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=1024m
PULSAR_GC: >
-Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.linkCapacity=1024
-XX:+UseG1GC -XX:MaxGCPauseMillis=10
-XX:+ParallelRefProcEnabled
-XX:+UnlockExperimentalVMOptions
-XX:+DoEscapeAnalysis
-XX:ParallelGCThreads=32
-XX:ConcGCThreads=32
-XX:G1NewSizePercent=50
-XX:+DisableExplicitGC
-XX:-ResizePLAB
-XX:+ExitOnOutOfMemoryError
-XX:+PerfDisableSharedMem
-XX:+PrintGCDetails
nettyMaxFrameSizeBytes: "104867840"
zookeeper:
replicaCount: 1
resources:
requests:
cpu: 0.1
memory: 256Mi
configData:
PULSAR_MEM: >
-Xms512m
-Xmx512m
PULSAR_GC: >
-Dcom.sun.management.jmxremote
-Djute.maxbuffer=10485760
-XX:+ParallelRefProcEnabled
-XX:+UnlockExperimentalVMOptions
-XX:+DoEscapeAnalysis
-XX:+DisableExplicitGC
-XX:+PerfDisableSharedMem
-Dzookeeper.forceSync=no
broker:
replicaCount: 1
resources:
requests:
cpu: 0.1
memory: 512Mi
configData:
PULSAR_MEM: >
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=1024m
PULSAR_GC: >
-Dio.netty.leakDetectionLevel=disabled
-Dio.netty.recycler.linkCapacity=1024
-XX:+ParallelRefProcEnabled
-XX:+UnlockExperimentalVMOptions
-XX:+DoEscapeAnalysis
-XX:ParallelGCThreads=32
-XX:ConcGCThreads=32
-XX:G1NewSizePercent=50
-XX:+DisableExplicitGC
-XX:-ResizePLAB
-XX:+ExitOnOutOfMemoryError
maxMessageSize: "104857600"
defaultRetentionTimeInMinutes: "10080"
defaultRetentionSizeInMB: "8192"
backlogQuotaDefaultLimitGB: "8"
backlogQuotaDefaultRetentionPolicy: producer_exception
# (Re)install a Milvus release with helm.
# Usage: <script> [release] [cluster|standalone] [namespace]
# NOTE(review): the default release name "milvs-chaos" differs from the
# "milvus-chaos" default used by the uninstall script; kept as-is for compatibility.
release=${1:-"milvs-chaos"}
milvus_mode=${2:-"cluster"}
ns=${3:-"chaos-testing"}

# Remove any previous release first; a missing release is not an error here.
bash uninstall_milvus.sh "${release}" "${ns}" || true

helm repo add milvus https://milvus-io.github.io/milvus-helm/
helm repo update

case "${milvus_mode}" in
    cluster|standalone)
        # The values file is selected by mode: ../cluster-values.yaml or ../standalone-values.yaml
        helm install --wait --timeout 360s "${release}" milvus/milvus \
            -f "../${milvus_mode}-values.yaml" \
            --set metrics.serviceMonitor.enabled=true -n="${ns}"
        ;;
    *)
        # Previously an unknown mode silently installed nothing.
        echo "unknown milvus_mode: ${milvus_mode} (expected cluster or standalone)" >&2
        exit 1
        ;;
esac
#!/bin/bash
# Install a Milvus cluster release with helm.
# Usage: <script> [release] [namespace]
# Environment overrides: RELEASE_NAME, REPOSITORY, IMAGE_TAG
set -e

release=${1:-"milvs-chaos"}
ns=${2:-"chaos-testing"}
# Resolve the effective release name once so that the uninstall step cleans
# the same release that is installed afterwards.
target_release=${RELEASE_NAME:-$release}

# Remove any previous release first; a missing release is not an error here.
bash uninstall_milvus.sh "${target_release}" "${ns}" || true

echo "insatll cluster"
helm install --wait --debug --timeout 600s "${target_release}" milvus/milvus \
    --set image.all.repository="${REPOSITORY:-"milvusdb/milvus"}" \
    --set image.all.tag="${IMAGE_TAG:-"master-latest"}" \
    --set metrics.serviceMonitor.enabled=true \
    -f ../cluster-values.yaml -n="${ns}"
\ No newline at end of file
#!/bin/bash
# Install a Milvus standalone release with helm.
# Usage: <script> [release] [namespace]
# Environment overrides: RELEASE_NAME, REPOSITORY, IMAGE_TAG
set -e

release=${1:-"milvs-chaos"}
ns=${2:-"chaos-testing"}
# Resolve the effective release name once so that the uninstall step cleans
# the same release that is installed afterwards.
target_release=${RELEASE_NAME:-$release}

# Remove any previous release first; a missing release is not an error here.
bash uninstall_milvus.sh "${target_release}" "${ns}" || true

echo "insatll standalone"
helm install --wait --debug --timeout 600s "${target_release}" milvus/milvus \
    --set image.all.repository="${REPOSITORY:-"milvusdb/milvus"}" \
    --set image.all.tag="${IMAGE_TAG:-"master-latest"}" \
    --set metrics.serviceMonitor.enabled=true \
    -f ../standalone-values.yaml -n="${ns}"
\ No newline at end of file
# Uninstall a Milvus helm release and delete its persistent volume claims.
# Usage: <script> [release] [namespace]

# Exit immediately for non zero status
set -e

release=${1:-"milvus-chaos"}
ns=${2:-"chaos-testing"}

helm uninstall "${release}" -n="${ns}"
# PVCs may carry either label, so both selectors are cleaned.
kubectl delete pvc -l "release=${release}" -n="${ns}"
kubectl delete pvc -l "app.kubernetes.io/instance=${release}" -n="${ns}"
from yaml import full_load
import json
import requests
import unittest
from utils.util_log import test_log as log
def gen_experiment_config(yaml):
......@@ -24,6 +26,20 @@ def findkeys(node, kv):
yield x
def find_value_by_key(node, k):
    """Yield every value stored under key ``k`` anywhere inside ``node``.

    ``node`` may be an arbitrarily nested mix of dicts and lists; any other
    type yields nothing. For a dict, its own match is produced before the
    matches found inside its values.
    """
    # refer to https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists
    if isinstance(node, list):
        for element in node:
            yield from find_value_by_key(element, k)
        return
    if not isinstance(node, dict):
        return
    if k in node:
        yield node[k]
    for child in node.values():
        yield from find_value_by_key(child, k)
def update_key_value(node, modify_k, modify_v):
# update the value of modify_k to modify_v
if isinstance(node, list):
......@@ -63,18 +79,128 @@ def get_collections():
return collections
if __name__ == "__main__":
    # Small manual demo of the key helpers on a nested structure.
    inner_first = [
        {"id": "xyz", "keyA": "blah blah blah"},
        {"id": "fghi", "keyZ": "blah blah blah"},
    ]
    inner_second = [
        {"id": "asdf", "keyQ": "blah blah"},
        {"id": "yuiop", "keyW": "blah"},
    ]
    d = {
        "id": "abcde",
        "key1": "blah",
        "key2": "blah blah",
        "nestedlist": [
            {
                "id": "qwerty",
                "nestednestedlist": inner_first,
                "anothernestednestedlist": inner_second,
            },
        ],
    }
    print(list(findkeys(d, 'id')))
    update_key_value(d, "none_id", "ccc")
    print(d)
def get_request_success_rate(url, api_key, body, timeout=30):
    """Query a metrics endpoint and collect the returned values per function name.

    Args:
        url: endpoint the query is POSTed to.
        api_key: token sent as ``Authorization: Bearer <api_key>``.
        body: JSON-serialisable query payload.
        timeout: seconds to wait for the HTTP response; without a timeout
            ``requests`` may block forever on an unresponsive server.

    Returns:
        dict mapping each ``function_name`` found in a frame's schema to that
        frame's ``data.values``. Empty when the response status is not 200.
    """
    headers = {
        'Authorization': "Bearer " + api_key,
        'Content-Type': 'application/json'
    }
    rsp = requests.post(url, headers=headers, data=json.dumps(body), timeout=timeout)
    result = {}
    if rsp.status_code != 200:
        log.error(f"Failed to get request success rate with status code {rsp.status_code}")
        return result
    results = rsp.json()["results"]
    # "A" is the refId of the query; NOTE(review): confirm callers always use refId "A".
    frames = results["A"]["frames"]
    for frame in frames:
        function_name = next(find_value_by_key(frame["schema"], "function_name"), None)
        if function_name is None:
            # A frame without a function_name label cannot be attributed to a service.
            log.warning("Skip a frame without function_name in its schema")
            continue
        result[function_name] = frame["data"]["values"]  # get the success rate value
    return result
def analyze_service_breakdown_time(result, chaos_ts, recovery_rate):
    """Estimate, per service, how long its success rate stayed degraded after chaos.

    Args:
        result: dict mapping a service name to ``[timestamps, success_rates]``,
            two parallel sequences. Timestamps are presumably ascending — the
            injection-point search relies on it.
        chaos_ts: timestamp at which chaos was injected.
        recovery_rate: fraction of the pre-chaos average success rate below
            which a sample counts as failed and at or above which it counts as recovered.

    Returns:
        dict mapping each service name to its breakdown time, in the unit of
        the timestamps. ``0`` when the series is empty or no failed sample is found.
    """
    service_breakdown_time = {}
    for service, value in result.items():
        ts, success_rate = value[0], value[1]
        if not ts or not success_rate:
            log.error(f"Service {service} has no samples, breakdown time is set to 0")
            service_breakdown_time[service] = 0
            continue
        # Index of the last sample taken at or before the injection. It is clamped
        # to 0 so an injection earlier than the first sample still has a baseline.
        chaos_inject_point = 0
        for i, t in enumerate(ts):
            if t >= chaos_ts:
                chaos_inject_point = i if t == chaos_ts else max(i - 1, 0)
                break
        baseline = success_rate[:chaos_inject_point + 1]
        previous_rate = sum(baseline) / len(baseline)
        threshold = recovery_rate * previous_rate
        failed_point = next((i for i in range(chaos_inject_point, len(ts))
                             if success_rate[i] < threshold), None)
        if failed_point is None:
            # The service never dropped below the threshold after the injection.
            breakdown_time = 0
        else:
            failed_ts = ts[failed_point]
            for i in range(failed_point, len(ts)):
                if success_rate[i] >= threshold:
                    recovery_ts = ts[i]
                    break
            else:
                # if the service is still down,
                # set the recovery time to the last timestamp with another interval
                interval = ts[-1] - ts[-2] if len(ts) > 1 else 0
                recovery_ts = ts[-1] + interval
            breakdown_time = recovery_ts - failed_ts
        log.info(f"Service {service} breakdown time is {breakdown_time}")
        service_breakdown_time[service] = breakdown_time
    return service_breakdown_time
class TestUtilCommon(unittest.TestCase):
    """Unit tests for the helper functions of this module."""

    def test_find_value_by_key(self):
        """All ``id`` values of a nested structure are found, outermost first."""
        first_children = [
            {"id": "xyz", "keyA": "blah blah blah"},
            {"id": "fghi", "keyZ": "blah blah blah"},
        ]
        second_children = [
            {"id": "asdf", "keyQ": "blah blah"},
            {"id": "yuiop", "keyW": "blah"},
        ]
        nested = {
            "id": "abcde",
            "key1": "blah",
            "key2": "blah blah",
            "nestedlist": [
                {
                    "id": "qwerty",
                    "nestednestedlist": first_children,
                    "anothernestednestedlist": second_children,
                },
            ],
        }
        found = list(find_value_by_key(nested, "id"))
        self.assertEqual(found, ['abcde', 'qwerty', 'xyz', 'fghi', 'asdf', 'yuiop'])

    def test_analyze_service_breakdown_time(self):
        """Breakdown time is measured from the first failed to the first recovered sample."""
        timestamps = [1, 2, 3, 4, 5]
        result = {
            "service1": [list(timestamps), [1, 0, 0, 0, 1]],
            "service2": [list(timestamps), [1, 1, 0, 0, 1]],
            "service3": [list(timestamps), [1, 1, 1, 0, 1]],
            "service4": [list(timestamps), [1, 1, 1, 1, 0]],
        }
        expected = {"service1": 3, "service2": 2, "service3": 1, "service4": 1}
        actual = analyze_service_breakdown_time(result, 2, 0.8)
        self.assertEqual(actual, expected)

    def test_get_request_success_rate(self):
        """The helper returns a dict for a query against the placeholder endpoint."""
        # NOTE(review): the URL and API key are placeholders, so this test
        # attempts a real HTTP request when it runs.
        url = "https://xxx/api/ds/query"
        api_key = "xxx"
        query = {
            "datasource": {
                "type": "prometheus",
                "uid": "P1809F7CD0C75ACF3",
            },
            "exemplar": True,
            "expr": 'sum(increase(milvus_proxy_req_count{app_kubernetes_io_instance=~"datacoord-standby-test", app_kubernetes_io_name="milvus", namespace="chaos-testing", status="success"}[2m])/120) by(function_name, pod, node_id)',
            "interval": "",
            "legendFormat": "{{function_name}}-{{pod}}-{{node_id}}",
            "queryType": "timeSeriesQuery",
            "refId": "A",
            "requestId": "123329A",
            "utcOffsetSec": 28800,
            "datasourceId": 1,
            "intervalMs": 15000,
            "maxDataPoints": 1070,
        }
        time_range = {
            "from": "2023-01-06T04:24:48.549Z",
            "to": "2023-01-06T07:24:48.549Z",
            "raw": {
                "from": "now-3h",
                "to": "now",
            },
        }
        body = {
            "queries": [query],
            "range": time_range,
            "from": "1672979088549",
            "to": "1672989888549",
        }
        rsp = get_request_success_rate(url, api_key, body)
        self.assertTrue(isinstance(rsp, dict))
import json
import os.path
import time
import threading
import requests
import etcd3
from pymilvus import connections
from kubernetes import client, config
from kubernetes.client.rest import ApiException
from common.milvus_sys import MilvusSys
from utils.util_log import test_log as log
from chaos import chaos_commons as cc
from utils.util_common import find_value_by_key
from common.common_type import in_cluster_env
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def init_k8s_client_config():
"""
......@@ -316,9 +322,79 @@ def get_metrics_querynode_sq_req_count():
raise Exception(-1, f"Failed to get metrics with status code {response.status_code}")
def get_pod_logs(namespace, pod_name):
    """Request the log of ``pod_name`` in ``namespace`` through the CoreV1 API.

    The request is issued with ``async_req=True`` and whatever
    ``read_namespaced_pod_log`` returns for that call is handed back unchanged.
    NOTE(review): with ``async_req=True`` this is presumably an async handle
    rather than the log text — confirm against callers.
    """
    init_k8s_client_config()
    core_api = client.CoreV1Api()
    log.debug(f'Start to read {pod_name} log')
    return core_api.read_namespaced_pod_log(name=pod_name, namespace=namespace, async_req=True)
def find_activate_standby_coord_pod(namespace, release_name, coord_type):
    """Split the pods of a coordinator into the activated one and the standby ones.

    The address stored in the etcd session key ``by-dev/meta/session/<coord_type>``
    identifies the activated pod; every other pod selected by the release and
    component labels is reported as standby.

    Args:
        namespace: namespace of the Milvus release.
        release_name: release name; the etcd service is looked up as ``<release_name>-etcd``.
        coord_type: coordinator component name, used in the session key and the label selector.

    Returns:
        ``(activate_pod_list, standby_pod_list)``, two lists of pod names.

    Raises:
        Exception: if etcd holds no session for ``coord_type``.
    """
    init_k8s_client_config()
    api_instance = client.CoreV1Api()
    etcd_service_name = release_name + "-etcd"
    service = api_instance.read_namespaced_service(name=etcd_service_name, namespace=namespace)
    etcd_cluster_ip = service.spec.cluster_ip
    etcd_port = service.spec.ports[0].port
    session_key = f'by-dev/meta/session/{coord_type}'
    etcd = etcd3.client(host=etcd_cluster_ip, port=etcd_port)
    try:
        v = etcd.get(session_key)
    finally:
        # This function is polled in a loop, so release the channel on every call.
        etcd.close()
    log.info(f"coord_type: {coord_type}, etcd session value: {v}")
    if v[0] is None:
        # Report a missing session explicitly instead of failing inside json.loads.
        raise Exception(f"no session found in etcd for key {session_key}")
    # The session address has the form "<ip>:<port>"; only the ip is compared.
    activated_pod_ip = json.loads(v[0])["Address"].split(":")[0]
    label_selector = f'app.kubernetes.io/instance={release_name}, component={coord_type}'
    items = get_pod_list(namespace, label_selector=label_selector)
    activate_pod_list = []
    standby_pod_list = []
    for item in items:
        pod_name = item.metadata.name
        if item.status.pod_ip == activated_pod_ip:
            activate_pod_list.append(pod_name)
        else:
            standby_pod_list.append(pod_name)
    return activate_pod_list, standby_pod_list
def reset_healthy_checker_after_standby_activated(namespace, release_name, coord_type, health_checkers, timeout=360):
    """Poll until a former standby pod becomes the activated one, then reset the checkers.

    The roles are re-read every 10 seconds until a pod from the initial
    standby list shows up as activated or ``timeout`` seconds have elapsed.
    After a switch the function waits 30 seconds, resets the checker counters
    and calls ``check_result`` on every checker.
    """
    active_before, standby_before = find_activate_standby_coord_pod(namespace, release_name, coord_type)
    log.info(f"check standby switch: activate_pod_list_before {active_before}, "
             f"standby_pod_list_before {standby_before}")
    switched = False
    began = time.time()
    now = time.time()
    while not switched and now - began < timeout:
        try:
            active_after, standby_after = find_activate_standby_coord_pod(namespace, release_name, coord_type)
            if active_after[0] in standby_before:
                switched = True
                log.info(f"Standby {coord_type} pod {active_after[0]} activated")
                log.info(f"check standby switch: activate_pod_list_after {active_after}, "
                         f"standby_pod_list_after {standby_after}")
                break
        except Exception as e:
            # a failed lookup is logged and retried on the next round
            log.error(f"Exception when check standby switch: {e}")
        time.sleep(10)
        now = time.time()
    if not switched:
        log.info(f"Standby {coord_type} pod does not switch standby mode")
        return
    time.sleep(30)
    cc.reset_counting(health_checkers)
    for checker in health_checkers.values():
        log.info("reset health checkers")
        checker.check_result()
if __name__ == '__main__':
    # Manual smoke run against a live cluster.
    label = "app.kubernetes.io/name=milvus, component=querynode"
    instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111")
    res = get_pod_list("chaos-testing", label_selector=label)
    m = get_pod_ip_name_pairs("chaos-testing", label_selector=label)
    export_pod_logs(namespace='chaos-testing', label_selector=label)
    # for coord in ["indexcoord"]:
    coord_types = ["rootcoord", "datacoord", "indexcoord", "querycoord"]
    for coord in coord_types:
        activate_pod_list, standby_pod_list = find_activate_standby_coord_pod(
            "chaos-testing", "rootcoord-standby-test", coord)
        print(f"activate pod {activate_pod_list}, standby pod {standby_pod_list}")
    # label = "app.kubernetes.io/name=milvus, component=querynode"
    # instance_name = get_milvus_instance_name("chaos-testing", "10.96.250.111")
    # res = get_pod_list("chaos-testing", label_selector=label)
    # m = get_pod_ip_name_pairs("chaos-testing", label_selector=label)
    # export_pod_logs(namespace='chaos-testing', label_selector=label)
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册