diff --git a/tests/python_client/scale/test_query_node_scale.py b/tests/python_client/scale/test_query_node_scale.py index 8ca7720b7240159f893a893d8f03b45ecae5e73e..5db550768df9c7f2df6c3c60761b8eadb8168ebb 100644 --- a/tests/python_client/scale/test_query_node_scale.py +++ b/tests/python_client/scale/test_query_node_scale.py @@ -1,15 +1,17 @@ -import pdb -import random +import threading +import time import pytest from base.collection_wrapper import ApiCollectionWrapper from common.common_type import CaseLabel -from scale.helm_env import HelmEnv +from customize.milvus_operator import MilvusOperator from common import common_func as cf from common import common_type as ct from scale import constants from pymilvus import Index, connections +from utils.util_log import test_log as log +from utils.util_k8s import wait_pods_ready prefix = "search_scale" nb = 5000 @@ -22,98 +24,86 @@ default_index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": class TestQueryNodeScale: @pytest.mark.tags(CaseLabel.L3) - def test_expand_query_node(self): + def test_scale_query_node(self): release_name = "scale-query" - env = HelmEnv(release_name=release_name) - host = env.helm_install_cluster_milvus() + query_config = { + 'metadata.namespace': constants.NAMESPACE, + 'metadata.name': release_name, + 'spec.components.image': 'harbor.zilliz.cc/milvus/milvus:master-20211202-ed546d0', + 'spec.components.proxy.serviceType': 'LoadBalancer', + 'spec.components.queryNode.replicas': 1, + 'spec.config.dataCoord.enableCompaction': True, + 'spec.config.dataCoord.enableGarbageCollection': True + } + mic = MilvusOperator() + mic.install(query_config) + healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200) + log.info(f"milvus healthy: {healthy}") + host = mic.endpoint(release_name, constants.NAMESPACE).split(':')[0] + # host = "10.98.0.8" # connect connections.add_connection(default={"host": host, "port": 19530}) connections.connect(alias='default') # create - c_name = "query_scale_one" + c_name = cf.gen_unique_str("scale_query") + # c_name = 'scale_query_DymS7kI4' collection_w = ApiCollectionWrapper() - collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) - # insert - data = cf.gen_default_list_data(ct.default_nb) - mutation_res, _ = collection_w.insert(data) - assert mutation_res.insert_count == ct.default_nb - # # create index - # collection_w.create_index(ct.default_float_vec_field_name, default_index_params) - # assert collection_w.has_index()[0] - # assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name, - # default_index_params) - collection_w.load() - # vectors = [[random.random() for _ in range(ct.default_dim)] for _ in range(5)] - res1, _ = collection_w.search(data[-1][:5], ct.default_float_vec_field_name, - ct.default_search_params, ct.default_limit) - - # scale queryNode pod - env.helm_upgrade_cluster_milvus(queryNode=2) - - c_name_2 = "query_scale_two" - collection_w2 = ApiCollectionWrapper() - collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema()) - collection_w2.insert(data) - assert collection_w2.num_entities == ct.default_nb - collection_w2.load() - res2, _ = collection_w2.search(data[-1][:5], ct.default_float_vec_field_name, - ct.default_search_params, ct.default_limit) + collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema(), shards_num=2) - assert res1[0].ids == res2[0].ids + # insert two segments + for i in range(3): + df = cf.gen_default_dataframe_data(nb) + collection_w.insert(df) + log.debug(collection_w.num_entities) - @pytest.mark.tags(CaseLabel.L3) - def test_shrink_query_node(self): - """ - target: test shrink queryNode from 2 to 1 - method: 1.deploy two queryNode - 2.search two collections in two queryNode - 3.upgrade queryNode from 2 to 1 - 4.search second collection - expected: search result is correct - """ - # deploy - release_name = "scale-query" - env = HelmEnv(release_name=release_name, queryNode=2) - host = env.helm_install_cluster_milvus(image_pull_policy=constants.IF_NOT_PRESENT) + # create index + collection_w.create_index(ct.default_float_vec_field_name, default_index_params) + assert collection_w.has_index()[0] + assert collection_w.index()[0] == Index(collection_w.collection, ct.default_float_vec_field_name, + default_index_params) - # connect - connections.add_connection(default={"host": host, "port": 19530}) - connections.connect(alias='default') - - # collection one - data = cf.gen_default_list_data(nb) - c_name = "query_scale_one" - collection_w = ApiCollectionWrapper() - collection_w.init_collection(name=c_name, schema=cf.gen_default_collection_schema()) - collection_w.insert(data) - assert collection_w.num_entities == nb + # load collection_w.load() - res1, _ = collection_w.search(data[-1][:nq], ct.default_float_vec_field_name, - ct.default_search_params, ct.default_limit) - assert res1[0].ids[0] == data[0][0] - - # collection two - c_name_2 = "query_scale_two" - collection_w2 = ApiCollectionWrapper() - collection_w2.init_collection(name=c_name_2, schema=cf.gen_default_collection_schema()) - collection_w2.insert(data) - assert collection_w2.num_entities == nb - collection_w2.load() - res2, _ = collection_w2.search(data[-1][:nq], ct.default_float_vec_field_name, - ct.default_search_params, ct.default_limit) - assert res2[0].ids[0] == data[0][0] - - # scale queryNode pod - env.helm_upgrade_cluster_milvus(queryNode=1) - - # search - res1, _ = collection_w.search(data[-1][:nq], ct.default_float_vec_field_name, - ct.default_search_params, ct.default_limit) - assert res1[0].ids[0] == data[0][0] - res2, _ = collection_w2.search(data[-1][:nq], ct.default_float_vec_field_name, - ct.default_search_params, ct.default_limit) - assert res2[0].ids[0] == data[0][0] - - # env.helm_uninstall_cluster_milvus() \ No newline at end of file + + # scale queryNode to 5 + mic.upgrade(release_name, {'spec.components.queryNode.replicas': 5}, constants.NAMESPACE) + + # continuously search + def do_search(): + while True: + search_res, _ = collection_w.search(cf.gen_vectors(1, ct.default_dim), + ct.default_float_vec_field_name, + ct.default_search_params, ct.default_limit) + log.debug(search_res[0].ids) + assert len(search_res[0].ids) == ct.default_limit + + t_search = threading.Thread(target=do_search, args=(), daemon=True) + t_search.start() + + # wait new QN running, continuously insert + # time.sleep(10) + healthy = mic.wait_for_healthy(release_name, constants.NAMESPACE, timeout=1200) + log.info(f"milvus healthy after scale up: {healthy}") + # wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}") + + def do_insert(): + while True: + tmp_df = cf.gen_default_dataframe_data(1000) + collection_w.insert(tmp_df) + + t_insert = threading.Thread(target=do_insert, args=(), daemon=True) + t_insert.start() + + log.debug(collection_w.num_entities) + time.sleep(20) + log.debug("Expand querynode test finished") + + mic.upgrade(release_name, {'spec.components.queryNode.replicas': 3}, constants.NAMESPACE) + time.sleep(60) + wait_pods_ready(constants.NAMESPACE, f"app.kubernetes.io/instance={release_name}") + + log.debug(collection_w.num_entities) + time.sleep(60) + log.debug("Shrink querynode test finished") \ No newline at end of file