# install dependencies:
# pip3 install taospy
# Python >= 3.8 recommended
#

import logging
import math
import multiprocessing
import sys
import time
import os
from multiprocessing import Process, Queue
from mockdatasource import MockDataSource
from queue import Empty
from typing import List

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")

READ_TASK_COUNT = 1
WRITE_TASK_COUNT = 1
TABLE_COUNT = 1000
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000
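# Each of the constants above can be overridden by a positional
# command-line argument; see set_global_config().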

_DONE_MESSAGE = '__DONE__'


def get_connection():
    """
    If variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used.
    You can also override the default username and password by supply variable TDENGINE_USER and TDENGINE_PASSWORD
    """
    import taos
    firstEP = os.environ.get("TDENGINE_FIRST_EP")
    if firstEP:
        host, port = firstEP.split(":")
    else:
        host, port = None, 0
    user = os.environ.get("TDENGINE_USER", "root")
    password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
    return taos.connect(host=host, port=int(port), user=user, password=password)


# ANCHOR: read

def run_read_task(task_id: int, task_queues: List[Queue], infinity):
    table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
    data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity)
    try:
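        # MockDataSource yields either a single (table_id, rows) tuple or a
        # list of such tuples; single tuples are normalized to a list below.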
        for batch in data_source:
            if isinstance(batch, tuple):
                batch = [batch]
            for table_id, rows in batch:
                # hash rows to a write queue by table id
                i = table_id % len(task_queues)
                # put() blocks while the target queue is full
                for row in rows:
                    task_queues[i].put(row)
        if not infinity:
            for queue in task_queues:
                queue.put(_DONE_MESSAGE)
    except KeyboardInterrupt:
        pass
    finally:
        logging.info('read task over')


# ANCHOR_END: read

# ANCHOR: write
def run_write_task(task_id: int, queue: Queue, done_queue: Queue):
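    # SQLWriter (see sql_writer.py alongside this example) buffers rows and
    # writes them to TDengine as SQL INSERT statements.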
    from sql_writer import SQLWriter
    log = logging.getLogger(f"WriteTask-{task_id}")
    writer = SQLWriter(get_connection)
    lines = None
    try:
        while True:
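            # collect up to MAX_BATCH_SIZE rows into one batch, sleeping
            # briefly whenever the queue is momentarily empty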
            over = False
            lines = []
            for _ in range(MAX_BATCH_SIZE):
                try:
                    line = queue.get_nowait()
                    if line == _DONE_MESSAGE:
                        over = True
                        break
                    if line:
                        lines.append(line)
                except Empty:
                    time.sleep(0.1)
            if len(lines) > 0:
                writer.process_lines(lines)
            if over:
                done_queue.put(_DONE_MESSAGE)
                break
    except KeyboardInterrupt:
        pass
    except BaseException as e:
        log.debug(f"lines={lines}")
        raise e
    finally:
        writer.close()
        log.debug('write task over')


# ANCHOR_END: write

def set_global_config():
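    # optional positional arguments, in order:
    #   argv[1] READ_TASK_COUNT, argv[2] WRITE_TASK_COUNT, argv[3] TABLE_COUNT,
    #   argv[4] QUEUE_SIZE, argv[5] MAX_BATCH_SIZE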
    argc = len(sys.argv)
    if argc > 1:
        global READ_TASK_COUNT
        READ_TASK_COUNT = int(sys.argv[1])
    if argc > 2:
        global WRITE_TASK_COUNT
        WRITE_TASK_COUNT = int(sys.argv[2])
    if argc > 3:
        global TABLE_COUNT
        TABLE_COUNT = int(sys.argv[3])
    if argc > 4:
        global QUEUE_SIZE
        QUEUE_SIZE = int(sys.argv[4])
    if argc > 5:
        global MAX_BATCH_SIZE
        MAX_BATCH_SIZE = int(sys.argv[5])


# ANCHOR: monitor
def run_monitor_process(done_queue: Queue):
    log = logging.getLogger("DataBaseMonitor")
    conn = None
    try:
        conn = get_connection()

        def get_count():
            res = conn.query("SELECT count(*) FROM test.meters")
            rows = res.fetch_all()
            return rows[0][0] if rows else 0

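        # poll until a write task reports completion, logging the total row
        # count and the write speed (rows/second) every 10 seconds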
        last_count = 0
        while True:
            try:
                done = done_queue.get_nowait()
                if done == _DONE_MESSAGE:
                    break
            except Empty:
                pass
            time.sleep(10)
            count = get_count()
            log.info(f"count={count} speed={(count - last_count) / 10}")
            last_count = count
    finally:
        if conn:
            conn.close()


# ANCHOR_END: monitor
# ANCHOR: main
def main(infinity):
    set_global_config()
    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
                 f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

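    # (re)create the test database and the meters super table before any
    # worker process starts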
    conn = get_connection()
    conn.execute("DROP DATABASE IF EXISTS test")
    conn.execute("CREATE DATABASE IF NOT EXISTS test")
    conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
                 "TAGS (location BINARY(64), groupId INT)")
    conn.close()

    done_queue = Queue()
    monitor_process = Process(target=run_monitor_process, args=(done_queue,))
    monitor_process.start()
    logging.debug(f"monitor task started with pid {monitor_process.pid}")

    task_queues: List[Queue] = []
    write_processes = []
    read_processes = []

    # create task queues
    for i in range(WRITE_TASK_COUNT):
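        # bound each queue so that a read task blocks on put() instead of
        # growing memory without limit when write tasks fall behind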
        queue = Queue(QUEUE_SIZE)
        task_queues.append(queue)

    # create write processes
    for i in range(WRITE_TASK_COUNT):
        p = Process(target=run_write_task, args=(i, task_queues[i], done_queue))
        p.start()
        logging.debug(f"WriteTask-{i} started with pid {p.pid}")
        write_processes.append(p)

    # create read processes
    for i in range(READ_TASK_COUNT):
        queues = assign_queues(i, task_queues)
        p = Process(target=run_read_task, args=(i, queues, infinity))
        p.start()
        logging.debug(f"ReadTask-{i} started with pid {p.pid}")
        read_processes.append(p)

    try:
        monitor_process.join()
        for p in read_processes:
            p.join()
        for p in write_processes:
            p.join()
        time.sleep(1)
        return
    except KeyboardInterrupt:
        monitor_process.terminate()
        for p in read_processes:
            p.terminate()
        for p in write_processes:
            p.terminate()
        for q in task_queues:
            q.close()


def assign_queues(read_task_id, task_queues):
    """
    Compute target queues for a specific read task.
    """
    ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
    from_index = math.floor(read_task_id * ratio)
    end_index = math.ceil((read_task_id + 1) * ratio)
    return task_queues[from_index:end_index]


if __name__ == '__main__':
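    # 'spawn' starts each worker with a fresh interpreter; it is the only
    # start method available on Windows and the default on macOS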
    multiprocessing.set_start_method('spawn')
    main(False)
# ANCHOR_END: main