# action_after_upgrade.py — actions to run against a Milvus cluster after an upgrade
from pymilvus import connections
import sys
sys.path.append("..")
sys.path.append("../..")
from common.milvus_sys import MilvusSys
from utils import *


def task_1(data_size, host):
    """
    task_1:
        before upgrade: create collection and insert data with flush, create index, load and search
        after upgrade: get collection, load, search, insert data with flush, release, create index, load, and search
    """
    prefix = "task_1_"
    connections.connect(host=host, port=19530, timeout=60)
    # Collections were created before the upgrade; verify one exists per index type.
    col_list = get_collections(prefix, check=True)
    assert len(col_list) == len(all_index_types)
    create_index(prefix)
    load_and_search(prefix)
    # NOTE(review): sibling tasks (task_3/4/5) pass count= by keyword and show the
    # helper's second positional parameter is `flush`; passing data_size positionally
    # would bind it to flush, so use the keyword — TODO confirm against utils.
    create_collections_and_insert_data(prefix, count=data_size)
    release_collection(prefix)
    create_index(prefix)
    load_and_search(prefix)


def task_2(data_size, host):
    """
    task_2:
        before upgrade: create collection, insert data and create index, load and search
        after upgrade: get collection, load, search, insert data, release, create index, load, and search
    """
    prefix = "task_2_"
    connections.connect(host=host, port=19530, timeout=60)
    # Collections were created before the upgrade; verify one exists per index type.
    col_list = get_collections(prefix, check=True)
    assert len(col_list) == len(all_index_types)
    load_and_search(prefix)
    # NOTE(review): sibling tasks (task_3/4/5) pass count= by keyword and show the
    # helper's second positional parameter is `flush`; passing data_size positionally
    # would bind it to flush, so use the keyword — TODO confirm against utils.
    create_collections_and_insert_data(prefix, count=data_size)
    release_collection(prefix)
    create_index(prefix)
    load_and_search(prefix)


def task_3(data_size, host):
    """
    task_3:
        before upgrade: create collection, insert data, flush, create index, load with one replicas and search
        after upgrade: get collection, load, search, insert data, release, create index, load with multi replicas, and search
    """
    prefix = "task_3_"
    connections.connect(host=host, port=19530, timeout=60)
    # Collections were created before the upgrade; verify one exists per index type.
    col_list = get_collections(prefix, check=True)
    assert len(col_list) == len(all_index_types)
    load_and_search(prefix)
    create_collections_and_insert_data(prefix, count=data_size)
    release_collection(prefix)
    create_index(prefix)
    # After the upgrade the collection is reloaded with multiple replicas.
    load_and_search(prefix, replicas=NUM_REPLICAS)


def task_4(data_size, host):
    """
    task_4:
        before upgrade: create collection, insert data, flush, and create index
        after upgrade: get collection, load with multi replicas, search, insert data, load with multi replicas and search
    """
    prefix = "task_4_"
    connections.connect(host=host, port=19530, timeout=60)
    # Collections were created before the upgrade; verify one exists per index type.
    col_list = get_collections(prefix, check=True)
    assert len(col_list) == len(all_index_types)
    load_and_search(prefix, replicas=NUM_REPLICAS)
    # flush=False: exercise search over unflushed (growing) data after the upgrade.
    create_collections_and_insert_data(prefix, flush=False, count=data_size)
    load_and_search(prefix, replicas=NUM_REPLICAS)


def task_5(data_size, host):
    """
    task_5:
        before upgrade: create collection and insert data without flush
        after upgrade: get collection, create index, load with multi replicas, search, insert data with flush, load with multi replicas and search
    """
    prefix = "task_5_"
    connections.connect(host=host, port=19530, timeout=60)
    # Collections were created before the upgrade; verify one exists per index type.
    col_list = get_collections(prefix, check=True)
    assert len(col_list) == len(all_index_types)
    create_index(prefix)
    load_and_search(prefix, replicas=NUM_REPLICAS)
    # flush=True: this time the post-upgrade insert is flushed before re-searching.
    create_collections_and_insert_data(prefix, flush=True, count=data_size)
    load_and_search(prefix, replicas=NUM_REPLICAS)


if __name__ == '__main__':
    import argparse
    import threading
    parser = argparse.ArgumentParser(description='config for deploy test')
    parser.add_argument('--host', type=str, default="127.0.0.1", help='milvus server ip')
    parser.add_argument('--data_size', type=int, default=3000, help='data size')
    args = parser.parse_args()
    data_size = args.data_size
    host = args.host
    logger.info(f"data size: {data_size}")
    connections.connect(host=host, port=19530, timeout=60)
    ms = MilvusSys()
    # create index for flat
    logger.info("create index for flat start")
    create_index_flat()
    logger.info("create index for flat done")
    tasks = []
    tasks.append(threading.Thread(target=task_1, args=(data_size, host)))
    tasks.append(threading.Thread(target=task_2, args=(data_size, host)))
    # Multi-replica tasks need at least NUM_REPLICAS query nodes to succeed,
    # so only schedule them when the cluster is large enough.
    if len(ms.query_nodes) >= NUM_REPLICAS:
        tasks.append(threading.Thread(target=task_3, args=(data_size, host)))
        tasks.append(threading.Thread(target=task_4, args=(data_size, host)))
        tasks.append(threading.Thread(target=task_5, args=(data_size, host)))
    # Run all selected tasks concurrently and wait for every one to finish.
    for task in tasks:
        task.start()
    for task in tasks:
        task.join()