From 6e4d3cf4261286bca330eb7034c9deb8a96383af Mon Sep 17 00:00:00 2001
From: Bo Ding
Date: Tue, 12 Jul 2022 21:31:32 +0800
Subject: [PATCH] docs: test highvolume_faster_queue.py

---
 .../python/highvolume_faster_queue.py        | 140 +++----
 docs/examples/python/highvolume_mp_queue.py  | 386 +++++++++---------
 docs/examples/python/sql_writer.py           |  46 +--
 3 files changed, 275 insertions(+), 297 deletions(-)

diff --git a/docs/examples/python/highvolume_faster_queue.py b/docs/examples/python/highvolume_faster_queue.py
index 486f3c4865..b45cc0918f 100644
--- a/docs/examples/python/highvolume_faster_queue.py
+++ b/docs/examples/python/highvolume_faster_queue.py
@@ -1,11 +1,13 @@
 # install dependencies:
-# python >= 3.8
+# Python >= 3.8 recommended
 # pip3 install faster-fifo
 #
+
 import logging
 import sys
 import time
 from multiprocessing import Process
+from faster_fifo import Queue
 from queue import Empty
 from typing import List
 
@@ -13,60 +15,14 @@ logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s
 
 READ_TASK_COUNT = 1
 WRITE_TASK_COUNT = 1
-QUEUE_SIZE = 1000
 TABLE_COUNT = 1000
+QUEUE_SIZE = 1000000
 MAX_BATCH_SIZE = 3000
 
 read_processes = []
 write_processes = []
 
-# ANCHOR: DataBaseMonitor
-class DataBaseMonitor:
-    """
-    Start a thread.
-    Prepare database and stable.
-    Statistic writing speed and print it every 10 seconds.
-    """
-
-    def __init__(self):
-        self.process = Process(target=self.run)
-        self.process.start()
-
-    def get_connection(self):
-        import taos
-        return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
-
-    def prepare_database(self, conn):
-        conn.execute("DROP DATABASE IF EXISTS test")
-        conn.execute("CREATE DATABASE test")
-        conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
-
-    def get_count(self, conn):
-        res = conn.query("SELECT count(*) FROM test.meters")
-        rows = res.fetch_all()
-        return rows[0][0] if rows else 0
-
-    def run(self):
-        log = logging.getLogger("DataBaseMonitor")
-        conn = self.get_connection()
-        self.prepare_database(conn)
-        last_count = 0
-        while True:
-            time.sleep(10)
-            count = self.get_count(conn)
-            log.info(f"count={count} speed={(count - last_count) / 10}")
-            last_count = count
-
-    def join(self):
-        self.process.join()
-
-    def stop(self):
-        self.process.terminate()
-
-
-# ANCHOR_END: DataBaseMonitor
-
 # ANCHOR: MockDataSource
 class MockDataSource:
     location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
@@ -86,18 +42,25 @@ class MockDataSource:
         return self
 
     def __next__(self):
+        """
+        Return the next 100 rows of the current table.
+        """
         self.table_id += 1
         if self.table_id == self.table_count:
             self.table_id = 0
+        if self.row >= self.max_rows_per_table:
+            raise StopIteration
+        rows = []
+
+        while len(rows) < 100:
             self.row += 1
-        if self.row < self.max_rows_per_table:
             ts = self.start_ms + 100 * self.row
             group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
             tb_name = self.table_name_prefix + '_' + str(self.table_id)
             ri = self.row % 5
-            return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}"
-        else:
-            raise StopIteration
+            rows.append(
+                f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}")
+        return self.table_id, rows
 
 
 # ANCHOR_END: MockDataSource
 
 # ANCHOR: read
@@ -107,9 +70,9 @@ def run_read_task(task_id: int, task_queues: List[Queue]):
     table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task) try: - for table_id, line in data_source: + for table_id, rows in data_source: i = table_id % len(task_queues) - task_queues[i].put(line, block=True) + task_queues[i].put_many(rows, block=True, timeout=-1) except KeyboardInterrupt: pass @@ -120,27 +83,23 @@ def run_read_task(task_id: int, task_queues: List[Queue]): def run_write_task(task_id: int, queue: Queue): from sql_writer import SQLWriter log = logging.getLogger(f"WriteTask-{task_id}") - writer = SQLWriter(MAX_BATCH_SIZE) + writer = SQLWriter() + lines = None try: while True: try: - line = queue.get(block=False) - writer.process_line(line) + lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE) + writer.process_lines(lines) except Empty: - if writer.buffered_count > 0: - writer.flush() - else: - time.sleep(0.01) + time.sleep(0.01) except KeyboardInterrupt: pass except BaseException as e: - msg = f"line={line}, buffer_count={writer.buffered_count}" - log.debug(msg) + log.debug(f"lines={lines}") raise e # ANCHOR_END: write - def set_global_config(): argc = len(sys.argv) if argc > 1: @@ -150,34 +109,60 @@ def set_global_config(): global WRITE_TASK_COUNT WRITE_TASK_COUNT = int(sys.argv[2]) if argc > 3: - global QUEUE_SIZE - QUEUE_SIZE = int(sys.argv[3]) - if argc > 4: global TABLE_COUNT - TABLE_COUNT = int(sys.argv[4]) + TABLE_COUNT = int(sys.argv[3]) + if argc > 4: + global QUEUE_SIZE + QUEUE_SIZE = int(sys.argv[4]) if argc > 5: global MAX_BATCH_SIZE MAX_BATCH_SIZE = int(sys.argv[5]) # ANCHOR: main +def run_monitor_process(): + import taos + log = logging.getLogger("DataBaseMonitor") + conn = taos.connect(host="localhost", user="root", password="taosdata", port=6030) + conn.execute("DROP DATABASE IF EXISTS test") + conn.execute("CREATE DATABASE test") + conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " + "TAGS (location BINARY(64), groupId INT)") + + def get_count(): + res = conn.query("SELECT count(*) FROM test.meters") + rows = res.fetch_all() + return rows[0][0] if rows else 0 + + last_count = 0 + while True: + time.sleep(10) + count = get_count() + log.info(f"count={count} speed={(count - last_count) / 10}") + last_count = count + + def main(): set_global_config() - logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") + logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, " + f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") - database_monitor = DataBaseMonitor() - time.sleep(3) # wait for database ready + monitor_process = Process(target=run_monitor_process) + monitor_process.start() + time.sleep(3) task_queues: List[Queue] = [] - + # create task queues for i in range(WRITE_TASK_COUNT): - queue = Queue(maxsize=QUEUE_SIZE) + queue = Queue(max_size_bytes=QUEUE_SIZE) task_queues.append(queue) - p = Process(target=run_write_task, args=(i, queue)) + # create write processes + for i in range(WRITE_TASK_COUNT): + p = Process(target=run_write_task, args=(i, task_queues[i])) p.start() logging.debug(f"WriteTask-{i} started with pid {p.pid}") write_processes.append(p) - + # create read processes for i in range(READ_TASK_COUNT): p = Process(target=run_read_task, args=(i, task_queues)) p.start() @@ -185,11 +170,12 @@ def main(): read_processes.append(p) try: - database_monitor.join() + monitor_process.join() except 
KeyboardInterrupt: - database_monitor.stop() + monitor_process.terminate() [p.terminate() for p in read_processes] [p.terminate() for p in write_processes] + [q.close() for q in task_queues] if __name__ == '__main__': diff --git a/docs/examples/python/highvolume_mp_queue.py b/docs/examples/python/highvolume_mp_queue.py index c3e41e9467..f6605adb0f 100644 --- a/docs/examples/python/highvolume_mp_queue.py +++ b/docs/examples/python/highvolume_mp_queue.py @@ -1,193 +1,193 @@ -import logging -import sys -import time -from multiprocessing import Queue, Process -from queue import Empty -from typing import List - -logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s") - -READ_TASK_COUNT = 1 -WRITE_TASK_COUNT = 1 -QUEUE_SIZE = 1000 -TABLE_COUNT = 1000 -MAX_BATCH_SIZE = 3000 - -read_processes = [] -write_processes = [] - - -# ANCHOR: DataBaseMonitor -class DataBaseMonitor: - """ - Start a thread. - Prepare database and stable. - Statistic writing speed and print it every 10 seconds. - """ - - def __init__(self): - self.process = Process(target=self.run) - self.process.start() - - def get_connection(self): - import taos - return taos.connect(host="localhost", user="root", password="taosdata", port=6030) - - def prepare_database(self, conn): - conn.execute("DROP DATABASE IF EXISTS test") - conn.execute("CREATE DATABASE test") - conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)") - - def get_count(self, conn): - res = conn.query("SELECT count(*) FROM test.meters") - rows = res.fetch_all() - return rows[0][0] if rows else 0 - - def run(self): - log = logging.getLogger("DataBaseMonitor") - conn = self.get_connection() - self.prepare_database(conn) - last_count = 0 - while True: - time.sleep(10) - count = self.get_count(conn) - log.info(f"count={count} speed={(count - last_count) / 10}") - last_count = count - - def join(self): - self.process.join() - - def stop(self): - self.process.terminate() - - -# ANCHOR_END: DataBaseMonitor - -# ANCHOR: MockDataSource -class MockDataSource: - location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"] - current = [8.8, 10.7, 9.9, 8.9, 9.4] - voltage = [119, 116, 111, 113, 118] - phase = [0.32, 0.34, 0.33, 0.329, 0.141] - max_rows_per_table = 10 ** 9 - - def __init__(self, tb_name_prefix, table_count): - self.table_name_prefix = tb_name_prefix - self.table_count = table_count - self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100 - - def __iter__(self): - self.row = 0 - self.table_id = -1 - return self - - def __next__(self): - self.table_id += 1 - if self.table_id == self.table_count: - self.table_id = 0 - self.row += 1 - if self.row < self.max_rows_per_table: - ts = self.start_ms + 100 * self.row - group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1 - tb_name = self.table_name_prefix + '_' + str(self.table_id) - ri = self.row % 5 - return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}" - else: - raise StopIteration - - -# ANCHOR_END: MockDataSource - -# ANCHOR: read -def run_read_task(task_id: int, task_queues: List[Queue]): - table_count_per_task = TABLE_COUNT // READ_TASK_COUNT - data_source = MockDataSource(f"tb{task_id}", table_count_per_task) - try: - for table_id, line in data_source: - i = table_id % len(task_queues) - task_queues[i].put(line, block=True) - except KeyboardInterrupt: 
- pass - - -# ANCHOR_END: read - -# ANCHOR: write -def run_write_task(task_id: int, queue: Queue): - from sql_writer import SQLWriter - log = logging.getLogger(f"WriteTask-{task_id}") - writer = SQLWriter(MAX_BATCH_SIZE) - try: - while True: - try: - line = queue.get(block=False) - writer.process_line(line) - except Empty: - if writer.buffered_count > 0: - writer.flush() - else: - time.sleep(0.01) - except KeyboardInterrupt: - pass - except BaseException as e: - msg = f"line={line}, buffer_count={writer.buffered_count}" - log.debug(msg) - raise e - - -# ANCHOR_END: write - -def set_global_config(): - argc = len(sys.argv) - if argc > 1: - global READ_TASK_COUNT - READ_TASK_COUNT = int(sys.argv[1]) - if argc > 2: - global WRITE_TASK_COUNT - WRITE_TASK_COUNT = int(sys.argv[2]) - if argc > 3: - global QUEUE_SIZE - QUEUE_SIZE = int(sys.argv[3]) - if argc > 4: - global TABLE_COUNT - TABLE_COUNT = int(sys.argv[4]) - if argc > 5: - global MAX_BATCH_SIZE - MAX_BATCH_SIZE = int(sys.argv[5]) - - -# ANCHOR: main -def main(): - set_global_config() - logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") - - database_monitor = DataBaseMonitor() - time.sleep(3) # wait for database ready - - task_queues: List[Queue] = [] - - for i in range(WRITE_TASK_COUNT): - queue = Queue(maxsize=QUEUE_SIZE) - task_queues.append(queue) - p = Process(target=run_write_task, args=(i, queue)) - p.start() - logging.debug(f"WriteTask-{i} started with pid {p.pid}") - write_processes.append(p) - - for i in range(READ_TASK_COUNT): - p = Process(target=run_read_task, args=(i, task_queues)) - p.start() - logging.debug(f"ReadTask-{i} started with pid {p.pid}") - read_processes.append(p) - - try: - database_monitor.join() - except KeyboardInterrupt: - database_monitor.stop() - [p.terminate() for p in read_processes] - [p.terminate() for p in write_processes] - - -if __name__ == '__main__': - main() -# ANCHOR_END: main +# import logging +# import sys +# import time +# from multiprocessing import Queue, Process +# from queue import Empty +# from typing import List +# +# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s") +# +# READ_TASK_COUNT = 1 +# WRITE_TASK_COUNT = 1 +# QUEUE_SIZE = 1000 +# TABLE_COUNT = 1000 +# MAX_BATCH_SIZE = 3000 +# +# read_processes = [] +# write_processes = [] +# +# +# # ANCHOR: DataBaseMonitor +# class DataBaseMonitor: +# """ +# Start a thread. +# Prepare database and stable. +# Statistic writing speed and print it every 10 seconds. 
+# """ +# +# def __init__(self): +# self.process = Process(target=self.run) +# self.process.start() +# +# def get_connection(self): +# import taos +# return taos.connect(host="localhost", user="root", password="taosdata", port=6030) +# +# def prepare_database(self, conn): +# conn.execute("DROP DATABASE IF EXISTS test") +# conn.execute("CREATE DATABASE test") +# conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)") +# +# def get_count(self, conn): +# res = conn.query("SELECT count(*) FROM test.meters") +# rows = res.fetch_all() +# return rows[0][0] if rows else 0 +# +# def run(self): +# log = logging.getLogger("DataBaseMonitor") +# conn = self.get_connection() +# self.prepare_database(conn) +# last_count = 0 +# while True: +# time.sleep(10) +# count = self.get_count(conn) +# log.info(f"count={count} speed={(count - last_count) / 10}") +# last_count = count +# +# def join(self): +# self.process.join() +# +# def stop(self): +# self.process.terminate() +# +# +# # ANCHOR_END: DataBaseMonitor +# +# # ANCHOR: MockDataSource +# class MockDataSource: +# location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"] +# current = [8.8, 10.7, 9.9, 8.9, 9.4] +# voltage = [119, 116, 111, 113, 118] +# phase = [0.32, 0.34, 0.33, 0.329, 0.141] +# max_rows_per_table = 10 ** 9 +# +# def __init__(self, tb_name_prefix, table_count): +# self.table_name_prefix = tb_name_prefix +# self.table_count = table_count +# self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100 +# +# def __iter__(self): +# self.row = 0 +# self.table_id = -1 +# return self +# +# def __next__(self): +# self.table_id += 1 +# if self.table_id == self.table_count: +# self.table_id = 0 +# self.row += 1 +# if self.row < self.max_rows_per_table: +# ts = self.start_ms + 100 * self.row +# group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1 +# tb_name = self.table_name_prefix + '_' + str(self.table_id) +# ri = self.row % 5 +# return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}" +# else: +# raise StopIteration +# +# +# # ANCHOR_END: MockDataSource +# +# # ANCHOR: read +# def run_read_task(task_id: int, task_queues: List[Queue]): +# table_count_per_task = TABLE_COUNT // READ_TASK_COUNT +# data_source = MockDataSource(f"tb{task_id}", table_count_per_task) +# try: +# for table_id, line in data_source: +# i = table_id % len(task_queues) +# task_queues[i].put(line, block=True) +# except KeyboardInterrupt: +# pass +# +# +# # ANCHOR_END: read +# +# # ANCHOR: write +# def run_write_task(task_id: int, queue: Queue): +# from sql_writer import SQLWriter +# log = logging.getLogger(f"WriteTask-{task_id}") +# writer = SQLWriter(MAX_BATCH_SIZE) +# try: +# while True: +# try: +# line = queue.get(block=False) +# writer.process_line(line) +# except Empty: +# if writer.buffered_count > 0: +# writer.flush() +# else: +# time.sleep(0.01) +# except KeyboardInterrupt: +# pass +# except BaseException as e: +# msg = f"line={line}, buffer_count={writer.buffered_count}" +# log.debug(msg) +# raise e +# +# +# # ANCHOR_END: write +# +# def set_global_config(): +# argc = len(sys.argv) +# if argc > 1: +# global READ_TASK_COUNT +# READ_TASK_COUNT = int(sys.argv[1]) +# if argc > 2: +# global WRITE_TASK_COUNT +# WRITE_TASK_COUNT = int(sys.argv[2]) +# if argc > 3: +# global QUEUE_SIZE +# QUEUE_SIZE = int(sys.argv[3]) +# if argc > 4: +# global TABLE_COUNT +# TABLE_COUNT 
= int(sys.argv[4])
+#     if argc > 5:
+#         global MAX_BATCH_SIZE
+#         MAX_BATCH_SIZE = int(sys.argv[5])
+#
+#
+# # ANCHOR: main
+# def main():
+#     set_global_config()
+#     logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
+#
+#     database_monitor = DataBaseMonitor()
+#     time.sleep(3) # wait for database ready
+#
+#     task_queues: List[Queue] = []
+#
+#     for i in range(WRITE_TASK_COUNT):
+#         queue = Queue(maxsize=QUEUE_SIZE)
+#         task_queues.append(queue)
+#         p = Process(target=run_write_task, args=(i, queue))
+#         p.start()
+#         logging.debug(f"WriteTask-{i} started with pid {p.pid}")
+#         write_processes.append(p)
+#
+#     for i in range(READ_TASK_COUNT):
+#         p = Process(target=run_read_task, args=(i, task_queues))
+#         p.start()
+#         logging.debug(f"ReadTask-{i} started with pid {p.pid}")
+#         read_processes.append(p)
+#
+#     try:
+#         database_monitor.join()
+#     except KeyboardInterrupt:
+#         database_monitor.stop()
+#         [p.terminate() for p in read_processes]
+#         [p.terminate() for p in write_processes]
+#
+#
+# if __name__ == '__main__':
+#     main()
+# # ANCHOR_END: main
diff --git a/docs/examples/python/sql_writer.py b/docs/examples/python/sql_writer.py
index 77a1e32648..5c36cf1e6f 100644
--- a/docs/examples/python/sql_writer.py
+++ b/docs/examples/python/sql_writer.py
@@ -5,9 +5,7 @@ import taos
 class SQLWriter:
     log = logging.getLogger("SQLWriter")
 
-    def __init__(self, max_batch_size):
-        self._buffered_count = 0
-        self._max_batch_size = max_batch_size
+    def __init__(self):
         self._tb_values = {}
         self._tb_tags = {}
         self._conn = self.get_connection()
@@ -21,30 +19,29 @@
             if name == "maxSQLLength":
                 return int(r[1])
 
-    def get_connection(self):
+    @staticmethod
+    def get_connection():
         return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
 
-    def process_line(self, line: str):
+    def process_lines(self, lines):
         """
-        :param line: tbName,ts,current,voltage,phase,location,groupId
+        :param lines: list of strings, each formatted as tbName,ts,current,voltage,phase,location,groupId
         """
-        self._buffered_count += 1
-        ps = line.split(",")
-        table_name = ps[0]
-        value = '(' + ",".join(ps[1:-2]) + ') '
-        if table_name in self._tb_values:
-            self._tb_values[table_name] += value
-        else:
-            self._tb_values[table_name] = value
-
-        if table_name not in self._tb_tags:
-            location = ps[-2]
-            group_id = ps[-1]
-            tag_value = f"('{location}',{group_id})"
-            self._tb_tags[table_name] = tag_value
+        for line in lines:
+            ps = line.split(",")
+            table_name = ps[0]
+            value = '(' + ",".join(ps[1:-2]) + ') '
+            if table_name in self._tb_values:
+                self._tb_values[table_name] += value
+            else:
+                self._tb_values[table_name] = value
 
-        if self._buffered_count == self._max_batch_size:
-            self.flush()
+            if table_name not in self._tb_tags:
+                location = ps[-2]
+                group_id = ps[-1]
+                tag_value = f"('{location}',{group_id})"
+                self._tb_tags[table_name] = tag_value
+        self.flush()
 
     def flush(self):
         """
@@ -68,7 +65,6 @@
         sql += " ".join(buf)
         self.execute_sql(sql)
         self._tb_values.clear()
-        self._buffered_count = 0
 
     def execute_sql(self, sql):
         try:
@@ -87,7 +83,3 @@
         tag_values = self._tb_tags[tb]
         sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
         self._conn.execute(sql)
-
-    @property
-    def buffered_count(self):
-        return self._buffered_count
-- 
GitLab
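
Note: the patch above hinges on faster-fifo's batch API (put_many/get_many), which is
what it buys over multiprocessing.Queue: one lock acquisition and one serialization
round per batch rather than per row. A minimal, self-contained sketch of that pattern
follows; the queue size, message counts, and timeouts are illustrative values, not
values taken from the patch (requires `pip3 install faster-fifo`):

    import time
    from multiprocessing import Process
    from queue import Empty
    from faster_fifo import Queue


    def consumer(q: Queue) -> None:
        drained = 0
        while drained < 1000:
            try:
                # Drain up to 300 queued messages with a single call.
                batch = q.get_many(block=False, max_messages_to_get=300)
                drained += len(batch)
            except Empty:  # faster_fifo raises the standard queue.Empty
                time.sleep(0.01)


    if __name__ == '__main__':
        q = Queue(max_size_bytes=1000 * 1000)  # capacity is in bytes, not items
        p = Process(target=consumer, args=(q,))
        p.start()
        # Producer side: enqueue whole batches instead of one message at a time.
        for start in range(0, 1000, 100):
            rows = [f"row-{i}" for i in range(start, start + 100)]
            q.put_many(rows, block=True, timeout=10.0)
        p.join()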