From 3bbb38c95516e71e9cbfc6e29762a348476c27f9 Mon Sep 17 00:00:00 2001 From: Bo Ding Date: Tue, 12 Jul 2022 13:15:22 +0800 Subject: [PATCH] docs: test hihvolume_example.py --- docs/examples/python/highvolume_example.py | 73 +++++++++++++--------- 1 file changed, 42 insertions(+), 31 deletions(-) diff --git a/docs/examples/python/highvolume_example.py b/docs/examples/python/highvolume_example.py index 6d4f9185ce..9b2f6e7e33 100644 --- a/docs/examples/python/highvolume_example.py +++ b/docs/examples/python/highvolume_example.py @@ -1,59 +1,71 @@ import logging import sys +from threading import Thread import time from multiprocessing import Queue, Process from queue import Empty from typing import List -import taos -from taos import TaosConnection logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s") READ_TASK_COUNT = 1 -WRITE_TASK_COUNT = 3 +WRITE_TASK_COUNT = 1 TABLE_COUNT = 1000 MAX_BATCH_SIZE = 3000 -MAX_SQL_LENGTH = 1024 * 1024 read_processes = [] write_processes = [] def get_connection(): + import taos return taos.connect(host="localhost", user="root", password="taosdata", port=6030) +def get_max_sql_length(conn): + rows = conn.query("SHOW variables").fetch_all() + for r in rows: + name = r[0] + if name == "maxSQLLength": + return int(r[1]) # ANCHOR: DataBaseMonitor class DataBaseMonitor: + """ + Start a thread. + Prepare database and stable. + Statistic writing speed and print it every 10 seconds. + """ def __init__(self): - self.conn: TaosConnection = get_connection() + self.process = Process(target=self.run) + self.process.start() - def prepare_database(self): - self.conn.execute("DROP DATABASE IF EXISTS test") - self.conn.execute("CREATE DATABASE test") - self.conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)") + def prepare_database(self, conn): + conn.execute("DROP DATABASE IF EXISTS test") + conn.execute("CREATE DATABASE test") + conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)") - def get_count(self): - res = self.conn.query("SELECT count(*) FROM test.meters") + def get_count(self, conn): + res = conn.query("SELECT count(*) FROM test.meters") rows = res.fetch_all() return rows[0][0] if rows else 0 - def get_max_sql_length(self): - rows = self.conn.query("SHOW variables").fetch_all() - for r in rows: - name = r[0] - if name == "maxSQLLength": - return int(r[1]) - - def stat_and_print(self): + def run(self): + log = logging.getLogger("DataBaseMonitor") + conn = get_connection() + self.prepare_database(conn) last_count = 0 while True: time.sleep(10) - count = self.get_count() - logging.info(f"count={count} speed={(count - last_count) / 10}") + count = self.get_count(conn) + log.info(f"count={count} speed={(count - last_count) / 10}") last_count = count + def join(self): + self.process.join() + + def stop(self): + self.process.terminate() # ANCHOR_END: DataBaseMonitor @@ -101,8 +113,9 @@ class SQLWriter: self._tb_values = {} self._tb_tags = {} self._conn = get_connection() + self._max_sql_lenght = get_max_sql_length(self._conn) self._conn.execute("USE test") - + def process_line(self, line: str): """ :param line: tbName,ts,current,voltage,phase,location,groupId @@ -136,7 +149,7 @@ class SQLWriter: buf = [] for tb_name, values in self._tb_values.items(): q = tb_name + " VALUES " + values - if sql_len + len(q) >= MAX_SQL_LENGTH: + if sql_len + len(q) >= self._max_sql_lenght: sql += " ".join(buf) self.execute_sql(sql) sql = "INSERT INTO " @@ -150,7 +163,7 @@ class SQLWriter: self._buffered_count = 0 def execute_sql(self, sql): - self.log.debug(sql) + import taos try: self._conn.execute(sql) except taos.Error as e: @@ -235,10 +248,7 @@ def main(): logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") database_monitor = DataBaseMonitor() - database_monitor.prepare_database() - global MAX_SQL_LENGTH - MAX_SQL_LENGTH = database_monitor.get_max_sql_length() - logging.info(f"MAX_SQL_LENGTH={MAX_SQL_LENGTH}") + time.sleep(3) # wait for database ready task_queues: List[Queue] = [] @@ -247,20 +257,21 @@ def main(): task_queues.append(queue) p = Process(target=run_write_task, args=(i, queue)) p.start() - + logging.debug(f"WriteTask {i} started with pid {p.pid}") write_processes.append(p) for i in range(READ_TASK_COUNT): p = Process(target=run_read_task, args=(i, task_queues)) p.start() + logging.debug(f"ReadTask {i} started with pid {p.pid}") read_processes.append(p) try: - database_monitor.stat_and_print() + database_monitor.join() except KeyboardInterrupt: + database_monitor.stop() [p.terminate() for p in read_processes] [p.terminate() for p in write_processes] - exit() # ANCHOR_END: main -- GitLab