Unverified commit 2abc3b32, authored by ThreadDao, committed by GitHub

[skip e2e] Add scale test pipeline server logs archive (#15234)

Signed-off-by: ThreadDao <yufen.zong@zilliz.com>
Parent 1b1d4e50
@@ -31,6 +31,7 @@ pipeline {
TEST_TYPE = "scale-test"
// SEMVER = "${BRANCH_NAME.contains('/') ? BRANCH_NAME.substring(BRANCH_NAME.lastIndexOf('/') + 1) : BRANCH_NAME}"
ARTIFACTS = "${env.WORKSPACE}/_artifacts"
MILVUS_LOGS = "/tmp/milvus_logs/*"
}
stages {
@@ -77,7 +78,16 @@ pipeline {
script {
dir("${env.ARTIFACTS}") {
sh "tar -zcvf artifacts-${PROJECT_NAME}-${TEST_TYPE}-pytest-logs.tar.gz /tmp/ci_logs --remove-files || true"
archiveArtifacts artifacts: "artifacts-${PROJECT_NAME}-${TEST_TYPE}-pytest-logs.tar.gz ", allowEmptyArchive: true }
archiveArtifacts artifacts: "artifacts-${PROJECT_NAME}-${TEST_TYPE}-pytest-logs.tar.gz ", allowEmptyArchive: true
DIR_LIST = sh(returnStdout: true, script: 'ls -d1 ${MILVUS_LOGS}').trim()
for (d in DIR_LIST.tokenize("\n")) {
sh "echo $d"
def release_name = d.split('/')[-1]
sh "tar -zcvf artifacts-${PROJECT_NAME}-${TEST_TYPE}-${release_name}-logs.tar.gz ${d} --remove-files || true"
archiveArtifacts artifacts: "artifacts-${PROJECT_NAME}-${TEST_TYPE}-${release_name}-logs.tar.gz ", allowEmptyArchive: true
}
}
}
}
}
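Reviewer note: the new Groovy loop lists every directory matched by MILVUS_LOGS, takes the last path component as the release name, and packs each directory into its own archive. A minimal Python sketch of the same logic, assuming GNU tar; the PROJECT_NAME and TEST_TYPE values are illustrative stand-ins for the pipeline environment, not part of this change:

import glob
import os
import subprocess

MILVUS_LOGS = "/tmp/milvus_logs/*"   # matches the pipeline environment block above
PROJECT_NAME = "milvus"              # illustrative; set by the pipeline in reality
TEST_TYPE = "scale-test"

for d in glob.glob(MILVUS_LOGS):
    if not os.path.isdir(d):
        continue
    # /tmp/milvus_logs/<release> -> <release>, as in d.split('/')[-1] in the loop above
    release_name = d.rstrip("/").split("/")[-1]
    archive = f"artifacts-{PROJECT_NAME}-{TEST_TYPE}-{release_name}-logs.tar.gz"
    # best-effort packing that deletes sources, like `tar ... --remove-files || true`
    subprocess.run(["tar", "-zcvf", archive, d, "--remove-files"], check=False)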
@@ -10,7 +10,7 @@ from customize.milvus_operator import MilvusOperator
from scale import constants
from pymilvus import connections
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready
from utils.util_k8s import wait_pods_ready, export_pod_logs
from utils.util_pymilvus import get_latest_tag
prefix = "data_scale"
@@ -55,52 +55,61 @@ class TestDataNodeScale:
host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
# host = '10.98.0.4'
# connect
connections.add_connection(default={"host": host, "port": 19530})
connections.connect(alias='default')
try:
# connect
connections.add_connection(default={"host": host, "port": 19530})
connections.connect(alias='default')
# create
c_name = cf.gen_unique_str("scale_query")
# c_name = 'scale_query_DymS7kI4'
collection_w = ApiCollectionWrapper()
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=5)
# create
c_name = cf.gen_unique_str("scale_query")
# c_name = 'scale_query_DymS7kI4'
collection_w = ApiCollectionWrapper()
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=5)
tmp_nb = 10000
tmp_nb = 10000
def do_insert():
while True:
tmp_df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(tmp_df)
log.debug(collection_w.num_entities)
def do_insert():
while True:
tmp_df = cf.gen_default_dataframe_data(tmp_nb)
collection_w.insert(tmp_df)
log.debug(collection_w.num_entities)
t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
t_insert.start()
t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
t_insert.start()
# scale dataNode to 5
mic.upgrade(release_name, {'spec.components.dataNode.replicas': 5}, constants.NAMESPACE)
time.sleep(300)
log.debug("Expand dataNode test finished")
# scale dataNode to 5
mic.upgrade(release_name, {'spec.components.dataNode.replicas': 5}, constants.NAMESPACE)
time.sleep(300)
log.debug("Expand dataNode test finished")
# create new collection and insert
new_c_name = cf.gen_unique_str("scale_query")
collection_w_new = ApiCollectionWrapper()
collection_w_new.init_collection(name=new_c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
# create new collection and insert
new_c_name = cf.gen_unique_str("scale_query")
collection_w_new = ApiCollectionWrapper()
collection_w_new.init_collection(name=new_c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
def do_new_insert():
while True:
tmp_df = cf.gen_default_dataframe_data(tmp_nb)
collection_w_new.insert(tmp_df)
log.debug(collection_w_new.num_entities)
def do_new_insert():
while True:
tmp_df = cf.gen_default_dataframe_data(tmp_nb)
collection_w_new.insert(tmp_df)
log.debug(collection_w_new.num_entities)
t_insert_new = threading.Thread(target=do_new_insert, args=(), daemon=True)
t_insert_new.start()
t_insert_new = threading.Thread(target=do_new_insert, args=(), daemon=True)
t_insert_new.start()
# scale dataNode to 3
mic.upgrade(release_name, {'spec.components.dataNode.replicas': 3}, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
# scale dataNode to 3
mic.upgrade(release_name, {'spec.components.dataNode.replicas': 3}, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
log.debug(collection_w.num_entities)
time.sleep(300)
log.debug("Shrink dataNode test finished")
log.debug(collection_w.num_entities)
time.sleep(300)
log.debug("Shrink dataNode test finished")
mic.uninstall(release_name, namespace=constants.NAMESPACE)
except Exception as e:
raise Exception(str(e))
finally:
label = f"app.kubernetes.io/instance={release_name}"
log.info('Start to export milvus pod logs')
export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
mic.uninstall(release_name, namespace=constants.NAMESPACE)
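Each scale test now follows the same shape: the body moves into a try block, and the finally block exports pod logs before uninstalling the release, so server logs are collected even when an assertion or timeout fails the test. A condensed, runnable sketch of the shared pattern, with stubs standing in for the real mic/export helpers; note that a bare raise would preserve the original traceback, whereas raise Exception(str(e)) discards it:

NAMESPACE = "milvus-ci"           # stand-in for constants.NAMESPACE
release_name = "scale-data-node"  # set by the test's deploy step in reality

def export_pod_logs(namespace, label_selector, release_name=None):
    print(f"exporting logs for {label_selector} -> /tmp/milvus_logs/{release_name}")

def uninstall(release_name, namespace):
    print(f"uninstalling {release_name} from {namespace}")

try:
    pass  # connect, create collections, scale replicas, verify
except Exception as e:
    # a bare `raise` would keep the traceback; str(e) flattens it
    raise Exception(str(e))
finally:
    # export logs first, then tear down, so logs survive a failed body
    label = f"app.kubernetes.io/instance={release_name}"
    export_pod_logs(namespace=NAMESPACE, label_selector=label, release_name=release_name)
    uninstall(release_name, namespace=NAMESPACE)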
@@ -10,6 +10,7 @@ from customize.milvus_operator import MilvusOperator
from scale import constants
from common import common_func as cf
from common import common_type as ct
from utils.util_k8s import export_pod_logs
from utils.util_log import test_log as log
from utils.util_pymilvus import get_latest_tag
@@ -97,6 +98,9 @@ class TestIndexNodeScale:
raise Exception(str(e))
finally:
label = f"app.kubernetes.io/instance={release_name}"
log.info('Start to export milvus pod logs')
export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
mic.uninstall(release_name, namespace=constants.NAMESPACE)
@pytest.mark.tags(CaseLabel.L3)
@@ -175,4 +179,7 @@ class TestIndexNodeScale:
raise Exception(str(e))
finally:
label = f"app.kubernetes.io/instance={release_name}"
log.info('Start to export milvus pod logs')
export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
mic.uninstall(release_name, namespace=constants.NAMESPACE)
@@ -6,7 +6,7 @@ from common import common_func as cf
from common.common_type import CaseLabel
from scale import scale_common as sc, constants
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready
from utils.util_k8s import wait_pods_ready, export_pod_logs
from utils.util_pymilvus import get_latest_tag
prefix = "proxy_scale"
@@ -56,22 +56,30 @@ class TestProxyScale:
host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
# host = "10.98.0.7"
c_name = cf.gen_unique_str(prefix)
self.e2e_milvus_parallel(5, host, c_name)
log.info('Milvus test before expand')
try:
c_name = cf.gen_unique_str(prefix)
self.e2e_milvus_parallel(5, host, c_name)
log.info('Milvus test before expand')
# expand proxy replicas from 1 to 5
mic.upgrade(release_name, {'spec.components.proxy.replicas': 5}, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
# expand proxy replicas from 1 to 5
mic.upgrade(release_name, {'spec.components.proxy.replicas': 5}, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
self.e2e_milvus_parallel(5, host, c_name)
log.info('Milvus test after expand')
self.e2e_milvus_parallel(5, host, c_name)
log.info('Milvus test after expand')
# expand proxy replicas from 5 to 2
mic.upgrade(release_name, {'spec.components.proxy.replicas': 2}, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
# expand proxy replicas from 5 to 2
mic.upgrade(release_name, {'spec.components.proxy.replicas': 2}, constants.NAMESPACE)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
self.e2e_milvus_parallel(2, host, c_name)
log.info('Milvus test after shrink')
self.e2e_milvus_parallel(2, host, c_name)
log.info('Milvus test after shrink')
mic.uninstall(release_name, namespace=constants.NAMESPACE)
except Exception as e:
raise Exception(str(e))
finally:
label = f"app.kubernetes.io/instance={release_name}"
log.info('Start to export milvus pod logs')
export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
mic.uninstall(release_name, namespace=constants.NAMESPACE)
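e2e_milvus_parallel is defined earlier in this file and is not part of the hunk; it fans the basic e2e flow out across several concurrent clients so proxy scaling is exercised under load. A hypothetical sketch of such a helper, assuming a single-client e2e_milvus(host, c_name) flow; both names and the threading choice are assumptions for illustration, not the file's actual implementation:

import threading

def e2e_milvus(host, c_name):
    # stub for the single-client e2e flow (insert, index, search, query)
    print(f"e2e against {host}, collection {c_name}")

def e2e_milvus_parallel(client_num, host, c_name):
    # run the single-client flow in client_num parallel threads
    threads = [threading.Thread(target=e2e_milvus, args=(host, c_name))
               for _ in range(client_num)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

e2e_milvus_parallel(5, "10.98.0.7", "proxy_scale_demo")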
@@ -11,7 +11,7 @@ from common import common_type as ct
from scale import constants
from pymilvus import Index, connections
from utils.util_log import test_log as log
from utils.util_k8s import wait_pods_ready
from utils.util_k8s import wait_pods_ready, export_pod_logs
from utils.util_pymilvus import get_latest_tag
prefix = "search_scale"
@@ -55,70 +55,78 @@ class TestQueryNodeScale:
host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0]
# host = "10.98.0.8"
# connect
connections.add_connection(default={"host": host, "port": 19530})
connections.connect(alias='default')
try:
# connect
connections.add_connection(default={"host": host, "port": 19530})
connections.connect(alias='default')
# create
c_name = cf.gen_unique_str("scale_query")
# c_name = 'scale_query_DymS7kI4'
collection_w = ApiCollectionWrapper()
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
# insert two segments
for i in range(3):
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
log.debug(collection_w.num_entities)
# create index
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
assert collection_w.has_index()[0]
assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
default_index_params)
# load
collection_w.load()
# scale queryNode to 5
mic.upgrade(release_name, {'spec.components.queryNode.replicas': 5}, constants.NAMESPACE)
# continuously search
def do_search():
while True:
search_res, _ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
log.debug(search_res[0].ids)
assert len(search_res[0].ids) == ct.default_limit
t_search = threading.Thread(target=do_search, args=(), daemon=True)
t_search.start()
# wait new QN running, continuously insert
# time.sleep(10)
healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200)
log.info(f"milvus healthy after scale up: {healthy}")
# wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
def do_insert():
while True:
tmp_df = cf.gen_default_dataframe_data(1000)
collection_w.insert(tmp_df)
t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
t_insert.start()
# create
c_name = cf.gen_unique_str("scale_query")
# c_name = 'scale_query_DymS7kI4'
collection_w = ApiCollectionWrapper()
collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=2)
# insert two segments
for i in range(3):
df = cf.gen_default_dataframe_data(nb)
collection_w.insert(df)
log.debug(collection_w.num_entities)
time.sleep(20)
log.debug("Expand querynode test finished")
# create index
collection_w.create_index(ct.default_float_vec_field_name, default_index_params)
assert collection_w.has_index()[0]
assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name,
default_index_params)
# load
collection_w.load()
# scale queryNode to 5
mic.upgrade(release_name, {'spec.components.queryNode.replicas': 5}, constants.NAMESPACE)
mic.upgrade(release_name, {'spec.components.queryNode.replicas': 3}, constants.NAMESPACE)
time.sleep(60)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
# continuously search
def do_search():
while True:
search_res, _ = collection_w.search(cf.gen_vectors(1, ct.default_dim),
ct.default_float_vec_field_name,
ct.default_search_params, ct.default_limit)
log.debug(search_res[0].ids)
assert len(search_res[0].ids) == ct.default_limit
t_search = threading.Thread(target=do_search, args=(), daemon=True)
t_search.start()
# wait new QN running, continuously insert
# time.sleep(10)
healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200)
log.info(f"milvus healthy after scale up: {healthy}")
# wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
def do_insert():
while True:
tmp_df = cf.gen_default_dataframe_data(1000)
collection_w.insert(tmp_df)
t_insert = threading.Thread(target=do_insert, args=(), daemon=True)
t_insert.start()
log.debug(collection_w.num_entities)
time.sleep(20)
log.debug("Expand querynode test finished")
mic.upgrade(release_name, {'spec.components.queryNode.replicas': 3}, constants.NAMESPACE)
time.sleep(60)
wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}")
log.debug(collection_w.num_entities)
time.sleep(60)
log.debug("Shrink querynode test finished")
log.debug(collection_w.num_entities)
time.sleep(60)
log.debug("Shrink querynode test finished")
except Exception as e:
raise Exception(str(e))
mic.uninstall(release_name, namespace=constants.NAMESPACE)
\ No newline at end of file
finally:
label = f"app.kubernetes.io/instance={release_name}"
log.info('Start to export milvus pod logs')
export_pod_logs(namespace=constants.NAMESPACE, label_selector=label, release_name=release_name)
mic.uninstall(release_name, namespace=constants.NAMESPACE)
\ No newline at end of file
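The query-node test keeps traffic flowing while replicas change by running the search and insert loops on daemon threads: they die with the test process, so no join or shutdown flag is needed. A minimal self-contained sketch of that pattern; the print lambdas stand in for the collection_w.search/insert calls in the diff:

import threading
import time

def run_in_background(fn, interval=0.5):
    # loop fn forever on a daemon thread and hand the thread back
    def loop():
        while True:
            fn()
            time.sleep(interval)
    t = threading.Thread(target=loop, daemon=True)
    t.start()
    return t

t_search = run_in_background(lambda: print("search"))
t_insert = run_in_background(lambda: print("insert"))
time.sleep(2)  # scaling would happen here; daemon threads exit with the process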
@@ -82,7 +82,7 @@ def get_pod_list(namespace, label_selector):
raise Exception(str(e))
def export_pod_logs(namespace, label_selector):
def export_pod_logs(namespace, label_selector, release_name=None):
"""
export pod logs with label selector to '/tmp/milvus_logs'
@@ -92,10 +92,19 @@ def export_pod_logs(namespace, label_selector):
:param label_selector: labels to restrict which pods logs to export
:type label_selector: str
:param release_name: use the release name as the server logs directory name
:type release_name: str
:example:
>>> export_pod_logs("chaos-testing", "app.kubernetes.io/instance=mic-milvus")
"""
pod_log_path = '/tmp/milvus_logs'
if release_name is not None:
    if not isinstance(release_name, str):
        raise TypeError("Got an unexpected non-string release_name")
    if len(release_name.strip()) == 0:
        raise ValueError("Got an unexpected empty release_name")
pod_log_path = '/tmp/milvus_logs' if release_name is None else f'/tmp/milvus_logs/{release_name}'
if not os.path.isdir(pod_log_path):
os.makedirs(pod_log_path)
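With the new parameter, each release's logs land in their own directory, which is what lets the Jenkinsfile loop above archive them per release. A short usage sketch of the validation as fixed above; release_name=None keeps the old flat layout, and the calls assume a reachable cluster:

export_pod_logs("chaos-testing", "app.kubernetes.io/instance=mic-milvus")
# -> logs under /tmp/milvus_logs

export_pod_logs("chaos-testing", "app.kubernetes.io/instance=mic-milvus",
                release_name="mic-milvus")
# -> logs under /tmp/milvus_logs/mic-milvus

export_pod_logs("chaos-testing", "app.kubernetes.io/instance=mic-milvus",
                release_name="   ")  # raises ValueError
export_pod_logs("chaos-testing", "app.kubernetes.io/instance=mic-milvus",
                release_name=42)     # raises TypeError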