test_action_second_deployment.py 12.1 KB
Newer Older
Z
zhuwenxing 已提交
1
import pytest
2 3
import re
import time
4
import pymilvus
Z
zhuwenxing 已提交
5 6 7 8 9 10
from common import common_func as cf
from common import common_type as ct
from common.common_type import CaseLabel, CheckTasks
from common.milvus_sys import MilvusSys
from utils.util_pymilvus import *
from deploy.base import TestDeployBase
11
from deploy.common import gen_index_param, gen_search_param, get_collections
Z
zhuwenxing 已提交
12 13 14 15 16 17 18 19 20 21 22 23
from utils.util_log import test_log as log

# Shared defaults re-exported from common.common_type for use in this module.
default_nb = ct.default_nb
default_nq = ct.default_nq
default_dim = ct.default_dim
default_limit = ct.default_limit
default_search_field = ct.default_float_vec_field_name
default_search_params = ct.default_search_params

# Field names of the collections under test.
default_int64_field_name = ct.default_int64_field_name
default_float_field_name = ct.default_float_field_name
default_bool_field_name = ct.default_bool_field_name
default_string_field_name = ct.default_string_field_name
binary_field_name = ct.default_binary_vec_field_name

# Filter expressions handed to search() and query() respectively.
default_search_exp = "int64 >= 0"
default_term_expr = f'{ct.default_int64_field_name} in [0, 1]'

# Version string of the installed pymilvus client; used to pick the flush API.
pymilvus_version = pymilvus.__version__


class TestActionSecondDeployment(TestDeployBase):
    """Checks run against already-existing collections named by ``get_collections()``.

    NOTE(review): the class name suggests these checks run on the second
    deployment (after a reinstall), while the original docstring read
    "action before reinstall" -- confirm the intended wording.
    """

    @pytest.fixture(scope="function", params=get_collections())
    def all_collection_name(self, request):
        """Yield one collection name per parametrization; skip empty entries."""
        if request.param == [] or request.param == "":
            pytest.skip("The collection name is invalid")
        yield request.param

    def teardown_method(self, method):
        """Log the state of the collection used by the test; nothing is dropped.

        ``self.collection_w`` is only bound inside ``test_check``, so it can be
        missing when the test was skipped or failed early. In that case the
        state dump is skipped instead of raising from teardown.
        """
        log.info(("*" * 35) + " teardown " + ("*" * 35))
        log.info("[teardown_method] Start teardown test case %s..." %
                 method.__name__)
        collection_w = getattr(self, "collection_w", None)
        if collection_w is None:
            log.info("no collection was bound by the test, nothing to show")
        else:
            log.info("show collection info")
            log.info(f"collection {collection_w.name} has entities: {collection_w.num_entities}")

            res, _ = self.utility_wrap.get_query_segment_info(collection_w.name)
            log.info(f"The segment info of collection {collection_w.name} is {res}")

            index_infos = [index.to_dict() for index in collection_w.indexes]
            log.info(f"collection {collection_w.name} index infos {index_infos}")
        log.info("skip drop collection")

    def create_index(self, collection_w, default_index_field, default_index_param):
        """Create an index on the vector field and on the default string field.

        When a field already has an index, its existing index name is reused;
        otherwise a fresh unique name is generated.

        :param collection_w: collection wrapper to create the indexes on
        :param default_index_field: name of the vector field to index
        :param default_index_param: index parameters for the vector field
        """
        existing_indexes = collection_w.indexes
        index_field_map = {index.field_name: index.index_name for index in existing_indexes}
        index_infos = [index.to_dict() for index in existing_indexes]
        log.info(f"index info: {index_infos}")
        if index_infos:
            log.info(
                f"current index param is {index_infos[0]['index_param']}, passed in param is {default_index_param}")
            log.info(
                f"current index name is {index_infos[0]['index_name']}, passed in param is {index_field_map.get(default_index_field)}")
        collection_w.create_index(default_index_field, default_index_param,
                                  index_name=index_field_map.get(default_index_field, gen_unique_str("test")))
        # The string field is indexed with empty params.
        collection_w.create_index(default_string_field_name, {},
                                  index_name=index_field_map.get(default_string_field_name, gen_unique_str("test")))

    @staticmethod
    def _parse_release(version):
        """Return the leading numeric part of *version* as an int tuple of at least 3 items.

        Pre-release or dev suffixes are ignored and missing components count
        as 0, e.g. "2.10.0" -> (2, 10, 0), "2.2.0rc1" -> (2, 2, 0),
        "2.2" -> (2, 2, 0). A string without a leading number yields (0, 0, 0).
        """
        match = re.match(r"\d+(?:\.\d+)*", version)
        parts = [int(part) for part in match.group(0).split(".")] if match else []
        parts += [0] * (3 - len(parts))
        return tuple(parts)

    def _flush_collection(self, collection_w):
        """Flush *collection_w* using the API that fits the installed pymilvus.

        The version is compared numerically: comparing the raw strings would
        sort "2.10.0" before "2.2.0".
        """
        if self._parse_release(pymilvus_version) >= (2, 2, 0):
            collection_w.flush()
        else:
            # Older clients: the attribute read stands in for flush()
            # (presumably it flushes as a side effect -- confirm).
            collection_w.collection.num_entities

    def _check_search_and_query(self, collection_w, vectors, search_field, search_params,
                                expect_results=True):
        """Run one search followed by one query and verify both.

        :param vectors: candidate vectors; the first ``default_nq`` are searched
        :param search_field: name of the vector field to search on
        :param search_params: search parameters matching the vector index
        :param expect_results: when False no result check is attached to either
            call (used for collections that hold no data)
        """
        search_check = CheckTasks.check_search_results if expect_results else None
        query_check = CheckTasks.check_query_not_empty if expect_results else None
        collection_w.search(vectors[:default_nq], search_field,
                            search_params, default_limit,
                            default_search_exp,
                            output_fields=[ct.default_int64_field_name],
                            check_task=search_check,
                            check_items={"nq": default_nq,
                                         "limit": default_limit})
        collection_w.query(default_term_expr, output_fields=[ct.default_int64_field_name],
                           check_task=query_check)

    @pytest.mark.tags(CaseLabel.L3)
    def test_check(self, all_collection_name, data_size):
        """Exercise an existing collection end to end.

        Steps, in order:
          1. open the collection by name and read its schema and indexes
          2. wait for loading and compare the loaded replica count with the
             number encoded in the name as ``replica_number_<n>_``
          3. if nothing is loaded: create a vector index when none exists, then load
          4. search + query, flush, search + query again
          5. insert two batches of ``data_size`` rows, flush, delete ids 0..9
          6. search + query; if indexes existed: drop them, re-create, load,
             search + query
          7. search + query; release and load again (with 2 replicas when 0 or 1
             were loaded and at least two query nodes exist); search + query
        """
        self._connect()
        ms = MilvusSys()
        name = all_collection_name
        is_binary = "BIN" in name
        collection_w, _ = self.collection_wrap.init_collection(name=name)
        # Kept on the instance so teardown_method can report on it.
        self.collection_w = collection_w
        schema = collection_w.schema
        type_field_map = {field.dtype: field.name for field in schema.fields}
        if is_binary:
            default_index_field = ct.default_binary_vec_field_name
            vector_index_type = "BIN_IVF_FLAT"
        else:
            default_index_field = ct.default_float_vec_field_name
            vector_index_type = "IVF_FLAT"

        existing_indexes = collection_w.indexes
        # 100 / 101 are looked up as field dtypes; presumably the pymilvus
        # DataType values for binary and float vectors -- confirm.
        binary_vector_index_types = [index.params["index_type"] for index in existing_indexes if
                                     index.field_name == type_field_map.get(100, "")]
        float_vector_index_types = [index.params["index_type"] for index in existing_indexes if
                                    index.field_name == type_field_map.get(101, "")]
        index_names = [index.index_name for index in existing_indexes]  # used to drop index
        vector_index_types = binary_vector_index_types + float_vector_index_types
        if vector_index_types:
            # Prefer the index type already present over the default.
            vector_index_type = vector_index_types[0]

        # Best effort: a failure to wait is logged, not fatal.
        try:
            t0 = time.time()
            self.utility_wrap.wait_for_loading_complete(name)
            log.info(f"wait for {name} loading complete cost {time.time() - t0}")
        except Exception as e:
            log.error(e)

        # get replicas loaded
        # NOTE(review): other wrapper calls in this file return a
        # (result, check) tuple; if get_replicas does too, `.groups` always
        # raises here and replicas_loaded is always 0 -- confirm.
        try:
            replicas = collection_w.get_replicas(enable_traceback=False)
            replicas_loaded = len(replicas.groups)
        except Exception as e:
            log.error(e)
            replicas_loaded = 0

        log.info(f"collection {name} has {replicas_loaded} replicas")
        replica_match = re.search(r'replica_number_(.*?)_', name)
        assert replica_match is not None, \
            f"collection name {name} does not contain 'replica_number_<n>_'"
        assert replicas_loaded == int(replica_match.group(1))

        # params for search and query
        if is_binary:
            _, vectors_to_search = cf.gen_binary_vectors(
                default_nb, default_dim)
            search_field = ct.default_binary_vec_field_name
        else:
            vectors_to_search = cf.gen_vectors(default_nb, default_dim)
            search_field = ct.default_float_vec_field_name
        search_params = gen_search_param(vector_index_type)[0]

        # load if not loaded
        if replicas_loaded == 0:
            # create index for vector if not exist before load;
            # an index whose info carries "metric_type" is taken as a vector index
            index_infos = [index.to_dict() for index in collection_w.indexes]
            is_vector_indexed = any("metric_type" in index_info for index_info in index_infos)
            if not is_vector_indexed:
                default_index_param = gen_index_param(vector_index_type)
                self.create_index(collection_w, default_index_field, default_index_param)
            collection_w.load()

        # if the collection is empty, the search result should be empty, so no need to check
        has_data = "empty" not in name

        # search and query
        self._check_search_and_query(collection_w, vectors_to_search, search_field,
                                     search_params, expect_results=has_data)

        # flush
        self._flush_collection(collection_w)

        # search and query
        self._check_search_and_query(collection_w, vectors_to_search, search_field,
                                     search_params, expect_results=has_data)

        # insert data and flush
        for _ in range(2):
            self.insert_data_general(insert_data=True, is_binary=is_binary, nb=data_size,
                                     is_flush=False, is_index=True, name=name)
        self._flush_collection(collection_w)

        # delete data
        delete_expr = f"{ct.default_int64_field_name} in [0,1,2,3,4,5,6,7,8,9]"
        collection_w.delete(expr=delete_expr)

        # search and query; data was inserted above, so results are always checked from here on
        self._check_search_and_query(collection_w, vectors_to_search, search_field, search_params)

        # drop index if exist
        if index_names:
            # one release is enough: the collection stays released while the indexes are dropped
            collection_w.release()
            for index_name in index_names:
                collection_w.drop_index(index_name=index_name)
            default_index_param = gen_index_param(vector_index_type)
            self.create_index(collection_w, default_index_field, default_index_param)

            collection_w.load()
            self._check_search_and_query(collection_w, vectors_to_search, search_field, search_params)

        # search and query
        self._check_search_and_query(collection_w, vectors_to_search, search_field, search_params)

        # release and reload with changed replicas
        collection_w.release()
        replica_number = 1
        if replicas_loaded in [0, 1] and len(ms.query_nodes) >= 2:
            replica_number = 2
        collection_w.load(replica_number=replica_number)

        # search and query
        self._check_search_and_query(collection_w, vectors_to_search, search_field, search_params)