taosdata/TDengine — commit 2300710d (unverified)
Authored Jul 13, 2022 by wade zhang; committed via GitHub on Jul 13, 2022
Merge pull request #14837 from taosdata/docs/python-example
docs: python high-volume example
Parents: 19fc2920, 8efd7d2e
Showing 4 changed files with 658 additions and 7 deletions (+658 −7)
- docs/examples/python/highvolume_faster_queue.py (+205 −0)
- docs/examples/python/highvolume_mp_queue.py (+193 −0)
- docs/examples/python/sql_writer.py (+81 −0)
- docs/zh/07-develop/03-insert-data/05-high-volume.md (+179 −7)
docs/examples/python/highvolume_faster_queue.py (new file, 0 → 100644)
```python
# install dependencies:
# recommend python >= 3.8
# pip3 install faster-fifo
#
import logging
import sys
import time
import os
from multiprocessing import Process
from faster_fifo import Queue
from queue import Empty
from typing import List

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")

READ_TASK_COUNT = 1
WRITE_TASK_COUNT = 1
TABLE_COUNT = 1000
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000

read_processes = []
write_processes = []


def get_connection():
    """
    If the variable TDENGINE_FIRST_EP is provided then it will be used. If not, firstEP in /etc/taos/taos.cfg will be used.
    You can also override the default username and password by supplying the variables TDENGINE_USER and TDENGINE_PASSWORD.
    """
    import taos
    firstEP = os.environ.get("TDENGINE_FIRST_EP")
    if firstEP:
        host, port = firstEP.split(":")
    else:
        host, port = None, 0
    user = os.environ.get("TDENGINE_USER", "root")
    password = os.environ.get("TDENGINE_PASSWORD", "taosdata")
    return taos.connect(host=host, port=int(port), user=user, password=password)


# ANCHOR: MockDataSource
class MockDataSource:
    location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
    current = [8.8, 10.7, 9.9, 8.9, 9.4]
    voltage = [119, 116, 111, 113, 118]
    phase = [0.32, 0.34, 0.33, 0.329, 0.141]
    max_rows_per_table = 10 ** 9

    def __init__(self, tb_name_prefix, table_count):
        self.table_name_prefix = tb_name_prefix
        self.table_count = table_count
        self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100

    def __iter__(self):
        self.row = 0
        self.table_id = -1
        return self

    def __next__(self):
        """
        next 100 rows of current table
        """
        self.table_id += 1
        if self.table_id == self.table_count:
            self.table_id = 0
        if self.row >= self.max_rows_per_table:
            raise StopIteration
        rows = []
        while len(rows) < 100:
            self.row += 1
            ts = self.start_ms + 100 * self.row
            group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
            tb_name = self.table_name_prefix + '_' + str(self.table_id)
            ri = self.row % 5
            rows.append(f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}")
        return self.table_id, rows


# ANCHOR_END: MockDataSource

# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
    table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
    data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
    try:
        for table_id, rows in data_source:
            # hash data to different queue
            i = table_id % len(task_queues)
            # block putting forever when the queue is full
            task_queues[i].put_many(rows, block=True, timeout=-1)
    except KeyboardInterrupt:
        pass


# ANCHOR_END: read

# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
    from sql_writer import SQLWriter
    log = logging.getLogger(f"WriteTask-{task_id}")
    writer = SQLWriter(get_connection)
    lines = None
    try:
        while True:
            try:
                # get as many as possible
                lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
                writer.process_lines(lines)
            except Empty:
                time.sleep(0.01)
    except KeyboardInterrupt:
        pass
    except BaseException as e:
        log.debug(f"lines={lines}")
        raise e


# ANCHOR_END: write

def set_global_config():
    argc = len(sys.argv)
    if argc > 1:
        global READ_TASK_COUNT
        READ_TASK_COUNT = int(sys.argv[1])
    if argc > 2:
        global WRITE_TASK_COUNT
        WRITE_TASK_COUNT = int(sys.argv[2])
    if argc > 3:
        global TABLE_COUNT
        TABLE_COUNT = int(sys.argv[3])
    if argc > 4:
        global QUEUE_SIZE
        QUEUE_SIZE = int(sys.argv[4])
    if argc > 5:
        global MAX_BATCH_SIZE
        MAX_BATCH_SIZE = int(sys.argv[5])


# ANCHOR: monitor
def run_monitor_process():
    import taos
    log = logging.getLogger("DataBaseMonitor")
    conn = get_connection()
    conn.execute("DROP DATABASE IF EXISTS test")
    conn.execute("CREATE DATABASE test")
    conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
                 "TAGS (location BINARY(64), groupId INT)")

    def get_count():
        res = conn.query("SELECT count(*) FROM test.meters")
        rows = res.fetch_all()
        return rows[0][0] if rows else 0

    last_count = 0
    while True:
        time.sleep(10)
        count = get_count()
        log.info(f"count={count} speed={(count - last_count) / 10}")
        last_count = count


# ANCHOR_END: monitor

# ANCHOR: main
def main():
    set_global_config()
    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
                 f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

    monitor_process = Process(target=run_monitor_process)
    monitor_process.start()
    time.sleep(3)  # waiting for database ready.

    task_queues: List[Queue] = []

    # create task queues
    for i in range(WRITE_TASK_COUNT):
        queue = Queue(max_size_bytes=QUEUE_SIZE)
        task_queues.append(queue)

    # create write processes
    for i in range(WRITE_TASK_COUNT):
        p = Process(target=run_write_task, args=(i, task_queues[i]))
        p.start()
        logging.debug(f"WriteTask-{i} started with pid {p.pid}")
        write_processes.append(p)

    # create read processes
    for i in range(READ_TASK_COUNT):
        p = Process(target=run_read_task, args=(i, task_queues))
        p.start()
        logging.debug(f"ReadTask-{i} started with pid {p.pid}")
        read_processes.append(p)

    try:
        monitor_process.join()
    except KeyboardInterrupt:
        monitor_process.terminate()
        [p.terminate() for p in read_processes]
        [p.terminate() for p in write_processes]
        [q.close() for q in task_queues]


if __name__ == '__main__':
    main()
# ANCHOR_END: main
```
docs/examples/python/highvolume_mp_queue.py (new file, 0 → 100644)
```python
# import logging
# import sys
# import time
# from multiprocessing import Queue, Process
# from queue import Empty
# from typing import List
#
# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")
#
# READ_TASK_COUNT = 1
# WRITE_TASK_COUNT = 1
# QUEUE_SIZE = 1000
# TABLE_COUNT = 1000
# MAX_BATCH_SIZE = 3000
#
# read_processes = []
# write_processes = []
#
#
# # ANCHOR: DataBaseMonitor
# class DataBaseMonitor:
# """
# Start a thread.
# Prepare database and stable.
# Statistic writing speed and print it every 10 seconds.
# """
#
# def __init__(self):
# self.process = Process(target=self.run)
# self.process.start()
#
# def get_connection(self):
# import taos
# return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
#
# def prepare_database(self, conn):
# conn.execute("DROP DATABASE IF EXISTS test")
# conn.execute("CREATE DATABASE test")
# conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
#
# def get_count(self, conn):
# res = conn.query("SELECT count(*) FROM test.meters")
# rows = res.fetch_all()
# return rows[0][0] if rows else 0
#
# def run(self):
# log = logging.getLogger("DataBaseMonitor")
# conn = self.get_connection()
# self.prepare_database(conn)
# last_count = 0
# while True:
# time.sleep(10)
# count = self.get_count(conn)
# log.info(f"count={count} speed={(count - last_count) / 10}")
# last_count = count
#
# def join(self):
# self.process.join()
#
# def stop(self):
# self.process.terminate()
#
#
# # ANCHOR_END: DataBaseMonitor
#
# # ANCHOR: MockDataSource
# class MockDataSource:
# location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
# current = [8.8, 10.7, 9.9, 8.9, 9.4]
# voltage = [119, 116, 111, 113, 118]
# phase = [0.32, 0.34, 0.33, 0.329, 0.141]
# max_rows_per_table = 10 ** 9
#
# def __init__(self, tb_name_prefix, table_count):
# self.table_name_prefix = tb_name_prefix
# self.table_count = table_count
# self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100
#
# def __iter__(self):
# self.row = 0
# self.table_id = -1
# return self
#
# def __next__(self):
# self.table_id += 1
# if self.table_id == self.table_count:
# self.table_id = 0
# self.row += 1
# if self.row < self.max_rows_per_table:
# ts = self.start_ms + 100 * self.row
# group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
# tb_name = self.table_name_prefix + '_' + str(self.table_id)
# ri = self.row % 5
# return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}"
# else:
# raise StopIteration
#
#
# # ANCHOR_END: MockDataSource
#
# # ANCHOR: read
# def run_read_task(task_id: int, task_queues: List[Queue]):
# table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
# data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
# try:
# for table_id, line in data_source:
# i = table_id % len(task_queues)
# task_queues[i].put(line, block=True)
# except KeyboardInterrupt:
# pass
#
#
# # ANCHOR_END: read
#
# # ANCHOR: write
# def run_write_task(task_id: int, queue: Queue):
# from sql_writer import SQLWriter
# log = logging.getLogger(f"WriteTask-{task_id}")
# writer = SQLWriter(MAX_BATCH_SIZE)
# try:
# while True:
# try:
# line = queue.get(block=False)
# writer.process_line(line)
# except Empty:
# if writer.buffered_count > 0:
# writer.flush()
# else:
# time.sleep(0.01)
# except KeyboardInterrupt:
# pass
# except BaseException as e:
# msg = f"line={line}, buffer_count={writer.buffered_count}"
# log.debug(msg)
# raise e
#
#
# # ANCHOR_END: write
#
# def set_global_config():
# argc = len(sys.argv)
# if argc > 1:
# global READ_TASK_COUNT
# READ_TASK_COUNT = int(sys.argv[1])
# if argc > 2:
# global WRITE_TASK_COUNT
# WRITE_TASK_COUNT = int(sys.argv[2])
# if argc > 3:
# global QUEUE_SIZE
# QUEUE_SIZE = int(sys.argv[3])
# if argc > 4:
# global TABLE_COUNT
# TABLE_COUNT = int(sys.argv[4])
# if argc > 5:
# global MAX_BATCH_SIZE
# MAX_BATCH_SIZE = int(sys.argv[5])
#
#
# # ANCHOR: main
# def main():
# set_global_config()
# logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
#
# database_monitor = DataBaseMonitor()
# time.sleep(3) # wait for database ready
#
# task_queues: List[Queue] = []
#
# for i in range(WRITE_TASK_COUNT):
# queue = Queue(maxsize=QUEUE_SIZE)
# task_queues.append(queue)
# p = Process(target=run_write_task, args=(i, queue))
# p.start()
# logging.debug(f"WriteTask-{i} started with pid {p.pid}")
# write_processes.append(p)
#
# for i in range(READ_TASK_COUNT):
# p = Process(target=run_read_task, args=(i, task_queues))
# p.start()
# logging.debug(f"ReadTask-{i} started with pid {p.pid}")
# read_processes.append(p)
#
# try:
# database_monitor.join()
# except KeyboardInterrupt:
# database_monitor.stop()
# [p.terminate() for p in read_processes]
# [p.terminate() for p in write_processes]
#
#
# if __name__ == '__main__':
# main()
# # ANCHOR_END: main
```
docs/examples/python/sql_writer.py (new file, 0 → 100644)
```python
import logging

import taos


class SQLWriter:
    log = logging.getLogger("SQLWriter")

    def __init__(self, get_connection_func):
        self._tb_values = {}
        self._tb_tags = {}
        self._conn = get_connection_func()
        self._max_sql_length = self.get_max_sql_length()
        self._conn.execute("USE test")

    def get_max_sql_length(self):
        rows = self._conn.query("SHOW variables").fetch_all()
        for r in rows:
            name = r[0]
            if name == "maxSQLLength":
                return int(r[1])

    def process_lines(self, lines: str):
        """
        :param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
        """
        for line in lines:
            ps = line.split(",")
            table_name = ps[0]
            value = '(' + ",".join(ps[1:-2]) + ') '
            if table_name in self._tb_values:
                self._tb_values[table_name] += value
            else:
                self._tb_values[table_name] = value

            if table_name not in self._tb_tags:
                location = ps[-2]
                group_id = ps[-1]
                tag_value = f"('{location}',{group_id})"
                self._tb_tags[table_name] = tag_value
        self.flush()

    def flush(self):
        """
        Assemble INSERT statement and execute it.
        When the sql length grows close to MAX_SQL_LENGTH, the sql will be executed immediately, and a new INSERT statement will be created.
        In case of "Table does not exist" exception, tables in the sql will be created and the sql will be re-executed.
        """
        sql = "INSERT INTO "
        sql_len = len(sql)
        buf = []
        for tb_name, values in self._tb_values.items():
            q = tb_name + " VALUES " + values
            if sql_len + len(q) >= self._max_sql_length:
                sql += " ".join(buf)
                self.execute_sql(sql)
                sql = "INSERT INTO "
                sql_len = len(sql)
                buf = []
            buf.append(q)
            sql_len += len(q)
        sql += " ".join(buf)
        self.execute_sql(sql)
        self._tb_values.clear()

    def execute_sql(self, sql):
        try:
            self._conn.execute(sql)
        except taos.Error as e:
            error_code = e.errno & 0xffff
            # Table does not exist
            if error_code == 0x362 or error_code == 0x218:
                self.create_tables()
                # re-execute the INSERT after the missing tables have been created
                self._conn.execute(sql)
            else:
                raise e

    def create_tables(self):
        sql = "CREATE TABLE "
        for tb in self._tb_values.keys():
            tag_values = self._tb_tags[tb]
            sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
        self._conn.execute(sql)
```
docs/zh/07-develop/03-insert-data/05-high-volume.md (modified)
...
...
@@ -8,6 +8,8 @@ title: 高效写入
To write data to TDengine more efficiently, the client program must make full and proper use of the factors above: within a single write, write data to only one table (or sub-table) as far as possible; set the batch size, after testing and tuning, to the value best suited to the processing capacity of the current system; and likewise set the number of concurrent write connections, after testing and tuning, to the value best suited to the current system, so as to achieve the best write speed in the current environment. TDengine also offers parameter-binding writes, which are another way to help achieve efficient writing.

For writes to be as efficient as possible, the server-side configuration matters as much as the design of the client program. If the CPU usage of the taosd process stays low no matter how the client program is tuned, it is likely that the number of vgroups needs to be increased. For example: if a database has 1000 tables in total and minTablesPerVnode is also set to 1000, the database will use at most one vgroup. If minTablesPerVnode and tableIncStepPerVnode are both set to 100 instead, the database may use up to 10 vgroups. For more performance-tuning parameters, see the performance-tuning section of the [Configuration Reference](../../reference/config).
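As a purely illustrative sketch of the example above (the parameter names follow the TDengine server configuration file; the values are assumptions and should be tuned for your own workload), the corresponding lines in taos.cfg would look like this:

```
# /etc/taos/taos.cfg -- illustrative values for the 1000-table example above
minTablesPerVnode      100
tableIncStepPerVnode   100
```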
## Efficient Writing Design

The following example programs demonstrate how to write data efficiently:
...
...
@@ -19,6 +21,10 @@ title: 高效写入
(Figure: efficient writing architecture — read tasks feed message queues that write tasks drain into TDengine)
:::note
In the architecture shown in the figure above, each write task is responsible only for writing a specific subset of tables, which reflects the principle of data adjacency. The tables read by a read task, however, are assumed to be random, so a single queue is written to by multiple threads (or processes) and may incur locking overhead inside the queue. In a real-world scenario, it is best if each read task can be paired with exactly one write task.
:::
## Java Example Program

The Java example program writes data by concatenating SQL statements.
...
...
@@ -37,12 +43,18 @@ title: 高效写入
1. Number of read threads. Default is 1.
2. Number of write threads. Default is 3.
3. Total number of simulated tables. Default is 1000, split evenly among the read threads.
4. Maximum number of records written per batch. Default is 3000.
<details>
<summary>Main program</summary>
```java
{{#include docs/examples/java/src/main/java/com/taos/example/highvolume/FastWriteExample.java:main}}
```

</details>
The queue capacity (taskQueueCapacity) is also a performance-related parameter and can be adjusted by modifying the program. Generally, the larger the queue capacity, the lower the probability that enqueueing is blocked and the higher the queue throughput, but the larger the memory footprint as well.

### Implementation of the Read Task
...
...
@@ -86,13 +98,17 @@ SQLWriter 类封装了拼 SQL 和写数据的逻辑。注意,所有的表都
### Running the Example Program

<details>
<summary>Running the Java example program</summary>
Before running the program, configure the environment variable `TDENGINE_JDBC_URL`. If TDengine Server is deployed on the local machine and the username, password, and port are all default values, you can set:

```
TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
```
#### Running the Example Program in a Local IDE

To run the example program in a local integrated development environment, simply:

1. Clone the TDengine repository
...
...
@@ -102,6 +118,8 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
3. Configure the environment variable `TDENGINE_JDBC_URL` in your development environment. If a global environment variable `TDENGINE_JDBC_URL` has already been configured, you can skip this step.
4. Run the class `com.taos.example.highvolume.FastWriteExample`.
#### Running the Example Program on a Remote Server

To run the example program on a server, follow the steps below:

1. Package the example code. In the directory TDengine/docs/examples/java, run:
...
...
@@ -130,14 +148,16 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
The above is the default JDBC URL for a locally deployed TDengine Server; change it to match your actual environment.

5. Start the example program with the java command. Command template:

   ```
   java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample <read_thread_count> <write_thread_count> <total_table_count> <max_batch_size>
   ```
6. Stop the test program. It does not exit on its own; once a stable write speed has been reached for the current configuration, press <kbd>CTRL</kbd> + <kbd>C</kbd> to end it.
Below is the output of an actual run:

```
[testuser@vm95 java]$ java -classpath lib/*:javaexample-1.0.jar com.taos.example.highvolume.FastWriteExample 1 9 1000 2000
17:01:01.131 [main] INFO c.t.e.highvolume.FastWriteExample - readTaskCount=1, writeTaskCount=9 tableCount=1000 maxBatchSize=2000
17:01:01.286 [WriteThread-0] INFO c.taos.example.highvolume.WriteTask - started
17:01:01.354 [WriteThread-1] INFO c.taos.example.highvolume.WriteTask - started
...
...
@@ -161,10 +181,162 @@ TDENGINE_JDBC_URL="jdbc:TAOS://localhost:6030?user=root&password=taosdata"
17:02:46.988 [main] INFO c.t.e.highvolume.FastWriteExample - count=202895614 speed=2016000
```
</details>
## Python Example Program

The Python example program uses a multi-process architecture and cross-process queue communication. The write tasks write data by assembling SQL statements.
### The main Function

The main function is responsible for creating the message queues and starting the child processes, of which there are three kinds:

1. 1 monitor process, responsible for initializing the database and measuring the write speed
2. n read processes, responsible for reading data from other data systems
3. m write processes, responsible for writing to the database

The main function accepts 5 startup arguments, in order:

1. Number of read tasks (processes)
2. Number of write tasks (processes)
3. Total number of tables
4. Queue size (in bytes)
5. Maximum number of records written per batch
<details>
<summary>The main function</summary>

```python
{{#include docs/examples/python/highvolume_faster_queue.py:main}}
```

</details>
### The Monitor Process

The monitor process is responsible for initializing the database and monitoring the current write speed.

<details>
<summary>Monitor process</summary>

```python
{{#include docs/examples/python/highvolume_faster_queue.py:monitor}}
```

</details>
### The Read Processes

#### Main Logic of a Read Process

A read process is responsible for reading data from other data systems and distributing it to the write processes.

<details>
<summary>The run_read_task function</summary>

```python
{{#include docs/examples/python/highvolume_faster_queue.py:read}}
```

</details>
#### The Mock Data Source

Below is the implementation of the mock data source. We assume that every record produced by the data source already carries the target table name; in practice you may need a rule of your own to determine the target table name, as in the sketch after the MockDataSource block below.

<details>
<summary>MockDataSource</summary>

```python
{{#include docs/examples/python/highvolume_faster_queue.py:MockDataSource}}
```

</details>
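As a purely hypothetical illustration of such a rule (the function `table_name_for` and the CRC-based mapping are assumptions for this sketch, not part of the example program), a source record that carries only a device ID could be mapped onto a fixed set of sub-tables like this:

```python
import zlib


def table_name_for(device_id: str, table_count: int = 1000, prefix: str = "tb") -> str:
    """Map a device ID to one of `table_count` sub-table names.

    A stable hash (CRC32) is used instead of Python's built-in hash(),
    which is salted per process, so every read process maps the same
    device to the same sub-table and data adjacency is preserved.
    """
    index = zlib.crc32(device_id.encode("utf-8")) % table_count
    return f"{prefix}_{index}"
```

With such a rule, `table_name_for("meter-0042")` always yields the same table name, regardless of which read process handles the record.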
### The Write Processes

Each time through its loop, a write process takes as many records as possible out of its queue and writes them in one batch.

<details>
<summary>The run_write_task function</summary>

```python
{{#include docs/examples/python/highvolume_faster_queue.py:write}}
```

</details>
### Implementation of the SQLWriter Class

The SQLWriter class encapsulates the logic of assembling SQL and writing data. None of the tables are created in advance; instead, when a write fails because a table does not exist, the tables are created in a batch using the supertable as a template, and the INSERT statement is then re-executed. This class also checks whether the SQL statement exceeds the maximum length limit: when the statement approaches the maximum SQL length (maxSQLLength), it is executed immediately. To reduce the number of SQL executions, it is recommended to increase maxSQLLength appropriately.
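For intuition, the statements assembled by SQLWriter have roughly the following shape (the table names, timestamps, and values below are made up for illustration; the real statements are built from the queued records):

```sql
-- One batched INSERT covering several sub-tables at once
INSERT INTO
  tb0_1 VALUES (1657629187000, 10.7, 116, 0.34) (1657629187100, 9.9, 111, 0.33)
  tb0_2 VALUES (1657629187000, 8.8, 119, 0.32)

-- If the INSERT fails because some tables do not exist, the tables are first
-- created in a batch using the supertable as the template, then the INSERT is retried
CREATE TABLE
  IF NOT EXISTS tb0_1 USING meters TAGS ('SanDiego', 1)
  IF NOT EXISTS tb0_2 USING meters TAGS ('LosAngeles', 3)
```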
<details>
<summary>SQLWriter</summary>

```python
{{#include docs/examples/python/sql_writer.py}}
```

</details>
### Running the Example Program

<details>
<summary>Running the Python example program</summary>
1. Prerequisites

   - TDengine client driver installed
   - Python3 installed, version >= 3.8 recommended
   - taospy installed

2. Install faster-fifo to replace Python's built-in multiprocessing.Queue

   ```
   pip3 install faster-fifo
   ```

3. Click the "view source" links above to copy the two files `highvolume_faster_queue.py` and `sql_writer.py`.

4. Run the example program

   ```
   python3 highvolume_faster_queue.py <READ_TASK_COUNT> <WRITE_TASK_COUNT> <TABLE_COUNT> <QUEUE_SIZE> <MAX_BATCH_SIZE>
   ```
Below is the output of an actual run:
```
[testuser@vm95 python]$ python3 highvolume_faster_queue.py 10 10 1000 1000000 3000
2022-07-12 21:53:07,147 [root] - READ_TASK_COUNT=10, WRITE_TASK_COUNT=10, TABLE_COUNT=1000, QUEUE_SIZE=1000000, MAX_BATCH_SIZE=3000
2022-07-12 21:53:10,168 [root] - WriteTask-0 started with pid 6228
2022-07-12 21:53:10,168 [root] - WriteTask-1 started with pid 6229
2022-07-12 21:53:10,169 [root] - WriteTask-2 started with pid 6230
2022-07-12 21:53:10,169 [root] - WriteTask-3 started with pid 6231
2022-07-12 21:53:10,170 [root] - WriteTask-4 started with pid 6232
2022-07-12 21:53:10,171 [root] - WriteTask-5 started with pid 6233
2022-07-12 21:53:10,171 [root] - WriteTask-6 started with pid 6234
2022-07-12 21:53:10,172 [root] - WriteTask-7 started with pid 6235
2022-07-12 21:53:10,172 [root] - WriteTask-8 started with pid 6236
2022-07-12 21:53:10,173 [root] - WriteTask-9 started with pid 6237
2022-07-12 21:53:10,174 [root] - ReadTask-0 started with pid 6238
2022-07-12 21:53:10,175 [root] - ReadTask-1 started with pid 6239
2022-07-12 21:53:10,176 [root] - ReadTask-2 started with pid 6240
2022-07-12 21:53:10,177 [root] - ReadTask-3 started with pid 6241
2022-07-12 21:53:10,178 [root] - ReadTask-4 started with pid 6242
2022-07-12 21:53:10,179 [root] - ReadTask-5 started with pid 6243
2022-07-12 21:53:10,180 [root] - ReadTask-6 started with pid 6244
2022-07-12 21:53:10,181 [root] - ReadTask-7 started with pid 6245
2022-07-12 21:53:10,181 [root] - ReadTask-8 started with pid 6246
2022-07-12 21:53:10,182 [root] - ReadTask-9 started with pid 6247
2022-07-12 21:53:17,375 [DataBaseMonitor] - count=3333857 speed=333385.7
2022-07-12 21:53:27,564 [DataBaseMonitor] - count=8883905 speed=555004.8
2022-07-12 21:53:37,742 [DataBaseMonitor] - count=14233135 speed=534923.0
2022-07-12 21:53:47,926 [DataBaseMonitor] - count=19759409 speed=552627.4
2022-07-12 21:53:58,275 [DataBaseMonitor] - count=25245406 speed=548599.7
2022-07-12 21:54:08,478 [DataBaseMonitor] - count=30644263 speed=539885.7
2022-07-12 21:54:18,772 [DataBaseMonitor] - count=36110956 speed=546669.3
2022-07-12 21:54:29,031 [DataBaseMonitor] - count=41456746 speed=534579.0
```
</details>