# test_chaos_resource_group.py
import time
import pytest

from base.client_base import TestcaseBase
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel
from utils.util_log import test_log as log

# custom (non-default) resource groups created by the test
rg_name_0 = "RG_0"
rg_name_1 = "RG_1"

# collection names; each collection is loaded into its own set of resource groups
coll_name_1 = "ResourceGroup_111"
coll_name_2 = "ResourceGroup_222"

# Expected resource-group layout. The "available_node" counts add up to 4
# query nodes: one each in RG_0 and RG_1, two left in the default group.
# "loaded_replica" maps a collection name to the number of its replicas that
# are expected to live in that group once the collections are loaded.
resource_group_info = [
    {"name": rg_name_0, "available_node": 1, "capacity": 1, "loaded_replica": {coll_name_1: 1}},
    {"name": rg_name_1, "available_node": 1, "capacity": 1, "loaded_replica": {coll_name_1: 1}},
    {"name": ct.default_resource_group_name, "available_node": 2,
     "capacity": ct.default_resource_group_capacity, "loaded_replica": {coll_name_2: 2}},
]

class TestChaosRG(TestcaseBase):
    """Resource-group test cases that share state across two test methods.

    ``test_milvus_resource_group`` creates the resource groups and the two
    collections described by the module-level constants;
    ``test_verify_milvus_resource_group`` re-checks that state and searches
    again. Teardown deliberately drops nothing so the second test can see what
    the first one built.
    """

    def teardown_method(self, method):
        """Log the end of test case ``method``; collections are left in place."""
        log.info(("*" * 35) + " teardown " + ("*" * 35))
        log.info("[teardown_method] Start teardown test case %s..." % method.__name__)
        log.info("skip drop collection")

    def _check_resource_groups(self, with_loaded_replica):
        """Describe every group in ``resource_group_info`` and check its properties.

        :param with_loaded_replica: when True the expected ``num_loaded_replica``
            is taken from ``resource_group_info``; when False it is expected to
            be empty (nothing loaded yet).
        """
        for rg_info in resource_group_info:
            expected = {
                "name": rg_info["name"],
                "capacity": rg_info["capacity"],
                "num_available_node": rg_info["available_node"],
                "num_loaded_replica": rg_info["loaded_replica"] if with_loaded_replica else {},
                "num_outgoing_node": {},
                "num_incoming_node": {},
            }
            # NOTE(review): the original call was truncated after ``name=``;
            # check_task/check_items are restored on the assumption that the
            # expected-property dict was meant to be verified here - confirm
            # the CheckTasks member name.
            desc_rg_info, _ = self.utility_wrap.describe_resource_group(
                name=expected["name"],
                check_task=ct.CheckTasks.check_rg_property,
                check_items=expected)
            log.info(f'[ResourceGroup] Rg of {expected["name"]} info is: {desc_rg_info}')

    def _search_and_check(self, collection_w):
        """Run one filtered vector search and assert the result-set dimensions."""
        search_vectors = cf.gen_vectors(ct.default_nq, ct.default_dim)
        search_params = {"metric_type": "L2", "params": {"ef": 64}}
        # NOTE(review): anns_field was absent from the visible call; it is set
        # to the vector field that gets the HNSW index - confirm.
        search_res, _ = collection_w.search(data=search_vectors,
                                            anns_field=ct.default_float_vec_field_name,
                                            param=search_params, limit=ct.default_limit,
                                            expr="int64 >= 0")
        assert len(search_res) == ct.default_nq
        assert len(search_res[0]) == ct.default_limit

    def test_milvus_resource_group(self):
        """Create the resource groups, then build, load and exercise both collections."""
        nb = 10000
        # which resource groups each collection is loaded into, and with how many replicas
        collection_rg_map = {
            coll_name_1: {"resource_groups": [rg_name_0, rg_name_1], "replica_number": 2},
            coll_name_2: {"resource_groups": [ct.default_resource_group_name], "replica_number": 2},
        }

        # NOTE(review): no explicit connection setup is visible before the
        # utility calls below - confirm the base class has connected by now.

        # create RG_0, RG_1, transfer 1 node to RG_0, 1 node to RG_1
        for rg_info in resource_group_info:
            rg_name = rg_info["name"]
            if rg_name == ct.default_resource_group_name:
                # the default group already exists and keeps the remaining nodes
                continue
            _, create_rg_res = self.utility_wrap.create_resource_group(rg_name)
            assert create_rg_res
            log.info(f"[ResourceGroup] Create rg {rg_name} done")
            # node count restored from the log message below, which reports
            # rg_info["available_node"] nodes being transferred
            self.utility_wrap.transfer_node(source=ct.default_resource_group_name, target=rg_name,
                                            num_node=rg_info["available_node"])
            log.info(f'[ResourceGroup] Transfer {rg_info["available_node"]} nodes from {ct.default_resource_group_name} to {rg_name} done')

        # verify RGs: all expected groups exist and nothing is loaded yet
        resource_groups, _ = self.utility_wrap.list_resource_groups()
        assert len(resource_groups) == len(resource_group_info)
        assert all(rg_info["name"] in resource_groups for rg_info in resource_group_info)
        self._check_resource_groups(with_loaded_replica=False)

        # prepare collection C1, C2
        data = cf.gen_default_dataframe_data(nb=nb)
        index_params = {"index_type": "HNSW", "metric_type": "L2", "params": {"M": 48, "efConstruction": 500}}

        for coll_name in coll_name_1, coll_name_2:
            # create
            collection_w = self.init_collection_wrap(name=coll_name, active_trace=True)
            log.info(f"create collection {collection_w.name} done")
            entities = collection_w.num_entities

            # insert
            _, res = collection_w.insert(data)
            assert res
            log.info(f"insert {nb} entities done")

            # flush
            _, check_result = collection_w.flush(timeout=180)
            assert check_result
            assert collection_w.num_entities == nb + entities
            entities = collection_w.num_entities
            log.info(f"flush done with entities: {entities}")

            # index
            # NOTE(review): both create_index calls were truncated after
            # ``field_name=``; index_params/index_name are restored here -
            # confirm against the wrapper's signature.
            collection_w.create_index(field_name=ct.default_float_vec_field_name,
                                      index_params=index_params,
                                      index_name="float_vec_idx")
            collection_w.create_index(field_name=ct.default_string_field_name,
                                      index_params={},
                                      index_name="varchar_idx")
            index_infos = [index.to_dict() for index in collection_w.indexes]
            log.info(f"index info: {index_infos}")

            # load coll_rg_a, 2 replicas -> RG_0, RG_1
            # load coll_rg_b, 2 replicas -> default_RG
            # NOTE(review): the load call itself was missing; restored from
            # collection_rg_map, which was otherwise unused - confirm the
            # ``_resource_groups`` keyword.
            load_cfg = collection_rg_map[coll_name]
            collection_w.load(replica_number=load_cfg["replica_number"],
                              _resource_groups=load_cfg["resource_groups"])

            # show query segment info
            segment_info, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
            log.info(f"{collection_w.name} segment info: {segment_info}")

            # show replicas info
            replicas, _ = collection_w.get_replicas()
            log.info(f"{collection_w.name} replica info: {replicas}")

            # search
            self._search_and_check(collection_w)

            # query and delete
            term_expr = f'{ct.default_int64_field_name} < 100'
            query_res, _ = collection_w.query(term_expr)
            assert len(query_res) == 100

            delete_expr = f'{ct.default_int64_field_name} in {list(range(100))}'
            # the delete call was missing although delete_expr was built and
            # the following query expects an empty result
            collection_w.delete(delete_expr)
            collection_w.query(term_expr, check_task=ct.CheckTasks.check_query_empty)

        # verify rg replica info now that both collections are loaded
        self._check_resource_groups(with_loaded_replica=True)

    def test_verify_milvus_resource_group(self):
        """Re-check collections, resource groups and search on the existing deployment."""
        # NOTE(review): no explicit connection setup is visible before the
        # utility calls below - confirm the base class has connected by now.

        # verify collection exist
        all_collections, _ = self.utility_wrap.list_collections()
        assert all(coll_name in all_collections for coll_name in [coll_name_1, coll_name_2])

        # verify resource groups
        self._check_resource_groups(with_loaded_replica=True)

        # search
        for coll_name in coll_name_1, coll_name_2:
            collection_w = self.init_collection_wrap(name=coll_name, active_trace=True)
            for _ in range(10):
                self._search_and_check(collection_w)