提交 0d235ec9 编写于 作者: D dingbo

docs: highvolume_faster_queue.py

上级 fec91dc2
......@@ -154,10 +154,14 @@ public class SQLWriter {
if (errorCode == 0x362 || errorCode == 0x218) {
// Table does not exist
createTables();
stmt.executeUpdate(sql);
executeSQL(sql);
} else {
logger.error("Error execute SQL: {}", sql);
throw e;
}
} catch (Throwable throwable) {
logger.error("Error execute SQL: {}", sql);
throw throwable;
}
}
......@@ -174,7 +178,12 @@ public class SQLWriter {
sb.append("IF NOT EXISTS ").append(tbName).append(" USING meters TAGS ").append(tagValues).append(" ");
}
String sql = sb.toString();
try {
stmt.executeUpdate(sql);
} catch (Throwable throwable) {
logger.error("Error execute SQL: {}", sql);
throw throwable;
}
}
public boolean hasBufferedValues() {
......
......@@ -4,6 +4,7 @@
#
import logging
import math
import sys
import time
import os
......@@ -42,6 +43,7 @@ def get_connection():
# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
......@@ -149,7 +151,8 @@ def main():
# create read processes
for i in range(READ_TASK_COUNT):
p = Process(target=run_read_task, args=(i, task_queues))
queues = assign_queues(i, task_queues)
p = Process(target=run_read_task, args=(i, queues))
p.start()
logging.debug(f"ReadTask-{i} started with pid {p.pid}")
read_processes.append(p)
......@@ -163,6 +166,16 @@ def main():
[q.close() for q in task_queues]
def assign_queues(read_task_id, task_queues):
"""
Compute target queues for a specific read task.
"""
ratio = WRITE_TASK_COUNT / READ_TASK_COUNT
from_index = math.floor(read_task_id * ratio)
end_index = math.ceil((read_task_id + 1) * ratio)
return task_queues[from_index:end_index]
if __name__ == '__main__':
main()
# ANCHOR_END: main
......@@ -47,25 +47,3 @@ class MockDataSource:
rows = [table_name + ',' + t + ',' + values for t in ts]
result.append((table_id, rows))
return result
if __name__ == '__main__':
"""
Test performance of MockDataSource
"""
from threading import Thread
count = 0
def consume():
global count
for data in MockDataSource("1", 100):
for _, rows in data:
count += len(rows)
Thread(target=consume).start()
while True:
time.sleep(1)
print(count)
---
title: 高效写入
---
import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
# 高效写入
## 高效写入原理 {#principle}
本节介绍如何高效地向 TDengine 写入数据。高效写入数据要考虑几个因素:数据在不同表(或子表)之间的分布,即要写入数据的相邻性;单次写入的数据量;并发连接数。一般来说,每批次只向同一张表(或子表)写入数据比向多张表(或子表)写入数据要更高效;每批次写入的数据量越大越高效(但超过一定阈值其优势会消失;同时写入数据的并发连接数越多写入越高效(但超过一定阈值反而会下降,取决于服务端处理能力)。
为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。
为了使写入最高效,除了客户端程序的设计,服务端的配置也很重要。如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库有可能用到 10 个 vgroup。更多性能调优参数请参考[配置参考](../../reference/config)性能调优部分。
## 高效写入方案 {#scenario}
## 场景设计 {#scenario}
下面的示例程序展示了如何高效写入数据:
......@@ -25,6 +27,29 @@ title: 高效写入
上图所示架构,每个写任务只负责写特定的表,体现了数据的相邻性原则。但是读任务所读的表,我们假设是随机的。这样一个队列有多个写入线程(或进程),队列内部可能产生锁的消耗。实际场景,如果能做到一个读任务对应一个写任务是最好的。
:::
## 示例代码
这一部分是针对以上场景的示例代码。建议先阅读此场景的示例代码,对于其它场景高效写入原理相同,不够代码需要适当修改。
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
</TabItem>
<TabItem label="Python" value="python">
</TabItem>
</Tabs>
## 其它场景
由于写入场景众多,无法一一列举,这一部分描述对于其它常用场景修改示例代码的方法。
## Java 示例程序 {#java-demo}
在 Java 示例程序中采用拼接 SQL 的写入方式。
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册