diff --git a/docs/examples/python/highvolume_faster_queue.py b/docs/examples/python/highvolume_faster_queue.py new file mode 100644 index 0000000000000000000000000000000000000000..14aebc67eee5a0701081f2f5da605184568c3a89 --- /dev/null +++ b/docs/examples/python/highvolume_faster_queue.py @@ -0,0 +1,205 @@ +# install dependencies: +# recommend python >= 3.8 +# pip3 install faster-fifo +# + +import logging +import sys +import time +import os +from multiprocessing import Process +from faster_fifo import Queue +from queue import Empty +from typing import List + +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s") + +READ_TASK_COUNT = 1 +WRITE_TASK_COUNT = 1 +TABLE_COUNT = 1000 +QUEUE_SIZE = 1000000 +MAX_BATCH_SIZE = 3000 + +read_processes = [] +write_processes = [] + + +def get_connection(): + """ + If variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used. + You can also override the default username and password by supply variable TDENGINE_USER and TDENGINE_PASSWORD + """ + import taos + firstEP = os.environ.get("TDENGINE_FIRST_EP") + if firstEP: + host, port = firstEP.split(":") + else: + host, port = None, 0 + user = os.environ.get("TDENGINE_USER", "root") + password = os.environ.get("TDENGINE_PASSWORD", "taosdata") + return taos.connect(host=host, port=int(port), user=user, password=password) + + +# ANCHOR: MockDataSource +class MockDataSource: + location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"] + current = [8.8, 10.7, 9.9, 8.9, 9.4] + voltage = [119, 116, 111, 113, 118] + phase = [0.32, 0.34, 0.33, 0.329, 0.141] + max_rows_per_table = 10 ** 9 + + def __init__(self, tb_name_prefix, table_count): + self.table_name_prefix = tb_name_prefix + self.table_count = table_count + self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100 + + def __iter__(self): + self.row = 0 + self.table_id = -1 + return self + + def __next__(self): + """ + next 100 rows of current table + """ + self.table_id += 1 + if self.table_id == self.table_count: + self.table_id = 0 + if self.row >= self.max_rows_per_table: + raise StopIteration + rows = [] + + while len(rows) < 100: + self.row += 1 + ts = self.start_ms + 100 * self.row + group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1 + tb_name = self.table_name_prefix + '_' + str(self.table_id) + ri = self.row % 5 + rows.append(f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}") + return self.table_id, rows + + +# ANCHOR_END: MockDataSource + +# ANCHOR: read +def run_read_task(task_id: int, task_queues: List[Queue]): + table_count_per_task = TABLE_COUNT // READ_TASK_COUNT + data_source = MockDataSource(f"tb{task_id}", table_count_per_task) + try: + for table_id, rows in data_source: + # hash data to different queue + i = table_id % len(task_queues) + # block putting forever when the queue is full + task_queues[i].put_many(rows, block=True, timeout=-1) + except KeyboardInterrupt: + pass + + +# ANCHOR_END: read + +# ANCHOR: write +def run_write_task(task_id: int, queue: Queue): + from sql_writer import SQLWriter + log = logging.getLogger(f"WriteTask-{task_id}") + writer = SQLWriter(get_connection) + lines = None + try: + while True: + try: + # get as many as possible + lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE) + writer.process_lines(lines) + except Empty: + time.sleep(0.01) + except KeyboardInterrupt: + 
pass + except BaseException as e: + log.debug(f"lines={lines}") + raise e + + +# ANCHOR_END: write + +def set_global_config(): + argc = len(sys.argv) + if argc > 1: + global READ_TASK_COUNT + READ_TASK_COUNT = int(sys.argv[1]) + if argc > 2: + global WRITE_TASK_COUNT + WRITE_TASK_COUNT = int(sys.argv[2]) + if argc > 3: + global TABLE_COUNT + TABLE_COUNT = int(sys.argv[3]) + if argc > 4: + global QUEUE_SIZE + QUEUE_SIZE = int(sys.argv[4]) + if argc > 5: + global MAX_BATCH_SIZE + MAX_BATCH_SIZE = int(sys.argv[5]) + + +# ANCHOR: monitor +def run_monitor_process(): + import taos + log = logging.getLogger("DataBaseMonitor") + conn = get_connection() + conn.execute("DROP DATABASE IF EXISTS test") + conn.execute("CREATE DATABASE test") + conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) " + "TAGS (location BINARY(64), groupId INT)") + + def get_count(): + res = conn.query("SELECT count(*) FROM test.meters") + rows = res.fetch_all() + return rows[0][0] if rows else 0 + + last_count = 0 + while True: + time.sleep(10) + count = get_count() + log.info(f"count={count} speed={(count - last_count) / 10}") + last_count = count + + +# ANCHOR_END: monitor +# ANCHOR: main +def main(): + set_global_config() + logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, " + f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") + + monitor_process = Process(target=run_monitor_process) + monitor_process.start() + time.sleep(3) # waiting for database ready. + + task_queues: List[Queue] = [] + # create task queues + for i in range(WRITE_TASK_COUNT): + queue = Queue(max_size_bytes=QUEUE_SIZE) + task_queues.append(queue) + # create write processes + for i in range(WRITE_TASK_COUNT): + p = Process(target=run_write_task, args=(i, task_queues[i])) + p.start() + logging.debug(f"WriteTask-{i} started with pid {p.pid}") + write_processes.append(p) + # create read processes + for i in range(READ_TASK_COUNT): + p = Process(target=run_read_task, args=(i, task_queues)) + p.start() + logging.debug(f"ReadTask-{i} started with pid {p.pid}") + read_processes.append(p) + + try: + monitor_process.join() + except KeyboardInterrupt: + monitor_process.terminate() + [p.terminate() for p in read_processes] + [p.terminate() for p in write_processes] + [q.close() for q in task_queues] + + +if __name__ == '__main__': + main() +# ANCHOR_END: main diff --git a/docs/examples/python/highvolume_mp_queue.py b/docs/examples/python/highvolume_mp_queue.py new file mode 100644 index 0000000000000000000000000000000000000000..f6605adb0f5e6811a652a1fc0a6ef14f6c537013 --- /dev/null +++ b/docs/examples/python/highvolume_mp_queue.py @@ -0,0 +1,193 @@ +# import logging +# import sys +# import time +# from multiprocessing import Queue, Process +# from queue import Empty +# from typing import List +# +# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s") +# +# READ_TASK_COUNT = 1 +# WRITE_TASK_COUNT = 1 +# QUEUE_SIZE = 1000 +# TABLE_COUNT = 1000 +# MAX_BATCH_SIZE = 3000 +# +# read_processes = [] +# write_processes = [] +# +# +# # ANCHOR: DataBaseMonitor +# class DataBaseMonitor: +# """ +# Start a thread. +# Prepare database and stable. +# Statistic writing speed and print it every 10 seconds. 
+# """ +# +# def __init__(self): +# self.process = Process(target=self.run) +# self.process.start() +# +# def get_connection(self): +# import taos +# return taos.connect(host="localhost", user="root", password="taosdata", port=6030) +# +# def prepare_database(self, conn): +# conn.execute("DROP DATABASE IF EXISTS test") +# conn.execute("CREATE DATABASE test") +# conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)") +# +# def get_count(self, conn): +# res = conn.query("SELECT count(*) FROM test.meters") +# rows = res.fetch_all() +# return rows[0][0] if rows else 0 +# +# def run(self): +# log = logging.getLogger("DataBaseMonitor") +# conn = self.get_connection() +# self.prepare_database(conn) +# last_count = 0 +# while True: +# time.sleep(10) +# count = self.get_count(conn) +# log.info(f"count={count} speed={(count - last_count) / 10}") +# last_count = count +# +# def join(self): +# self.process.join() +# +# def stop(self): +# self.process.terminate() +# +# +# # ANCHOR_END: DataBaseMonitor +# +# # ANCHOR: MockDataSource +# class MockDataSource: +# location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"] +# current = [8.8, 10.7, 9.9, 8.9, 9.4] +# voltage = [119, 116, 111, 113, 118] +# phase = [0.32, 0.34, 0.33, 0.329, 0.141] +# max_rows_per_table = 10 ** 9 +# +# def __init__(self, tb_name_prefix, table_count): +# self.table_name_prefix = tb_name_prefix +# self.table_count = table_count +# self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100 +# +# def __iter__(self): +# self.row = 0 +# self.table_id = -1 +# return self +# +# def __next__(self): +# self.table_id += 1 +# if self.table_id == self.table_count: +# self.table_id = 0 +# self.row += 1 +# if self.row < self.max_rows_per_table: +# ts = self.start_ms + 100 * self.row +# group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1 +# tb_name = self.table_name_prefix + '_' + str(self.table_id) +# ri = self.row % 5 +# return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}" +# else: +# raise StopIteration +# +# +# # ANCHOR_END: MockDataSource +# +# # ANCHOR: read +# def run_read_task(task_id: int, task_queues: List[Queue]): +# table_count_per_task = TABLE_COUNT // READ_TASK_COUNT +# data_source = MockDataSource(f"tb{task_id}", table_count_per_task) +# try: +# for table_id, line in data_source: +# i = table_id % len(task_queues) +# task_queues[i].put(line, block=True) +# except KeyboardInterrupt: +# pass +# +# +# # ANCHOR_END: read +# +# # ANCHOR: write +# def run_write_task(task_id: int, queue: Queue): +# from sql_writer import SQLWriter +# log = logging.getLogger(f"WriteTask-{task_id}") +# writer = SQLWriter(MAX_BATCH_SIZE) +# try: +# while True: +# try: +# line = queue.get(block=False) +# writer.process_line(line) +# except Empty: +# if writer.buffered_count > 0: +# writer.flush() +# else: +# time.sleep(0.01) +# except KeyboardInterrupt: +# pass +# except BaseException as e: +# msg = f"line={line}, buffer_count={writer.buffered_count}" +# log.debug(msg) +# raise e +# +# +# # ANCHOR_END: write +# +# def set_global_config(): +# argc = len(sys.argv) +# if argc > 1: +# global READ_TASK_COUNT +# READ_TASK_COUNT = int(sys.argv[1]) +# if argc > 2: +# global WRITE_TASK_COUNT +# WRITE_TASK_COUNT = int(sys.argv[2]) +# if argc > 3: +# global QUEUE_SIZE +# QUEUE_SIZE = int(sys.argv[3]) +# if argc > 4: +# global TABLE_COUNT +# TABLE_COUNT 
= int(sys.argv[4]) +# if argc > 5: +# global MAX_BATCH_SIZE +# MAX_BATCH_SIZE = int(sys.argv[5]) +# +# +# # ANCHOR: main +# def main(): +# set_global_config() +# logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}") +# +# database_monitor = DataBaseMonitor() +# time.sleep(3) # wait for database ready +# +# task_queues: List[Queue] = [] +# +# for i in range(WRITE_TASK_COUNT): +# queue = Queue(maxsize=QUEUE_SIZE) +# task_queues.append(queue) +# p = Process(target=run_write_task, args=(i, queue)) +# p.start() +# logging.debug(f"WriteTask-{i} started with pid {p.pid}") +# write_processes.append(p) +# +# for i in range(READ_TASK_COUNT): +# p = Process(target=run_read_task, args=(i, task_queues)) +# p.start() +# logging.debug(f"ReadTask-{i} started with pid {p.pid}") +# read_processes.append(p) +# +# try: +# database_monitor.join() +# except KeyboardInterrupt: +# database_monitor.stop() +# [p.terminate() for p in read_processes] +# [p.terminate() for p in write_processes] +# +# +# if __name__ == '__main__': +# main() +# # ANCHOR_END: main diff --git a/docs/examples/python/sql_writer.py b/docs/examples/python/sql_writer.py new file mode 100644 index 0000000000000000000000000000000000000000..ad5653f0ece10a482036b3d41880984ef4b38e61 --- /dev/null +++ b/docs/examples/python/sql_writer.py @@ -0,0 +1,81 @@ +import logging +import taos + + +class SQLWriter: + log = logging.getLogger("SQLWriter") + + def __init__(self, get_connection_func): + self._tb_values = {} + self._tb_tags = {} + self._conn = get_connection_func() + self._max_sql_length = self.get_max_sql_length() + self._conn.execute("USE test") + + def get_max_sql_length(self): + rows = self._conn.query("SHOW variables").fetch_all() + for r in rows: + name = r[0] + if name == "maxSQLLength": + return int(r[1]) + + def process_lines(self, lines: str): + """ + :param lines: [[tbName,ts,current,voltage,phase,location,groupId]] + """ + for line in lines: + ps = line.split(",") + table_name = ps[0] + value = '(' + ",".join(ps[1:-2]) + ') ' + if table_name in self._tb_values: + self._tb_values[table_name] += value + else: + self._tb_values[table_name] = value + + if table_name not in self._tb_tags: + location = ps[-2] + group_id = ps[-1] + tag_value = f"('{location}',{group_id})" + self._tb_tags[table_name] = tag_value + self.flush() + + def flush(self): + """ + Assemble INSERT statement and execute it. + When the sql length grows close to MAX_SQL_LENGTH, the sql will be executed immediately, and a new INSERT statement will be created. + In case of "Table does not exit" exception, tables in the sql will be created and the sql will be re-executed. 
+ """ + sql = "INSERT INTO " + sql_len = len(sql) + buf = [] + for tb_name, values in self._tb_values.items(): + q = tb_name + " VALUES " + values + if sql_len + len(q) >= self._max_sql_length: + sql += " ".join(buf) + self.execute_sql(sql) + sql = "INSERT INTO " + sql_len = len(sql) + buf = [] + buf.append(q) + sql_len += len(q) + sql += " ".join(buf) + self.execute_sql(sql) + self._tb_values.clear() + + def execute_sql(self, sql): + try: + self._conn.execute(sql) + except taos.Error as e: + error_code = e.errno & 0xffff + # Table does not exit + if error_code == 0x362 or error_code == 0x218: + self.create_tables() + else: + raise e + + def create_tables(self): + sql = "CREATE TABLE " + for tb in self._tb_values.keys(): + tag_values = self._tb_tags[tb] + sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " " + self._conn.execute(sql) diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md index aafe9aec3a6a92de4fdc4cd199e4ea81f9fde5b5..c490b9f7639df63ef5b8b5172824642e57a9d8a0 100644 --- a/docs/zh/07-develop/03-insert-data/05-high-volume.md +++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md @@ -8,6 +8,8 @@ title: 高效写入 为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。 +为了使写入最高效,除了客户端程序的设计,服务端的配置也很重要。如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库有可能用到 10 个 vgroup。更多性能调优参数请参考[配置参考](../../reference/config)性能调优部分。 + ## 高效写入方案 下面的示例程序展示了如何高效写入数据: @@ -19,6 +21,10 @@ title: 高效写入 ![TDengine 高效写入线程模型](highvolume.webp) +:::note +上图所示架构,每个写任务只负责写特定的表,体现了数据的相邻性原则。但是读任务所读的表,我们假设是随机的。这样一个队列有多个写入线程(或进程),队列内部可能产生锁的消耗。实际场景,如果能做到一个读任务对应一个写任务是最好的。 +::: + ## Java 示例程序 在 Java 示例程序中采用拼接 SQL 的写入方式。 @@ -37,12 +43,18 @@ title: 高效写入 1. 读线程个数。默认为 1。 2. 写线程个数。默认为 3。 3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。 -4. 每批最大数据量。默认为 3000。 +4. 每批最多写入记录数量。默认为 3000。 + +
+<details>
+<summary>主程序</summary>
+
-```java title="主程序"
+```java
 {{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
 ```
+
+</details>
+ + 队列容量(taskQueueCapacity)也是与性能有关的参数,可通过修改程序调节。一般来讲,队列容量越大,入队被阻塞的概率越小,队列的吞吐量越大,但是内存占用也会越大。 ### 读任务的实现 @@ -86,13 +98,17 @@ SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都 ### 执行示例程序 +
+执行 Java 示例程序 + + 执行程序前需配置环境变量 `TDENGINE_JDBC_URL`。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置: ``` TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" ``` -若要在本地集成开发环境执行示例程序,只需: +#### 本地集成开发环境执行示例程序 1. clone TDengine 仓库 ``` @@ -102,6 +118,8 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" 3. 在开发环境中配置环境变量 `TDENGINE_JDBC_URL`。如果已配置了全局的环境变量 `TDENGINE_JDBC_URL` 可跳过这一步。 4. 运行类 `com.taos.example.highvolume.FastWriteExample`。 +#### 远程服务器上执行示例程序 + 若要在服务器上执行示例程序,可按照下面的步骤操作: 1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行: @@ -130,14 +148,16 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" 以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。 5. 用 java 命令启动示例程序,命令模板: + ``` java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample ``` + 6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 CTRL + C 结束程序。 下面是一次实际运行的截图: ``` - [bding@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000 + [testuser@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000 17:01:01.131 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=1, writeTaskCount=9 tableCount=1000 maxBatchSize=2000 17:01:01.286 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started 17:01:01.354 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started @@ -161,10 +181,162 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata" 17:02:46.988 [main] INFO c.t.e.highvolume.FastWriteExample - count=202895614 speed=2016000 ``` +
+ ## Python 示例程序 -在 Python 示例程序中采用参数绑定的写入方式。(开发中) +该 Python 示例程序中采用了多进程的架构,并使用了跨进程的队列通信。写任务采用拼装 SQL 的方式写入。 +### main 函数 + +main 函数负责创建消息队列和启动子进程,子进程有 3 类: + +1. 1 个监控进程,负责数据库初始化和统计写入速度 +2. n 个读进程,负责从其它数据系统读数据 +3. m 个写进程,负责写数据库 + +main 函数可以接收 5 个启动参数,依次是: + +1. 读任务(进程)数 +2. 写任务(进程)数 +3. 总表数据 +4. 队列大小(单位字节) +5. 每批最多写入记录数量 + +
+
+<details>
+<summary>main 函数</summary>
+
\ No newline at end of file
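
The Python example above distributes work by hashing each row's table to a fixed queue and draining each queue in batches with `get_many`. The following is a minimal, runnable sketch of just that queue pattern. It assumes only the `faster-fifo` package and needs no running TDengine: the actual write step is replaced by a counter, and names such as `fake_rows`, `ROWS_PER_READER`, and the idle-exit heuristic are illustrative choices, not part of the example programs in the diff.

```python
import time
from multiprocessing import Process
from queue import Empty
from faster_fifo import Queue

READ_TASKS = 2
WRITE_TASKS = 2
TABLE_COUNT = 10
BATCH = 100
ROWS_PER_READER = 10_000


def fake_rows(task_id: int, n: int):
    """Yield (table_id, line) pairs shaped like MockDataSource output:
    tbName,ts,current,voltage,phase,location,groupId"""
    ts = round(time.time() * 1000)
    for i in range(n):
        table_id = i % TABLE_COUNT
        yield table_id, f"tb{task_id}_{table_id},{ts + i},10.1,119,0.32,LosAngeles,1"


def read_task(task_id: int, queues):
    # Hash each row to the queue owned by the write task responsible for its table.
    per_queue = [[] for _ in queues]
    for table_id, line in fake_rows(task_id, ROWS_PER_READER):
        per_queue[table_id % len(queues)].append(line)
    for q, rows in zip(queues, per_queue):
        for i in range(0, len(rows), BATCH):
            # put_many blocks until the shared-memory buffer has room for the batch.
            q.put_many(rows[i:i + BATCH], block=True, timeout=30)


def write_task(task_id: int, queue: Queue):
    written = 0
    idle = 0
    while idle < 300:  # exit after ~3 s without data; a real writer loops forever
        try:
            lines = queue.get_many(block=False, max_messages_to_get=BATCH)
            written += len(lines)  # a real writer would call SQLWriter.process_lines(lines)
            idle = 0
        except Empty:
            idle += 1
            time.sleep(0.01)
    print(f"write task {task_id} consumed {written} rows")


if __name__ == "__main__":
    queues = [Queue(max_size_bytes=1_000_000) for _ in range(WRITE_TASKS)]
    readers = [Process(target=read_task, args=(i, queues)) for i in range(READ_TASKS)]
    writers = [Process(target=write_task, args=(i, queues[i])) for i in range(WRITE_TASKS)]
    for p in readers + writers:
        p.start()
    for p in readers + writers:
        p.join()
```

Running it with `python3` should show each write task consuming roughly `READ_TASKS * ROWS_PER_READER / WRITE_TASKS` rows, which mirrors how `highvolume_faster_queue.py` spreads tables across its write processes; in the real program a write task would hand each batch to `SQLWriter.process_lines` instead of counting it.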