diff --git a/docs/examples/python/highvolume_faster_queue.py b/docs/examples/python/highvolume_faster_queue.py
index b45cc0918ff3ca0b3d67898f638ae8c7f9927a82..14aebc67eee5a0701081f2f5da605184568c3a89 100644
--- a/docs/examples/python/highvolume_faster_queue.py
+++ b/docs/examples/python/highvolume_faster_queue.py
@@ -6,6 +6,7 @@
import logging
import sys
import time
+import os
from multiprocessing import Process
from faster_fifo import Queue
from queue import Empty
@@ -23,6 +24,22 @@ read_processes = []
write_processes = []
+def get_connection():
+ """
+ If variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used.
+ You can also override the default username and password by supply variable TDENGINE_USER and TDENGINE_PASSWORD
+ """
+ import taos
+ firstEP = os.environ.get("TDENGINE_FIRST_EP")
+ if firstEP:
+ host, port = firstEP.split(":")
+ else:
+ host, port = None, 0
+ user = os.environ.get("TDENGINE_USER", "root")
+ password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
+ return taos.connect(host=host, port=int(port), user=user, password=password)
+
+
# ANCHOR: MockDataSource
class MockDataSource:
location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
@@ -58,8 +75,7 @@ class MockDataSource:
group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
tb_name = self.table_name_prefix + '_' + str(self.table_id)
ri = self.row % 5
- rows.append(
- f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}")
+ rows.append(f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}")
return self.table_id, rows
@@ -71,7 +87,9 @@ def run_read_task(task_id: int, task_queues: List[Queue]):
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
try:
for table_id, rows in data_source:
+ # hash data to different queue
i = table_id % len(task_queues)
+ # block putting forever when the queue is full
task_queues[i].put_many(rows, block=True, timeout=-1)
except KeyboardInterrupt:
pass
@@ -83,11 +101,12 @@ def run_read_task(task_id: int, task_queues: List[Queue]):
def run_write_task(task_id: int, queue: Queue):
from sql_writer import SQLWriter
log = logging.getLogger(f"WriteTask-{task_id}")
- writer = SQLWriter()
+ writer = SQLWriter(get_connection)
lines = None
try:
while True:
try:
+ # get as many as possible
lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
writer.process_lines(lines)
except Empty:
@@ -100,6 +119,7 @@ def run_write_task(task_id: int, queue: Queue):
# ANCHOR_END: write
+
def set_global_config():
argc = len(sys.argv)
if argc > 1:
@@ -119,11 +139,11 @@ def set_global_config():
MAX_BATCH_SIZE = int(sys.argv[5])
-# ANCHOR: main
+# ANCHOR: monitor
def run_monitor_process():
import taos
log = logging.getLogger("DataBaseMonitor")
- conn = taos.connect(host="localhost", user="root", password="taosdata", port=6030)
+ conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
@@ -142,6 +162,8 @@ def run_monitor_process():
last_count = count
+# ANCHOR_END: monitor
+# ANCHOR: main
def main():
set_global_config()
logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
@@ -149,7 +171,7 @@ def main():
monitor_process = Process(target=run_monitor_process)
monitor_process.start()
- time.sleep(3)
+ time.sleep(3) # waiting for database ready.
task_queues: List[Queue] = []
# create task queues
diff --git a/docs/examples/python/sql_writer.py b/docs/examples/python/sql_writer.py
index 5c36cf1e6f6c4902a4400e8dd7566f94c341f6cf..ad5653f0ece10a482036b3d41880984ef4b38e61 100644
--- a/docs/examples/python/sql_writer.py
+++ b/docs/examples/python/sql_writer.py
@@ -5,10 +5,10 @@ import taos
class SQLWriter:
log = logging.getLogger("SQLWriter")
- def __init__(self):
+ def __init__(self, get_connection_func):
self._tb_values = {}
self._tb_tags = {}
- self._conn = self.get_connection()
+ self._conn = get_connection_func()
self._max_sql_length = self.get_max_sql_length()
self._conn.execute("USE test")
@@ -19,10 +19,6 @@ class SQLWriter:
if name == "maxSQLLength":
return int(r[1])
- @staticmethod
- def get_connection():
- return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
-
def process_lines(self, lines: str):
"""
:param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
diff --git a/docs/zh/07-develop/03-insert-data/05-high-volume.md b/docs/zh/07-develop/03-insert-data/05-high-volume.md
index 69b30eb02b3f7e9582523ed95841dbeb0179eb22..c490b9f7639df63ef5b8b5172824642e57a9d8a0 100644
--- a/docs/zh/07-develop/03-insert-data/05-high-volume.md
+++ b/docs/zh/07-develop/03-insert-data/05-high-volume.md
@@ -8,6 +8,8 @@ title: 高效写入
为了更高效地向 TDengine 写入数据,客户端程序要充分且恰当地利用以上几个因素。在单次写入中尽量只向同一张表(或子表)写入数据,每批次写入的数据量经过测试和调优设定为一个最适合当前系统处理能力的数值,并发写入的连接数同样经过测试和调优后设定为一个最适合当前系统处理能力的数值,以实现在当前系统中的最佳写入速度。同时,TDengine 还提供了独特的参数绑定写入,这也是一个有助于实现高效写入的方法。
+为了使写入最高效,除了客户端程序的设计,服务端的配置也很重要。如果无论怎么调节客户端程序,taosd 进程的 CPU 使用率都很低,那很可能需要增加 vgroup 的数量。比如:数据库总表数是 1000 且 minTablesPerVnode 设置的也是 1000,那么这个数据至多有一个 vgroup。此时如果将 minTablesPerVnode 和 tablelncStepPerVnode 都设置成 100, 则这个数据库有可能用到 10 个 vgroup。更多性能调优参数请参考[配置参考](../../reference/config)性能调优部分。
+
## 高效写入方案
下面的示例程序展示了如何高效写入数据:
@@ -19,6 +21,10 @@ title: 高效写入

+:::note
+上图所示架构,每个写任务只负责写特定的表,体现了数据的相邻性原则。但是读任务所读的表,我们假设是随机的。这样一个队列有多个写入线程(或进程),队列内部可能产生锁的消耗。实际场景,如果能做到一个读任务对应一个写任务是最好的。
+:::
+
## Java 示例程序
在 Java 示例程序中采用拼接 SQL 的写入方式。
@@ -37,13 +43,15 @@ title: 高效写入
1. 读线程个数。默认为 1。
2. 写线程个数。默认为 3。
3. 模拟生成的总表数。默认为 1000。将会平分给各个读线程。
-4. 每批最大数据量。默认为 3000。
+4. 每批最多写入记录数量。默认为 3000。
主程序
+
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
```
+
@@ -90,13 +98,17 @@ SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都
### 执行示例程序
+
+执行 Java 示例程序
+
+
执行程序前需配置环境变量 `TDENGINE_JDBC_URL`。如果 TDengine Server 部署在本机,且用户名、密码和端口都是默认值,那么可配置:
```
TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
-若要在本地集成开发环境执行示例程序,只需:
+#### 本地集成开发环境执行示例程序
1. clone TDengine 仓库
```
@@ -106,6 +118,8 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
3. 在开发环境中配置环境变量 `TDENGINE_JDBC_URL`。如果已配置了全局的环境变量 `TDENGINE_JDBC_URL` 可跳过这一步。
4. 运行类 `com.taos.example.highvolume.FastWriteExample`。
+#### 远程服务器上执行示例程序
+
若要在服务器上执行示例程序,可按照下面的步骤操作:
1. 打包示例代码。在目录 TDengine/docs/examples/java 下执行:
@@ -134,14 +148,16 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
以上使用的是本地部署 TDengine Server 时默认的 JDBC URL。你需要根据自己的实际情况更改。
5. 用 java 命令启动示例程序,命令模板:
+
```
java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample
```
+
6. 结束测试程序。测试程序不会自动结束,在获取到当前配置下稳定的写入速度后,按 CTRL + C 结束程序。
下面是一次实际运行的截图:
```
- [bding@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000
+ [testuser@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000
17:01:01.131 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=1, writeTaskCount=9 tableCount=1000 maxBatchSize=2000
17:01:01.286 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.354 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
@@ -165,80 +181,162 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
17:02:46.988 [main] INFO c.t.e.highvolume.FastWriteExample - count=202895614 speed=2016000
```
+
+
## Python 示例程序
-在 Python 示例程序中采用多进程的架构。写任务中同样采用拼装 SQL 的方式写入。
-### 主进程
+该 Python 示例程序中采用了多进程的架构,并使用了跨进程的队列通信。写任务采用拼装 SQL 的方式写入。
+### main 函数
+
+main 函数负责创建消息队列和启动子进程,子进程有 3 类:
+
+1. 1 个监控进程,负责数据库初始化和统计写入速度
+2. n 个读进程,负责从其它数据系统读数据
+3. m 个写进程,负责写数据库
+
+main 函数可以接收 5 个启动参数,依次是:
+
+1. 读任务(进程)数
+2. 写任务(进程)数
+3. 总表数据
+4. 队列大小(单位字节)
+5. 每批最多写入记录数量
main 函数
```python
-{{#include docs/examples/python/highvolume_example.py:main}}
+{{#include docs/examples/python/highvolume_faster_queue.py:main}}
```
+### 监控进程
+
+监控进程负责初始化数据库,并监控当前的写入速度。
+
-DataBaseMonitor
+Monitor Process
```python
-{{#include docs/examples/python/highvolume_example.py:DataBaseMonitor}}
+{{#include docs/examples/python/highvolume_faster_queue.py:monitor}}
```
### 读进程
+#### 读进程主要逻辑
+
+读进程,负责从其它数据系统读数据,并分发数据到各个写进程。
+
-启动函数
+run_read_task 函数
```python
-{{#include docs/examples/python/highvolume_example.py:read}}
+{{#include docs/examples/python/highvolume_faster_queue.py:read}}
```
-
+#### 模拟数据源
+
+以下是模拟数据源的实现,我们假设数据源生成的每一条数据都带有目标表名信息。实际中你可能需要一定的规则确定目标表名。
+
MockDataSource
```python
-{{#include docs/examples/python/highvolume_example.py:MockDataSource}}
+{{#include docs/examples/python/highvolume_faster_queue.py:MockDataSource}}
```
### 写进程
+写进程每次从队列中取出尽量多的数据,并批量写入。
-
-启动函数
+run_write_task 函数
```python
-{{#include docs/examples/python/highvolume_example.py:write}}
+{{#include docs/examples/python/highvolume_faster_queue.py:write}}
```
-
+### SQLWriter 类的实现
+
+SQLWriter 类封装了拼 SQL 和写数据的逻辑。所有的表都没有提前创建,而是写入出错的时候,再以超级表为模板批量建表,然后重新执行 INSERT 语句。这个类也对 SQL 是否超过最大长度限制做了检查,如果接近 SQL 最大长度限制(maxSQLLength),将会立即执行 SQL。为了减少 SQL 此时,建议将 maxSQLLength 适当调大。
+
SQLWriter
```python
-{{#include docs/examples/python/highvolume_example.py:SQLWriter}}
+{{#include docs/examples/python/sql_writer.py}}
```
### 执行示例程序
+
+
+执行 Python 示例程序
+
+1. 前提条件
+ - 已安装 TDengine 客户端驱动
+ - 已安装 Python3, 推荐版本 >= 3.8
+ - 已安装 taospy
+
+2. 安装 faster-fifo 代替 python 内置的 multiprocessing.Queue
+ ```
+ pip3 install faster-fifo
+ ```
+
+3. 点击上面的“查看源码”链接复制 `highvolume_faster_queue.py` 和 `sql_writer.py` 两个文件。
+
+4. 执行示例程序
+
+ ```
+ python3 highvolume_faster_queue.py
+ ```
+
+下面是一次实际运行的输出:
+
```
-git clone git@github.com:taosdata/TDengine.git
-git checkout -t 2.6
-cd docs/examples/python/
-python3 highvolume_example.py
-```
\ No newline at end of file
+[testuser@vm95 python]$ python3 highvolume_faster_queue.py 10 10 1000 1000000 3000
+2022-07-12 21:53:07,147 [root] - READ_TASK_COUNT=10, WRITE_TASK_COUNT=10, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000
+2022-07-12 21:53:10,168 [root] - WriteTask-0 started with pid 6228
+2022-07-12 21:53:10,168 [root] - WriteTask-1 started with pid 6229
+2022-07-12 21:53:10,169 [root] - WriteTask-2 started with pid 6230
+2022-07-12 21:53:10,169 [root] - WriteTask-3 started with pid 6231
+2022-07-12 21:53:10,170 [root] - WriteTask-4 started with pid 6232
+2022-07-12 21:53:10,171 [root] - WriteTask-5 started with pid 6233
+2022-07-12 21:53:10,171 [root] - WriteTask-6 started with pid 6234
+2022-07-12 21:53:10,172 [root] - WriteTask-7 started with pid 6235
+2022-07-12 21:53:10,172 [root] - WriteTask-8 started with pid 6236
+2022-07-12 21:53:10,173 [root] - WriteTask-9 started with pid 6237
+2022-07-12 21:53:10,174 [root] - ReadTask-0 started with pid 6238
+2022-07-12 21:53:10,175 [root] - ReadTask-1 started with pid 6239
+2022-07-12 21:53:10,176 [root] - ReadTask-2 started with pid 6240
+2022-07-12 21:53:10,177 [root] - ReadTask-3 started with pid 6241
+2022-07-12 21:53:10,178 [root] - ReadTask-4 started with pid 6242
+2022-07-12 21:53:10,179 [root] - ReadTask-5 started with pid 6243
+2022-07-12 21:53:10,180 [root] - ReadTask-6 started with pid 6244
+2022-07-12 21:53:10,181 [root] - ReadTask-7 started with pid 6245
+2022-07-12 21:53:10,181 [root] - ReadTask-8 started with pid 6246
+2022-07-12 21:53:10,182 [root] - ReadTask-9 started with pid 6247
+2022-07-12 21:53:17,375 [DataBaseMonitor] - count=3333857 speed=333385.7
+2022-07-12 21:53:27,564 [DataBaseMonitor] - count=8883905 speed=555004.8
+2022-07-12 21:53:37,742 [DataBaseMonitor] - count=14233135 speed=534923.0
+2022-07-12 21:53:47,926 [DataBaseMonitor] - count=19759409 speed=552627.4
+2022-07-12 21:53:58,275 [DataBaseMonitor] - count=25245406 speed=548599.7
+2022-07-12 21:54:08,478 [DataBaseMonitor] - count=30644263 speed=539885.7
+2022-07-12 21:54:18,772 [DataBaseMonitor] - count=36110956 speed=546669.3
+2022-07-12 21:54:29,031 [DataBaseMonitor] - count=41456746 speed=534579.0
+```
+
+
\ No newline at end of file