From 744df1ebec29037b8119531d358170776ed9e6e9 Mon Sep 17 00:00:00 2001
From: dingbo
Date: Tue, 12 Jul 2022 13:40:40 +0800
Subject: [PATCH] docs: test highvolume_example.py

---
 docs/examples/python/highvolume_example.py | 121 +++------------------
 docs/examples/python/sql_writer.py         |  94 ++++++++++++++++
 2 files changed, 109 insertions(+), 106 deletions(-)
 create mode 100644 docs/examples/python/sql_writer.py

diff --git a/docs/examples/python/highvolume_example.py b/docs/examples/python/highvolume_example.py
index 9b2f6e7e33..6683db7e15 100644
--- a/docs/examples/python/highvolume_example.py
+++ b/docs/examples/python/highvolume_example.py
@@ -1,12 +1,10 @@
 import logging
 import sys
-from threading import Thread
 import time
 from multiprocessing import Queue, Process
 from queue import Empty
 from typing import List
-
 
 logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")
 
 READ_TASK_COUNT = 1
@@ -18,17 +16,6 @@
 read_processes = []
 write_processes = []
 
-def get_connection():
-    import taos
-    return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
-
-def get_max_sql_length(conn):
-    rows = conn.query("SHOW variables").fetch_all()
-    for r in rows:
-        name = r[0]
-        if name == "maxSQLLength":
-            return int(r[1])
-
 # ANCHOR: DataBaseMonitor
 class DataBaseMonitor:
     """
@@ -36,10 +23,15 @@ class DataBaseMonitor:
     Prepare database and stable.
     Statistic writing speed and print it every 10 seconds.
     """
+
     def __init__(self):
         self.process = Process(target=self.run)
         self.process.start()
 
+    def get_connection(self):
+        import taos
+        return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
+
     def prepare_database(self, conn):
         conn.execute("DROP DATABASE IF EXISTS test")
         conn.execute("CREATE DATABASE test")
@@ -52,7 +44,7 @@ class DataBaseMonitor:
 
     def run(self):
         log = logging.getLogger("DataBaseMonitor")
-        conn = get_connection()
+        conn = self.get_connection()
         self.prepare_database(conn)
         last_count = 0
         while True:
@@ -67,6 +59,7 @@ class DataBaseMonitor:
     def stop(self):
         self.process.terminate()
 
+
 # ANCHOR_END: DataBaseMonitor
 
 # ANCHOR: MockDataSource
@@ -104,90 +97,6 @@ class MockDataSource:
 
 # ANCHOR_END: MockDataSource
 
-# ANCHOR: SQLWriter
-class SQLWriter:
-    log = logging.getLogger("SQLWriter")
-
-    def __init__(self):
-        self._buffered_count = 0
-        self._tb_values = {}
-        self._tb_tags = {}
-        self._conn = get_connection()
-        self._max_sql_lenght = get_max_sql_length(self._conn)
-        self._conn.execute("USE test")
-
-    def process_line(self, line: str):
-        """
-        :param line: tbName,ts,current,voltage,phase,location,groupId
-        """
-        self._buffered_count += 1
-        ps = line.split(",")
-        table_name = ps[0]
-        value = '(' + ",".join(ps[1:-2]) + ') '
-        if table_name in self._tb_values:
-            self._tb_values[table_name] += value
-        else:
-            self._tb_values[table_name] = value
-
-        if table_name not in self._tb_tags:
-            location = ps[-2]
-            group_id = ps[-1]
-            tag_value = f"('{location}',{group_id})"
-            self._tb_tags[table_name] = tag_value
-
-        if self._buffered_count == MAX_BATCH_SIZE:
-            self.flush()
-
-    def flush(self):
-        """
-        Assemble INSERT statement and execute it.
-        When the sql length grows close to MAX_SQL_LENGTH, the sql will be executed immediately, and a new INSERT statement will be created.
-        In case of "Table does not exit" exception, tables in the sql will be created and the sql will be re-executed.
-        """
-        sql = "INSERT INTO "
-        sql_len = len(sql)
-        buf = []
-        for tb_name, values in self._tb_values.items():
-            q = tb_name + " VALUES " + values
-            if sql_len + len(q) >= self._max_sql_lenght:
-                sql += " ".join(buf)
-                self.execute_sql(sql)
-                sql = "INSERT INTO "
-                sql_len = len(sql)
-                buf = []
-            buf.append(q)
-            sql_len += len(q)
-        sql += " ".join(buf)
-        self.execute_sql(sql)
-        self._tb_values.clear()
-        self._buffered_count = 0
-
-    def execute_sql(self, sql):
-        import taos
-        try:
-            self._conn.execute(sql)
-        except taos.Error as e:
-            error_code = e.errno & 0xffff
-            # Table does not exit
-            if error_code == 0x362 or error_code == 0x218:
-                self.create_tables()
-            else:
-                raise e
-
-    def create_tables(self):
-        sql = "CREATE TABLE "
-        for tb in self._tb_values.keys():
-            tag_values = self._tb_tags[tb]
-            sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
-        self._conn.execute(sql)
-
-    @property
-    def buffered_count(self):
-        return self._buffered_count
-
-
-# ANCHOR_END: SQLWriter
-
 # ANCHOR: read
 def run_read_task(task_id: int, task_queues: List[Queue]):
     table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
@@ -204,8 +113,9 @@
 
 # ANCHOR: write
 def run_write_task(task_id: int, queue: Queue):
+    from sql_writer import SQLWriter
     log = logging.getLogger(f"WriteTask-{task_id}")
-    writer = SQLWriter()
+    writer = SQLWriter(MAX_BATCH_SIZE)
     try:
         while True:
             try:
@@ -230,16 +140,16 @@ def set_global_config():
     argc = len(sys.argv)
     if argc > 1:
         global READ_TASK_COUNT
-        READ_TASK_COUNT = sys.argv[1]
+        READ_TASK_COUNT = int(sys.argv[1])
     if argc > 2:
         global WRITE_TASK_COUNT
-        WRITE_TASK_COUNT = sys.argv[2]
+        WRITE_TASK_COUNT = int(sys.argv[2])
     if argc > 3:
         global TABLE_COUNT
-        TABLE_COUNT = sys.argv[3]
+        TABLE_COUNT = int(sys.argv[3])
     if argc > 4:
         global MAX_BATCH_SIZE
-        MAX_BATCH_SIZE = sys.argv[4]
+        MAX_BATCH_SIZE = int(sys.argv[4])
 
 
 # ANCHOR: main
@@ -248,7 +158,7 @@ def main():
     logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
 
     database_monitor = DataBaseMonitor()
-    time.sleep(3) # wait for database ready
+    time.sleep(3)  # wait for database ready
 
     task_queues: List[Queue] = []
 
@@ -274,7 +184,6 @@ def main():
     [p.terminate() for p in write_processes]
 
 
-# ANCHOR_END: main
-
 if __name__ == '__main__':
     main()
+# ANCHOR_END: main
diff --git a/docs/examples/python/sql_writer.py b/docs/examples/python/sql_writer.py
new file mode 100644
index 0000000000..30f884fd6b
--- /dev/null
+++ b/docs/examples/python/sql_writer.py
@@ -0,0 +1,94 @@
+import logging
+import taos
+
+
+class SQLWriter:
+    log = logging.getLogger("SQLWriter")
+
+    def __init__(self, max_batch_size):
+        self._buffered_count = 0
+        self._max_batch_size = max_batch_size
+        self._tb_values = {}
+        self._tb_tags = {}
+        self._conn = self.get_connection()
+        self._max_sql_length = self.get_max_sql_length()
+        self._conn.execute("USE test")
+
+    def get_max_sql_length(self):
+        rows = self._conn.query("SHOW variables").fetch_all()
+        for r in rows:
+            name = r[0]
+            if name == "maxSQLLength":
+                return int(r[1])
+
+    def get_connection(self):
+        return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
+
+    def process_line(self, line: str):
+        """
+        :param line: tbName,ts,current,voltage,phase,location,groupId
+        """
+        self._buffered_count += 1
+        ps = line.split(",")
+        table_name = ps[0]
+        value = '(' + ",".join(ps[1:-2]) + ') '
+        if table_name in self._tb_values:
+            self._tb_values[table_name] += value
+        else:
+            self._tb_values[table_name] = value
+
+        if table_name not in self._tb_tags:
+            location = ps[-2]
+            group_id = ps[-1]
+            tag_value = f"('{location}',{group_id})"
+            self._tb_tags[table_name] = tag_value
+
+        if self._buffered_count == self._max_batch_size:
+            self.flush()
+
+    def flush(self):
+        """
+        Assemble INSERT statement and execute it.
+        When the sql length grows close to MAX_SQL_LENGTH, the sql will be executed immediately, and a new INSERT statement will be created.
+        In case of "Table does not exist" exception, tables in the sql will be created and the sql will be re-executed.
+        """
+        sql = "INSERT INTO "
+        sql_len = len(sql)
+        buf = []
+        for tb_name, values in self._tb_values.items():
+            q = tb_name + " VALUES " + values
+            if sql_len + len(q) >= self._max_sql_length:
+                sql += " ".join(buf)
+                self.execute_sql(sql)
+                sql = "INSERT INTO "
+                sql_len = len(sql)
+                buf = []
+            buf.append(q)
+            sql_len += len(q)
+        sql += " ".join(buf)
+        self.execute_sql(sql)
+        self._tb_values.clear()
+        self._buffered_count = 0
+
+    def execute_sql(self, sql):
+        try:
+            self._conn.execute(sql)
+        except taos.Error as e:
+            error_code = e.errno & 0xffff
+            # Table does not exist
+            if error_code == 0x362 or error_code == 0x218:
+                self.create_tables()
+                self._conn.execute(sql)  # tables exist now, re-execute the INSERT
+            else:
+                raise e
+
+    def create_tables(self):
+        sql = "CREATE TABLE "
+        for tb in self._tb_values.keys():
+            tag_values = self._tb_tags[tb]
+            sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
+        self._conn.execute(sql)
+
+    @property
+    def buffered_count(self):
+        return self._buffered_count
--
GitLab
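
A minimal way to exercise the new SQLWriter on its own, shown as a sketch rather than part of the patch: it assumes a TDengine server reachable at localhost:6030 with the default root/taosdata account, and that the test database and the meters super table (normally created by DataBaseMonitor.prepare_database) already exist. The table name t0 and the sample readings are made up for illustration.

import time

from sql_writer import SQLWriter

# A small batch size makes the automatic flush inside process_line easy to observe.
writer = SQLWriter(max_batch_size=10)
start_ms = int(time.time() * 1000)
for i in range(95):
    # Same layout the read tasks produce: tbName,ts,current,voltage,phase,location,groupId
    writer.process_line(f"t0,{start_ms + i},10.{i % 10},219,0.32,California.SanFrancisco,2")
# 95 rows trigger nine automatic flushes; push out the 5 leftover buffered rows by hand.
writer.flush()

Note that flush() always submits whatever is buffered, so it should only be called when buffered_count is non-zero; with an empty buffer it would send a bare "INSERT INTO " statement and fail.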