diff --git a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java index b13d6c363c7f3b79168f87f7db1604652382d332..772913bb846651c6cbd2953a7e3dcc286e805378 100644 --- a/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java +++ b/docs/examples/java/src/main/java/com/taos/example/highvolume/SQLWriter.java @@ -154,10 +154,14 @@ public class SQLWriter { if (errorCode == 0x362 || errorCode == 0x218) { // Table does not exist createTables(); - stmt.executeUpdate(sql); + executeSQL(sql); } else { + logger.error("Error execute SQL: {}", sql); throw e; } + } catch (Throwable throwable) { + logger.error("Error execute SQL: {}", sql); + throw throwable; } } @@ -174,7 +178,12 @@ public class SQLWriter { sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" "); } String sql = sb.toString(); - stmt.executeUpdate(sql); + try { + stmt.executeUpdate(sql); + } catch (Throwable throwable) { + logger.error("Error execute SQL: {}", sql); + throw throwable; + } } public boolean hasBufferedValues() { diff --git a/docs/examples/python/highvolume_faster_queue.py b/docs/examples/python/highvolume_faster_queue.py index a08de041774894e28c6ccd878c2aceee2b2dbe51..29683c8001d3ac002e952c5afe67fa8a1d727cf0 100644 --- a/docs/examples/python/highvolume_faster_queue.py +++ b/docs/examples/python/highvolume_faster_queue.py @@ -4,6 +4,7 @@ # import logging +import math import sys import time import os @@ -42,6 +43,7 @@ def get_connection(): # ANCHOR: read + def run_read_task(task_id: int, task_queues: List[Queue]): table_count_per_task = TABLE_COUNT // READ_TASK_COUNT data_source = MockDataSource(f"tb{task_id}", table_count_per_task) @@ -149,7 +151,8 @@ def main(): # create read processes for i in range(READ_TASK_COUNT): - p = Process(target=run_read_task, args=(i, task_queues)) + queues = assign_queues(i, task_queues) + p = Process(target=run_read_task, args=(i, queues)) p.start() logging.debug(f"ReadTask-{i} started with pid {p.pid}") read_processes.append(p) @@ -163,6 +166,16 @@ def main(): [q.close() for q in task_queues] +def assign_queues(read_task_id, task_queues): + """ + Compute target queues for a specific read task. + """ + ratio = WRITE_TASK_COUNT / READ_TASK_COUNT + from_index = math.floor(read_task_id * ratio) + end_index = math.ceil((read_task_id + 1) * ratio) + return task_queues[from_index:end_index] + + if __name__ == '__main__': main() # ANCHOR_END: main diff --git a/docs/examples/python/mockdatasoruce.py b/docs/examples/python/mockdatasoruce.py index f6c54a8f96322f4495c91de54ce6bc2face0b5dc..852860aec0adc8f9b043c9dcd5deb0bf00239201 100644 --- a/docs/examples/python/mockdatasoruce.py +++ b/docs/examples/python/mockdatasoruce.py @@ -47,25 +47,3 @@ class MockDataSource: rows = [table_name + ',' + t + ',' + values for t in ts] result.append((table_id, rows)) return result - - -if __name__ == '__main__': - """ - Test performance of MockDataSource - """ - from threading import Thread - - count = 0 - - - def consume(): - global count - for data in MockDataSource("1", 100): - for _, rows in data: - count += len(rows) - - - Thread(target=consume).start() - while True: - time.sleep(1) - print(count) diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.mdx similarity index 96% rename from docs/zh/07-develop/03-insert-data/05-high-volume.md rename to docs/zh/07-develop/03-insert-data/05-high-volume.mdx index 731cf2c188398b89ca5661e4cb3c4328b1173494..23ec979a028ac80add9b2cf07eeb2d105b204f4e 100644 --- a/docs/zh/07-develop/03-insert-data/05-high-volume.md +++ b/docs/zh/07-develop/03-insert-data/05-high-volume.mdx @@ -1,16 +1,18 @@ ---- -title: 高效写入 ---- +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; + +# 高效写入 ## 高效写入原理 {#principle} + 本节介绍如何高效地向 TDengine 写入数据。高效写入数据要考虑几个因素:数据在不同表(或子表)之间的分布,即要写入数据的相邻性;单次写入的数据量;并发连接数。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;每批次写入的数据量越大越高效(但超过一定阈值其优势会消失;同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。 为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。 为了使写入最高效,除了客户端程序的设计,服务端的配置也很重要。如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库有可能用到 10 个 vgroup。更多性能调优参数请参考[配置参考](../../reference/config)性能调优部分。 -## 高效写入方案 {#scenario} +## 场景设计 {#scenario} 下面的示例程序展示了如何高效写入数据: @@ -25,6 +27,29 @@ title: 高效写入 上图所示架构,每个写任务只负责写特定的表,体现了数据的相邻性原则。但是读任务所读的表,我们假设是随机的。这样一个队列有多个写入线程(或进程),队列内部可能产生锁的消耗。实际场景,如果能做到一个读任务对应一个写任务是最好的。 ::: +## 示例代码 + +这一部分是针对以上场景的示例代码。建议先阅读此场景的示例代码,对于其它场景高效写入原理相同,不够代码需要适当修改。 + + + + + + + + + + + + + + + +## 其它场景 + +由于写入场景众多,无法一一列举,这一部分描述对于其它常用场景修改示例代码的方法。 + + ## Java 示例程序 {#java-demo} 在 Java 示例程序中采用拼接 SQL 的写入方式。