From 658ba5c66ae84bf41a386406f4597b048033bac2 Mon Sep 17 00:00:00 2001 From: dingbo Date: Mon, 11 Jul 2022 19:36:28 +0800 Subject: [PATCH] docs: document for python example --- docs/examples/python/highvolume_example.py | 39 ++++++++++++++-------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/docs/examples/python/highvolume_example.py b/docs/examples/python/highvolume_example.py index 0255090e24..00d5935965 100644 --- a/docs/examples/python/highvolume_example.py +++ b/docs/examples/python/highvolume_example.py @@ -1,13 +1,12 @@ import logging import sys -import threading import time from multiprocessing import Queue, Process from queue import Empty +from typing import List import taos from taos import TaosConnection -from typing import List logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s") @@ -25,6 +24,7 @@ def get_connection(): return taos.connect(host="localhost", user="root", password="taosdata", port=6030) +# ANCHOR: DataBaseMonitor class DataBaseMonitor: def __init__(self): self.conn: TaosConnection = get_connection() @@ -55,10 +55,13 @@ class DataBaseMonitor: logging.info(f"count={count} speed={(count - last_count) / 10}") last_count = count except KeyboardInterrupt: - [p.kill() for p in read_processes] - [p.kill for p in write_processes] + [p.terminate() for p in read_processes] + [p.terminate() for p in write_processes] + +# ANCHOR_END: DataBaseMonitor +# ANCHOR: MockDataSource class MockDataSource: location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"] current = [8.8, 10.7, 9.9, 8.9, 9.4] @@ -91,6 +94,9 @@ class MockDataSource: raise StopIteration +# ANCHOR_END: MockDataSource + +# ANCHOR: SQLWriter class SQLWriter: log = logging.getLogger("SQLWriter") @@ -170,19 +176,21 @@ class SQLWriter: return self._buffered_count -def run_read_task(task_id: int, task_queues: List[Queue]): - log = logging.getLogger(f"ReadTask-{task_id}") +# ANCHOR_END: SQLWriter +# ANCHOR: read +def run_read_task(task_id: int, task_queues: List[Queue]): table_count_per_task = TABLE_COUNT // READ_TASK_COUNT data_source = MockDataSource(f"tb{task_id}", table_count_per_task) - try: - for table_id, line in data_source: - i = table_id % len(task_queues) - task_queues[i].put(line, block=True) - except KeyboardInterrupt: - pass + for table_id, line in data_source: + i = table_id % len(task_queues) + task_queues[i].put(line, block=True) + + +# ANCHOR_END: read +# ANCHOR: write def run_write_task(task_id: int, queue: Queue): log = logging.getLogger(f"WriteTask-{task_id}") writer = SQLWriter() @@ -196,14 +204,14 @@ def run_write_task(task_id: int, queue: Queue): writer.flush() else: time.sleep(0.01) - except KeyboardInterrupt: - pass except BaseException as e: msg = f"line={line}, buffer_count={writer.buffered_count}" log.debug(msg) raise e +# ANCHOR_END: write + def set_global_config(): argc = len(sys.argv) if argc > 1: @@ -220,6 +228,7 @@ def set_global_config(): MAX_BATCH_SIZE = sys.argv[4] +# ANCHOR: main def main(): set_global_config() logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") @@ -248,5 +257,7 @@ def main(): database_monitor.stat_and_print() +# ANCHOR_END: main + if __name__ == '__main__': main() -- GitLab