test_delete.py 37.3 KB
Newer Older
T
ThreadDao 已提交
1 2 3
import pytest

from base.client_base import TestcaseBase
T
ThreadDao 已提交
4
from common import common_func as cf
T
ThreadDao 已提交
5
from common import common_type as ct
6
from utils.util_log import test_log as log
T
ThreadDao 已提交
7
from common.common_type import CaseLabel, CheckTasks
T
ThreadDao 已提交
8 9

# Name prefix handed to the collection helpers so every test gets its own collection.
prefix = "delete"
# Half of the default row count; used by tests that delete half of the inserted data.
half_nb = ct.default_nb // 2
# Small row count for tests that do not need the full default data set.
tmp_nb = 100
# Delete/query expression that targets the single primary key 0.
tmp_expr = f'{ct.default_int64_field_name} in {[0]}'
# Expected query result for `tmp_expr` while primary key 0 still exists.
query_res_tmp_expr = [{f'{ct.default_int64_field_name}': 0}]
# Key used in `check_items` to pass the expected query result to the checker.
exp_res = "exp_res"
T
ThreadDao 已提交
15 16 17


class TestDeleteParams(TestcaseBase):
    """
    Test case of delete interface
    def delete(expr, partition_name=None, timeout=None, **kwargs)
    return MutationResult
    Only the `in` operator is supported in the expr
    """

    @pytest.mark.tags(CaseLabel.L0)
    @pytest.mark.parametrize('is_binary', [False, True])
    def test_delete_entities(self, is_binary):
        """
        target: test delete data from collection
        method: 1.create and insert nb with flush
                2.load collection
                3.delete half of nb
                4.query with deleted ids
        expected: Query result is empty
        """
        # init collection with default_nb default data
        collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True, is_binary=is_binary)[0:4]
        expr = f'{ct.default_int64_field_name} in {ids[:half_nb]}'

        # delete half of data
        del_res = collection_w.delete(expr)[0]
        assert del_res.delete_count == half_nb
        # This flush will not persist the deleted ids, just delay the time to ensure that queryNode consumes deleteMsg
        collection_w.num_entities

        # query with deleted ids
        collection_w.query(expr, check_task=CheckTasks.check_query_empty)

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_without_connection(self):
        """
        target: test delete without connect
        method: delete after remove connection
        expected: raise exception
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]

        # remove connection and delete
        self.connection_wrap.remove_connection(ct.default_alias)
        res_list, _ = self.connection_wrap.list_connections()
        assert ct.default_alias not in res_list
        error = {ct.err_code: 0, ct.err_msg: "should create connect first"}
        collection_w.delete(expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error)

    # Not Milvus Exception
    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_expr_none(self):
        """
        target: test delete with None expr
        method: delete with None expr
        expected: raise exception
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        error = {ct.err_code: 0, ct.err_msg: "expr cannot be None"}
        collection_w.delete(expr=None, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("expr", [1, [], ()])
    def test_delete_expr_non_string(self, expr):
        """
        target: test delete with non-string expression
        method: delete with non-string expr
        expected: raise exception
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        error = {ct.err_code: 0, ct.err_msg: f"expr value {expr} is illegal"}
        collection_w.delete(expr, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    @pytest.mark.parametrize("expr", ["12-s", "中文"])
    def test_delete_invalid_expr_string(self, expr):
        """
        target: test delete with invalid string expr
        method: delete with invalid string
        expected: Raise exception
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        error = {ct.err_code: 1, ct.err_msg: f"failed to create expr plan, expr = {expr}"}
        collection_w.delete(expr, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_expr_empty_value(self):
        """
        target: test delete with empty array expr
        method: delete with expr: "id in []"
        expected: the delete call is accepted (no entity count is asserted here)
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        expr = f'{ct.default_int64_field_name} in {[]}'

        # delete empty entities
        collection_w.delete(expr)

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_expr_single(self):
        """
        target: test delete with one value
        method: delete with expr: "id in [0]"
        expected: delete_count is 1 and the deleted id can no longer be queried
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        expr = f'{ct.default_int64_field_name} in {[0]}'
        del_res, _ = collection_w.delete(expr)
        assert del_res.delete_count == 1
        collection_w.query(expr, check_task=CheckTasks.check_query_empty)

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_expr_all_values(self):
        """
        target: test delete with all values
        method: delete with expr: "id in [all]"
        expected: num entities unchanged and deleted data will not be queried
        """
        # init collection with default_nb default data
        collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True)[0:4]
        expr = f'{ct.default_int64_field_name} in {ids}'
        del_res, _ = collection_w.delete(expr)

        # assert results
        assert del_res.delete_count == ct.default_nb
        assert collection_w.num_entities == ct.default_nb

        collection_w.query(expr, check_task=CheckTasks.check_query_empty)

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_not_existed_values(self):
        """
        target: test delete not existed values
        method: delete data not in the collection
        expected: No exception
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]

        # No exception
        expr = f'{ct.default_int64_field_name} in {[tmp_nb]}'
        collection_w.delete(expr=expr)
        # an existing id (0) must still be queryable afterwards
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_results,
                           check_items={exp_res: query_res_tmp_expr})

    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_part_not_existed_values(self):
        """
        target: test delete part non-existed values
        method: delete ids which part not existed
        expected: delete existed id, ignore non-existed id
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        expr = f'{ct.default_int64_field_name} in {[0, tmp_nb]}'
        collection_w.delete(expr=expr)
        collection_w.query(expr, check_task=CheckTasks.check_query_empty)

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_expr_inconsistent_values(self):
        """
        target: test delete with inconsistent type values
        method: delete with non-int64 type values
        expected: raise exception
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        expr = f'{ct.default_int64_field_name} in {[0.0, 1.0]}'

        # Bad exception message
        error = {ct.err_code: 1, ct.err_msg: "failed to create expr plan,"}
        collection_w.delete(expr=expr, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_expr_mix_values(self):
        """
        target: test delete with mix type values
        method: delete with int64 and float values
        expected: raise exception
        """
        # init collection with tmp_nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        expr = f'{ct.default_int64_field_name} in {[0, 1.0]}'

        # Bad exception message
        error = {ct.err_code: 1, ct.err_msg: "failed to create expr plan"}
        collection_w.delete(expr=expr, check_task=CheckTasks.err_res, check_items=error)

    @pytest.mark.tags(CaseLabel.L0)
    def test_delete_partition(self):
        """
        target: test delete from partition
        method: delete with partition names
        expected: verify partition entities are deleted
        """
        # init collection and partition
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        partition_w = self.init_partition_wrap(collection_wrap=collection_w)

        # load collection and insert data to partition
        collection_w.load()
        df = cf.gen_default_dataframe_data(tmp_nb)
        partition_w.insert(df)

        # delete ids from partition
        del_res, _ = collection_w.delete(tmp_expr, partition_name=partition_w.name)
        assert del_res.delete_count == 1

        # query with deleted id and query with existed id
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty, partition_names=[partition_w.name])
        # expected record: second row, first column of the inserted dataframe
        res = df.iloc[1:2, :1].to_dict('records')
        collection_w.query(f'{ct.default_int64_field_name} in [1]',
                           check_task=CheckTasks.check_query_results, check_items={exp_res: res})

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_default_partition(self):
        """
        target: test delete from default partition
        method: delete with partition name "_default"
        expected: assert delete successfully
        """
        # create, insert with flush, load collection
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        del_res, _ = collection_w.delete(tmp_expr, partition_name=ct.default_partition_name)
        assert del_res.delete_count == 1
        # value is discarded on purpose; the access is only made before querying
        collection_w.num_entities
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)

    @pytest.mark.parametrize("partition_name", [1, [], {}, ()])
    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_non_string_partition_name(self, partition_name):
        """
        target: test delete with non-string partition name
        method: delete with non-string partition name
        expected: Raise exception
        """
        # create, insert with flush, load collection
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]

        error = {ct.err_code: 0, ct.err_msg: f"partition_name value {partition_name} is illegal"}
        collection_w.delete(tmp_expr, partition_name=partition_name, check_task=CheckTasks.err_res, check_items=error)
263

T
ThreadDao 已提交
264 265

class TestDeleteOperation(TestcaseBase):
T
ThreadDao 已提交
266 267 268 269 270 271 272
    """
    ******************************************************************
      The following cases are used to test delete interface operations
    ******************************************************************
    """

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_from_empty_collection(self):
        """
        target: test delete entities from an empty collection
        method: create a collection and delete entities
        expected: No exception
        """
        c_name = cf.gen_unique_str(prefix)
        collection_w = self.init_collection_wrap(name=c_name)
        collection_w.delete(tmp_expr)
        # todo assert del_res.delete_count == 0
T
ThreadDao 已提交
283 284 285 286 287 288

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_entities_repeatedly(self):
        """
        target: test delete entities twice
        method: delete with same expr twice
        expected: No exception for second deletion
        """
        # init collection with nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]

        # first delete: the id must disappear from query results
        collection_w.delete(expr=tmp_expr)
        # value is discarded on purpose; the access is only made before querying
        collection_w.num_entities
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)

        # second delete of the same id must not raise
        collection_w.delete(expr=tmp_expr)
299

T
ThreadDao 已提交
300
    @pytest.mark.xfail(reason="Issue #11641")
    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_after_index(self):
        """
        target: test delete after creating index
        method: 1.insert, flush, load
                2.create index and re-load
                3.delete entities
                4.search
        expected: assert index and deleted id not in search result
        """
        # create collection, insert default nb, flush and load
        collection_w, vectors = self.init_collection_general(prefix, insert_data=True)[0:2]

        # create index
        index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
        collection_w.create_index(ct.default_float_vec_field_name, index_params)
        assert collection_w.has_index()[0]
        # re-load so the collection is loaded after the index was created
        collection_w.release()
        collection_w.load()

        # delete entity
        collection_w.delete(tmp_expr)
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
        assert collection_w.has_index()[0]

        # search with id 0 vectors
        search_res, _ = collection_w.search([vectors[0][ct.default_float_vec_field_name][0]],
                                            ct.default_float_vec_field_name,
                                            ct.default_search_params, ct.default_limit)
        assert 0 not in search_res[0].ids

    @pytest.mark.xfail(reason="Issue #11641")
    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_and_index(self):
        """
        target: test delete and create index
        method: 1.insert
                2.delete half
                3.flush and create index
                4.search
        expected: search result contains none of the deleted ids
        """
        # init collection and insert data without flush
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        df = cf.gen_default_dataframe_data()
        insert_res, _ = collection_w.insert(df)

        # delete half and flush
        deleted_ids = insert_res.primary_keys[:half_nb]
        expr = f'{ct.default_int64_field_name} in {deleted_ids}'
        collection_w.delete(expr)
        assert collection_w.num_entities == ct.default_nb

        # create index
        index_params = {"index_type": "IVF_SQ8", "metric_type": "L2", "params": {"nlist": 64}}
        collection_w.create_index(ct.default_float_vec_field_name, index_params)
        assert collection_w.has_index()[0]

        collection_w.load()
        search_res, _ = collection_w.search([df[ct.default_float_vec_field_name][0]],
                                            ct.default_float_vec_field_name,
                                            ct.default_search_params, ct.default_limit)
        log.debug(search_res[0].ids)
        # assert search results not contains deleted ids
        inter = set(deleted_ids).intersection(set(search_res[0].ids))
        log.debug(inter)
        assert len(inter) == 0
366

367 368 369
    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_query_ids_both_sealed_and_channel(self):
        """
        target: test query that delete ids from both channel and sealed
        method: 1.create and insert
                2.delete id 0 and flush
                3.load and query id 0
                4.insert new id and delete the id
                5.query id 0 and new id
        expected: Empty query result
        """
        # init collection and insert data (the flush happens after the delete below)
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)

        # delete id 0 and flush
        del_res, _ = collection_w.delete(tmp_expr)
        assert del_res.delete_count == 1
        assert collection_w.num_entities == tmp_nb

        # load and query id 0
        collection_w.load()
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)

        # insert id tmp_nb and delete it again (id 0 was already deleted above)
        df_new = cf.gen_default_dataframe_data(nb=1, start=tmp_nb)
        collection_w.insert(df_new)
        collection_w.delete(expr=f'{ct.default_int64_field_name} in {[tmp_nb]}')

        # query with id 0 and tmp_nb
        collection_w.query(expr=f'{ct.default_int64_field_name} in {[0, tmp_nb]}',
                           check_task=CheckTasks.check_query_empty)

401 402 403 404 405 406 407 408
    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_search(self):
        """
        target: test delete and search
        method: search entities after it was deleted
        expected: deleted entity is not in the search result
        """
        # init collection with nb default data
        collection_w, _, _, ids = self.init_collection_general(prefix, insert_data=True)[0:4]
        entity, _ = collection_w.query(tmp_expr, output_fields=["%"])
        # the vector of entity id 0 is reused as the search vector before and after the delete
        search_vectors = [entity[0][ct.default_float_vec_field_name]]
        search_res, _ = collection_w.search(search_vectors,
                                            ct.default_float_vec_field_name,
                                            ct.default_search_params, ct.default_limit)
        # assert search results contains entity
        assert 0 in search_res[0].ids

        # delete the first half of the ids (includes the searched entity's id when ids start at 0)
        deleted_ids = ids[:half_nb]
        expr = f'{ct.default_int64_field_name} in {deleted_ids}'
        collection_w.delete(expr)
        search_res_2, _ = collection_w.search(search_vectors,
                                              ct.default_float_vec_field_name,
                                              ct.default_search_params, ct.default_limit)
        # assert the second search returns none of the deleted ids
        log.debug(f"Second search result ids: {search_res_2[0].ids}")
        inter = set(deleted_ids).intersection(set(search_res_2[0].ids))
        assert len(inter) == 0
426 427 428 429 430 431 432 433 434 435 436 437 438

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_expr_repeated_values(self):
        """
        target: test delete with repeated values
        method: 1.insert data with unique primary keys
                2.delete with repeated values: 'id in [0, 0, 0]'
        expected: delete_count equals the number of values in the expr (3),
                  and id 0 can no longer be queried
        """
        # init collection with nb default data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]
        expr = f'{ct.default_int64_field_name} in {[0, 0, 0]}'
        del_res, _ = collection_w.delete(expr)
        assert del_res.delete_count == 3
        # value is discarded on purpose; the access is only made before querying
        collection_w.num_entities
        collection_w.query(expr, check_task=CheckTasks.check_query_empty)
442

443
    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_duplicate_primary_keys(self):
        """
        target: test delete from duplicate primary keys
        method: 1.insert data with dup ids
                2.delete with repeated or not values
        expected: currently only delete one entity, query get one entity
        todo delete all entities
        """
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        df = cf.gen_default_dataframe_data(nb=tmp_nb)
        # overwrite the primary key column so every row shares id 0
        df[ct.default_int64_field_name] = 0
        collection_w.insert(df)
        assert collection_w.num_entities == tmp_nb
        collection_w.delete(tmp_expr)
        collection_w.load()

        # Just one query res and search res, because de-dup
        res, _ = collection_w.query(tmp_expr, output_fields=["*"])
        assert res[0][ct.default_float_field_name] != 0.0

        search_res, _ = collection_w.search([df[ct.default_float_vec_field_name][0]],
                                            ct.default_float_vec_field_name,
                                            ct.default_search_params, ct.default_limit,
                                            output_fields=[ct.default_int64_field_name, ct.default_float_field_name])
        assert search_res[0][0].entity._row_data[ct.default_float_field_name] != 0.0
468 469 470 471 472 473

    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_empty_partition(self):
        """
        target: test delete empty partition
        method: delete from an empty partition
        expected: No exception
        """
        # init collection and partition
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        partition_w = self.init_partition_wrap(collection_wrap=collection_w)

        # nothing was inserted, so the partition is empty when the delete is issued
        collection_w.delete(tmp_expr, partition_name=partition_w.name)
481 482 483 484 485 486 487 488 489 490 491 492

    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_not_existed_partition(self):
        """
        target: test delete from a non-existent partition
        method: delete from a fake partition
        expected: raise exception
        """
        # init collection with tmp_nb data
        collection_w = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True)[0]

        # raise exception
        error = {ct.err_code: 1, ct.err_msg: f"partitionID of partitionName:{ct.default_tag} can not be find"}
        collection_w.delete(tmp_expr, partition_name=ct.default_tag, check_task=CheckTasks.err_res, check_items=error)
495 496 497 498 499 500 501

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_from_partition_with_another_ids(self):
        """
        target: delete another partition entities from partition
        method: 1.insert nb entities into two partitions in half
                2.delete id 0 with the default partition name
                3.delete id 0 with the created partition name
        expected: id 0 is still queryable after step 2 and is gone after step 3
        """
        half = tmp_nb // 2
        # create, insert, flush, load
        collection_w, partition_w, _, _ = self.insert_entities_into_two_partitions_in_half(half)

        # delete entities from another partition: id 0 must still be queryable
        collection_w.delete(tmp_expr, partition_name=ct.default_partition_name)
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_results,
                           check_items={exp_res: query_res_tmp_expr})

        # delete entities from own partition
        collection_w.delete(tmp_expr, partition_name=partition_w.name)
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
517 518 519 520 521 522

    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_from_partitions_with_same_ids(self):
        """
        target: test delete same ids from two partitions with same data
        method: 1.insert same nb data into two partitions
                2.delete id 0 from the default partition only
        expected: id 0 is removed from the default partition and still present in the other one
        """
        # init collection and partition
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        partition_w = self.init_partition_wrap(collection_wrap=collection_w)

        # insert same data into partition_w and default partition
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)
        partition_w.insert(df)

        # delete same id 0 from default_partition, and query on it get empty result
        collection_w.delete(tmp_expr, partition_name=ct.default_partition_name)
        assert collection_w.num_entities == tmp_nb * 2
        collection_w.load()
        collection_w.query(tmp_expr, partition_names=[ct.default_partition_name],
                           check_task=CheckTasks.check_query_empty)

        # query on partition_w with id 0 and get a result
        collection_w.query(tmp_expr, partition_names=[partition_w.name],
                           check_task=CheckTasks.check_query_results, check_items={exp_res: query_res_tmp_expr})

546 547 548 549 550 551 552 553
    @pytest.mark.tags(CaseLabel.L0)
    def test_delete_auto_id_collection(self):
        """
        target: test delete from auto_id collection
        method: delete entities from auto_id=true collection
        expected: verify delete successfully
        """
        # init an auto_id collection and insert tmp_nb data
        collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]

        # delete with insert ids
        expr = f'{ct.default_int64_field_name} in {[ids[0]]}'
        res, _ = collection_w.delete(expr)

        # verify delete result
        assert res.delete_count == 1
        collection_w.query(expr, check_task=CheckTasks.check_query_empty)
563

564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_query_without_loading(self):
        """
        target: test delete and query without loading
        method: 1.insert and flush data
                2.delete ids
                3.query without loading
        expected: Raise exception
        """
        # create collection and insert data; the num_entities check is this suite's flush step
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)
        assert collection_w.num_entities == tmp_nb

        # delete
        res = collection_w.delete(tmp_expr)[0]
        assert res.delete_count == 1

        # query without loading and raise exception
        error = {ct.err_code: 1, ct.err_msg: f"collection {collection_w.name} was not loaded into memory"}
        collection_w.query(expr=tmp_expr, check_task=CheckTasks.err_res, check_items=error)
586

587
    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_sealed_segment_without_flush(self):
        """
        target: test delete without flush
        method: 1.insert and flush data
                2.delete ids from collection and no flush
                3.load and query with id
        expected: No query result
        """
        # create collection and insert data; the num_entities check is this suite's flush step
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)
        assert collection_w.num_entities == tmp_nb

        # delete (no num_entities access afterwards, so the delete is not flushed)
        del_res, _ = collection_w.delete(tmp_expr)
        assert del_res.delete_count == 1

        # load and query with id
        collection_w.load()
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
609

610
    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_growing_data_channel_delete(self):
        """
        target: test delete entities from growing segment, and channel deleteMsg
        method: 1.create collection
                2.load collection
                3.insert data and delete ids
                4.query deleted ids
        expected: No query result
        """
        # create collection
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        # load collection and the queryNode watch the insertChannel
        collection_w.load()

        # insert data
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)

        # delete id 0
        del_res = collection_w.delete(tmp_expr)[0]
        assert del_res.delete_count == 1

        # query id 0
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
632

633
    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_sealed_data_channel_delete(self):
        """
        target: test delete sealed data and get deleteMsg from insertChannel
        method: 1.create, insert and flush data
                2.load collection
                3.delete id without flush
                4.query deleted ids (queryNode get deleted ids from channel not persistence)
        expected: Delete successfully and no query result
        """
        # create collection, insert data, and flush it through the num_entities check
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)
        assert collection_w.num_entities == tmp_nb

        # load collection and queryNode subscribe channel
        collection_w.load()

        # delete ids and query
        collection_w.delete(tmp_expr)
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
655

656
    @pytest.mark.tags(CaseLabel.L1)
657 658 659 660
    def test_delete_sealed_segment_with_flush(self):
        """
        target: test delete data from sealed segment and flush delta log
        method: 1.create and insert data (the flush check after the first insert is currently disabled)
                2.delete entities and flush (insert and flush)
                3.load collection (load data and delta log)
                4.query deleted ids
        expected: No query result
        """
        # create collection
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        # insert data
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)
        # NOTE(review): the flush after the first insert is disabled; confirm whether it should be restored
        # assert collection_w.num_entities == tmp_nb

        # delete id 0
        del_res = collection_w.delete(tmp_expr)[0]
        assert del_res.delete_count == 1

        # insert data and flush data and delete ids.
        # if no insert, datanode will not really flush delete ids
        collection_w.insert(cf.gen_default_dataframe_data(nb=1, start=tmp_nb))
        log.info(f'Collection num entities: {collection_w.num_entities}')

        # load and query id 0
        collection_w.load()
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)
685

686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
    @pytest.mark.tags(CaseLabel.L1)
    def test_delete_sealed_segment_with_twice_flush(self):
        """
        target: test delete data from a sealed segment when the data is flushed twice
        method: 1.create collection, insert tmp_nb entities and flush
                2.delete id 0, insert one more entity and flush again
                3.load collection
                4.query the deleted id
        expected: No query result
        """
        coll = self.init_collection_wrap(name=cf.gen_unique_str(prefix))

        # first flush: insert the base data and read num_entities
        coll.insert(cf.gen_default_dataframe_data(tmp_nb))
        assert coll.num_entities == tmp_nb

        # delete primary key 0
        delete_result, _ = coll.delete(tmp_expr)
        assert delete_result.delete_count == 1

        # second flush: add one extra row, then read num_entities again
        extra_row = cf.gen_default_dataframe_data(nb=1, start=tmp_nb)
        coll.insert(extra_row)
        log.info(coll.num_entities)

        # the deleted id must not be returned once the collection is loaded
        coll.load()
        coll.query(tmp_expr, check_task=CheckTasks.check_query_empty)

712
    @pytest.mark.tags(CaseLabel.L1)
T
ThreadDao 已提交
713
    def test_delete_sealed_data_sealed_delete(self):
        """
        target: test delete with sealed data and sealed delete request
        method: 1.create, insert
                2.delete and flush (will flush data and delete)
                3.load and query
        expected: Empty query result
        """
        # create collection
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        # insert without flush
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)

        # delete id 0 and flush (the num_entities read is the flush step)
        del_res = collection_w.delete(tmp_expr)[0]
        assert del_res.delete_count == 1
        assert collection_w.num_entities == tmp_nb

        # load and query id 0
        collection_w.load()
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)

736
    @pytest.mark.tags(CaseLabel.L2)
737 738 739 740 741 742 743 744 745 746
    def test_delete_insert_same_entity(self):
        """
        target: test delete and insert same entity
        method: 1.delete entity one
                2.insert entity one
                3.query  entity one
        expected: verify query result
        """
        # init collection and insert data without flush
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix))
        collection_w.load()
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)

        # delete id 0 and confirm it is no longer returned
        del_res, _ = collection_w.delete(tmp_expr)
        assert del_res.delete_count == 1
        # NOTE(review): flush check disabled in the original; left disabled
        # assert collection_w.num_entities == tmp_nb
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)

        # insert entity with primary key 0
        collection_w.insert(df[:1])

        # query entity one: expect the first row, restricted to the first dataframe column
        res = df.iloc[0:1, :1].to_dict('records')
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_results, check_items={'exp_res': res})

764
    @pytest.mark.tags(CaseLabel.L2)
765 766 767 768 769 770
    def test_delete_entity_loop(self):
        """
        target: test delete all entities one by one in a loop
        method: delete data one by one for a loop
        expected: No exception
        """
        # init an auto_id collection and insert tmp_nb data, flush and load
        collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]

        # delete each id with its own request; every request must delete exactly one entity
        for del_id in ids:
            expr = f'{ct.default_int64_field_name} in {[del_id]}'
            res = collection_w.delete(expr)[0]
            assert res.delete_count == 1

        # query with all ids
        expr = f'{ct.default_int64_field_name} in {ids}'
        collection_w.query(expr, check_task=CheckTasks.check_query_empty)
782

783
    @pytest.mark.tags(CaseLabel.L2)
784 785 786 787 788 789 790
    def test_delete_flush_loop(self):
        """
        target: test delete and flush in a loop
        method: in a loop, delete batch and flush, until delete all entities
        expected: No exception
        """
        # init an auto_id collection and insert tmp_nb data
        collection_w, _, _, ids = self.init_collection_general(prefix, nb=tmp_nb, insert_data=True, auto_id=True)[0:4]

        batch = 10
        for i in range(tmp_nb // batch):
            expr = f'{ct.default_int64_field_name} in {ids[i * batch: (i + 1) * batch]}'
            res, _ = collection_w.delete(expr)
            assert res.delete_count == batch
            # flush after every batch; the test expects num_entities to stay at tmp_nb after deletes
            assert collection_w.num_entities == tmp_nb

        # query with all ids
        expr = f'{ct.default_int64_field_name} in {ids}'
        collection_w.query(expr, check_task=CheckTasks.check_query_empty)
803

804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826
    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_merge_same_id_channel_and_sealed(self):
        """
        target: test merge same delete ids from channel and sealed
        method: 1.create, insert
                2.delete id and flush (data and deleted become sealed)
                3.load and query (verify delete successfully)
                4.insert entity with deleted id
                5.delete id
                6.query with id
        expected: Empty query result
        """
        # init collection and insert data without flush
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)

        # delete id 0 and flush
        del_res, _ = collection_w.delete(tmp_expr)
        assert del_res.delete_count == 1
        assert collection_w.num_entities == tmp_nb

        # load and query id 0
        collection_w.load()
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)

        # re-insert id 0 and re-delete id 0 (no flush this time)
        collection_w.insert(df[:1])
        collection_w.delete(tmp_expr)
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)

    @pytest.mark.tags(CaseLabel.L2)
    def test_delete_merge_ids_channel_and_sealed(self):
        """
        target: test merge deleted ids come from both channel and sealed
        method: 1.create, insert ids [0, tmp_nb) with shard_num=1
                2.delete id 0 and flush
                3.load and query with id 0
                4.delete id 1 (merge same segment deleted ids 0 and 1)
                5.query with id 0 and 1
        expected: Empty query result
        """
        # init collection and insert data without flush
        collection_w = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=1)
        df = cf.gen_default_dataframe_data(tmp_nb)
        collection_w.insert(df)

        # delete id 0 and flush
        del_res, _ = collection_w.delete(tmp_expr)
        assert del_res.delete_count == 1
        assert collection_w.num_entities == tmp_nb

        # load and query id 0
        collection_w.load()
        collection_w.query(tmp_expr, check_task=CheckTasks.check_query_empty)

        # delete id 1 and query id 0 and 1
        collection_w.delete(expr=f'{ct.default_int64_field_name} in {[1]}')
        collection_w.query(expr=f'{ct.default_int64_field_name} in {[0, 1]}',
                           check_task=CheckTasks.check_query_empty)
864

865
    @pytest.mark.tags(CaseLabel.L2)
866
    @pytest.mark.skip(reason="TODO")
867 868 869 870 871 872 873
    def test_delete_multi_threading(self):
        """
        target: test delete multi threading
        method: delete multi threading
        expected: delete successfully
        """
        pass
874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903

    @pytest.mark.tags(CaseLabel.L3)
    def test_delete_sealed_only(self):
        """
        target: test delete sealed-only
        method: 1.deploy sealed-only: two dmlChannel and three queryNodes
                2.create and insert with flush
                3.load
                4.delete all data
                5.query
        expected: Empty query result
        """
        # create a collection with two shards
        coll = self.init_collection_wrap(name=cf.gen_unique_str(prefix), shards_num=2)

        # insert 3 batches of 2000 entities each, flushing after every batch
        batch_count = 3
        batch_size = 2000
        inserted_ids = []
        for done, offset in enumerate(range(0, batch_count * batch_size, batch_size), start=1):
            batch_df = cf.gen_default_dataframe_data(nb=batch_size, start=offset)
            insert_res = coll.insert(batch_df)[0]
            # reading num_entities is the flush step; the running total must match what was inserted
            assert coll.num_entities == done * batch_size
            inserted_ids.extend(insert_res.primary_keys)

        coll.load()

        # delete every inserted primary key, then confirm none of them can be queried
        all_ids_expr = f'{ct.default_int64_field_name} in {inserted_ids}'
        coll.delete(all_ids_expr)

        coll.query(all_ids_expr, check_task=CheckTasks.check_query_empty)