# test_wal.py: test cases for WAL functionality.
import time
import pdb
import threading
import logging
from multiprocessing import Pool, Process
import pytest
from milvus import IndexType, MetricType
from utils import *

# Dimension of every vector generated by the tests below.
dim = 128
# NOTE(review): not referenced in the visible tests (they use the `collection`
# fixture) -- presumably a default collection name; confirm before removing.
collection_id = "test_wal"
# Per-test limit passed to ``pytest.mark.timeout`` (seconds).
WAL_TIMEOUT = 30
# Number of vectors inserted by the bulk-insert tests.
nb = 6000
# NOTE(review): not referenced in the visible tests -- presumably a pause
# (seconds) between inserts; confirm before removing.
add_interval = 1.5


class TestWalBase:
    """
    ******************************************************************
      The following cases are used to test WAL functionality
    ******************************************************************

    Every test receives the ``connect`` and ``collection`` fixtures and
    checks the status object returned by each client call.
    """

    @pytest.mark.timeout(WAL_TIMEOUT)
    def test_wal_insert(self, connect, collection):
        """
        target: add vectors in WAL
        method: add vectors and flush when WAL is enabled
        expected: status ok, vectors added
        """
        vectors = gen_vector(nb, dim)
        status, ids = connect.insert(collection, vectors)
        assert status.OK()
        status = connect.flush([collection])
        assert status.OK()
        status, res = connect.count_entities(collection)
        assert status.OK()
        assert res == nb
        # The first stored entity must round-trip unchanged.
        status, res = connect.get_entity_by_id(collection, [ids[0]])
        logging.getLogger().info(res)
        assert status.OK()
        assert_equal_vector(res[0], vectors[0])

    @pytest.mark.timeout(WAL_TIMEOUT)
    def test_wal_delete_vectors(self, connect, collection):
        """
        target: delete vectors in WAL
        method: delete vectors and flush when WAL is enabled
        expected: status ok, vectors deleted
        """
        vectors = gen_vector(nb, dim)
        status, ids = connect.insert(collection, vectors)
        assert status.OK()
        status = connect.flush([collection])
        assert status.OK()
        # Confirm the precondition: everything inserted is visible before
        # the delete, so the final count of 0 is meaningful.
        status, res = connect.count_entities(collection)
        assert status.OK()
        assert res == nb
        status = connect.delete_entity_by_id(collection, ids)
        assert status.OK()
        status = connect.flush([collection])
        assert status.OK()
        status, res = connect.count_entities(collection)
        assert status.OK()
        assert res == 0

    @pytest.mark.timeout(WAL_TIMEOUT)
    def test_wal_invalid_operation(self, connect, collection):
        """
        target: invalid operation in WAL
        method: add vectors, delete with non-existent ids and flush when WAL is enabled
        expected: status ok, the inserted entity is still counted
        """
        vector = gen_single_vector(dim)
        status, ids = connect.insert(collection, vector)
        assert status.OK()
        status = connect.flush([collection])
        assert status.OK()
        # Id 0 is used as an id that does not exist in the collection; the
        # delete is expected to succeed without removing anything.
        status = connect.delete_entity_by_id(collection, [0])
        assert status.OK()
        status = connect.flush([collection])
        assert status.OK()
        status, res = connect.count_entities(collection)
        assert status.OK()
        assert res == 1

    @pytest.mark.timeout(WAL_TIMEOUT)
    def test_wal_invalid_operation_B(self, connect, collection):
        """
        target: invalid operation in WAL
        method: add vectors, delete with a non-existent collection name when WAL is enabled
        expected: status not ok
        """
        vectors = gen_vector(nb, dim)
        status, ids = connect.insert(collection, vectors)
        assert status.OK()
        status = connect.flush([collection])
        assert status.OK()
        # Deleting a non-existent id from the real collection is accepted.
        status = connect.delete_entity_by_id(collection, [0])
        assert status.OK()
        status = connect.flush([collection])
        assert status.OK()
        # Deleting from a collection that was never created must fail.
        collection_new = gen_unique_str()
        status = connect.delete_entity_by_id(collection_new, ids)
        assert not status.OK()
        status = connect.flush([collection])
        assert status.OK()
        # Neither delete may have removed anything from the real collection.
        status, res = connect.count_entities(collection)
        assert status.OK()
        assert res == nb

    @pytest.mark.timeout(WAL_TIMEOUT)
    def test_wal_server_crashed_recovery(self, connect, collection):
        """
        target: test wal when server crashed unexpectedly and restarted
        method: add vectors, server killed before flush, restarted server and flush
        expected: status ok, add request is recovered and vectors added

        NOTE(review): the kill/restart step is not automated here; as
        written the test only inserts, flushes twice and verifies the data.
        """
        vector = gen_single_vector(dim)
        status, ids = connect.insert(collection, vector)
        assert status.OK()
        status = connect.flush([collection])
        assert status.OK()
        status, res = connect.count_entities(collection)
        assert status.OK()
        # An explicit flush now precedes this count, so the former
        # "should be 0 because no auto flush" expectation no longer applies;
        # the value is only logged.
        logging.getLogger().info(res)
        logging.getLogger().info("Stop server and restart")
        # TODO: kill the server and restart it at this point (auto flush
        # should be set to 15 seconds), then wait for the auto flush.
        status = connect.flush([collection])
        assert status.OK()
        status, res = connect.count_entities(collection)
        assert status.OK()
        assert res == 1
        status, res = connect.get_entity_by_id(collection, [ids[0]])
        logging.getLogger().info(res)
        assert status.OK()
        assert_equal_vector(res[0], vector[0])