Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
658ba5c6
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
658ba5c6
编写于
7月 11, 2022
作者:
D
dingbo
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
docs: document for python example
上级
c810b6b1
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
25 addition
and
14 deletion
+25
-14
docs/examples/python/highvolume_example.py
docs/examples/python/highvolume_example.py
+25
-14
未找到文件。
docs/examples/python/highvolume_example.py
浏览文件 @
658ba5c6
import
logging
import
logging
import
sys
import
sys
import
threading
import
time
import
time
from
multiprocessing
import
Queue
,
Process
from
multiprocessing
import
Queue
,
Process
from
queue
import
Empty
from
queue
import
Empty
from
typing
import
List
import
taos
import
taos
from
taos
import
TaosConnection
from
taos
import
TaosConnection
from
typing
import
List
logging
.
basicConfig
(
stream
=
sys
.
stdout
,
level
=
logging
.
DEBUG
,
format
=
"%(asctime)s [%(name)s] - %(message)s"
)
logging
.
basicConfig
(
stream
=
sys
.
stdout
,
level
=
logging
.
DEBUG
,
format
=
"%(asctime)s [%(name)s] - %(message)s"
)
...
@@ -25,6 +24,7 @@ def get_connection():
...
@@ -25,6 +24,7 @@ def get_connection():
return
taos
.
connect
(
host
=
"localhost"
,
user
=
"root"
,
password
=
"taosdata"
,
port
=
6030
)
return
taos
.
connect
(
host
=
"localhost"
,
user
=
"root"
,
password
=
"taosdata"
,
port
=
6030
)
# ANCHOR: DataBaseMonitor
class
DataBaseMonitor
:
class
DataBaseMonitor
:
def
__init__
(
self
):
def
__init__
(
self
):
self
.
conn
:
TaosConnection
=
get_connection
()
self
.
conn
:
TaosConnection
=
get_connection
()
...
@@ -55,10 +55,13 @@ class DataBaseMonitor:
...
@@ -55,10 +55,13 @@ class DataBaseMonitor:
logging
.
info
(
f
"count=
{
count
}
speed=
{
(
count
-
last_count
)
/
10
}
"
)
logging
.
info
(
f
"count=
{
count
}
speed=
{
(
count
-
last_count
)
/
10
}
"
)
last_count
=
count
last_count
=
count
except
KeyboardInterrupt
:
except
KeyboardInterrupt
:
[
p
.
kill
()
for
p
in
read_processes
]
[
p
.
terminate
()
for
p
in
read_processes
]
[
p
.
kill
for
p
in
write_processes
]
[
p
.
terminate
()
for
p
in
write_processes
]
# ANCHOR_END: DataBaseMonitor
# ANCHOR: MockDataSource
class
MockDataSource
:
class
MockDataSource
:
location
=
[
"LosAngeles"
,
"SanDiego"
,
"Hollywood"
,
"Compton"
,
"San Francisco"
]
location
=
[
"LosAngeles"
,
"SanDiego"
,
"Hollywood"
,
"Compton"
,
"San Francisco"
]
current
=
[
8.8
,
10.7
,
9.9
,
8.9
,
9.4
]
current
=
[
8.8
,
10.7
,
9.9
,
8.9
,
9.4
]
...
@@ -91,6 +94,9 @@ class MockDataSource:
...
@@ -91,6 +94,9 @@ class MockDataSource:
raise
StopIteration
raise
StopIteration
# ANCHOR_END: MockDataSource
# ANCHOR: SQLWriter
class
SQLWriter
:
class
SQLWriter
:
log
=
logging
.
getLogger
(
"SQLWriter"
)
log
=
logging
.
getLogger
(
"SQLWriter"
)
...
@@ -170,19 +176,21 @@ class SQLWriter:
...
@@ -170,19 +176,21 @@ class SQLWriter:
return
self
.
_buffered_count
return
self
.
_buffered_count
def
run_read_task
(
task_id
:
int
,
task_queues
:
List
[
Queue
]):
# ANCHOR_END: SQLWriter
log
=
logging
.
getLogger
(
f
"ReadTask-
{
task_id
}
"
)
# ANCHOR: read
def
run_read_task
(
task_id
:
int
,
task_queues
:
List
[
Queue
]):
table_count_per_task
=
TABLE_COUNT
//
READ_TASK_COUNT
table_count_per_task
=
TABLE_COUNT
//
READ_TASK_COUNT
data_source
=
MockDataSource
(
f
"tb
{
task_id
}
"
,
table_count_per_task
)
data_source
=
MockDataSource
(
f
"tb
{
task_id
}
"
,
table_count_per_task
)
try
:
for
table_id
,
line
in
data_source
:
for
table_id
,
line
in
data_source
:
i
=
table_id
%
len
(
task_queues
)
i
=
table_id
%
len
(
task_queues
)
task_queues
[
i
].
put
(
line
,
block
=
True
)
task_queues
[
i
].
put
(
line
,
block
=
True
)
except
KeyboardInterrupt
:
pass
# ANCHOR_END: read
# ANCHOR: write
def
run_write_task
(
task_id
:
int
,
queue
:
Queue
):
def
run_write_task
(
task_id
:
int
,
queue
:
Queue
):
log
=
logging
.
getLogger
(
f
"WriteTask-
{
task_id
}
"
)
log
=
logging
.
getLogger
(
f
"WriteTask-
{
task_id
}
"
)
writer
=
SQLWriter
()
writer
=
SQLWriter
()
...
@@ -196,14 +204,14 @@ def run_write_task(task_id: int, queue: Queue):
...
@@ -196,14 +204,14 @@ def run_write_task(task_id: int, queue: Queue):
writer
.
flush
()
writer
.
flush
()
else
:
else
:
time
.
sleep
(
0.01
)
time
.
sleep
(
0.01
)
except
KeyboardInterrupt
:
pass
except
BaseException
as
e
:
except
BaseException
as
e
:
msg
=
f
"line=
{
line
}
, buffer_count=
{
writer
.
buffered_count
}
"
msg
=
f
"line=
{
line
}
, buffer_count=
{
writer
.
buffered_count
}
"
log
.
debug
(
msg
)
log
.
debug
(
msg
)
raise
e
raise
e
# ANCHOR_END: write
def
set_global_config
():
def
set_global_config
():
argc
=
len
(
sys
.
argv
)
argc
=
len
(
sys
.
argv
)
if
argc
>
1
:
if
argc
>
1
:
...
@@ -220,6 +228,7 @@ def set_global_config():
...
@@ -220,6 +228,7 @@ def set_global_config():
MAX_BATCH_SIZE
=
sys
.
argv
[
4
]
MAX_BATCH_SIZE
=
sys
.
argv
[
4
]
# ANCHOR: main
def
main
():
def
main
():
set_global_config
()
set_global_config
()
logging
.
info
(
f
"READ_TASK_COUNT=
{
READ_TASK_COUNT
}
, WRITE_TASK_COUNT=
{
WRITE_TASK_COUNT
}
, TABLE_COUNT=
{
TABLE_COUNT
}
, MAX_BATCH_SIZE=
{
MAX_BATCH_SIZE
}
"
)
logging
.
info
(
f
"READ_TASK_COUNT=
{
READ_TASK_COUNT
}
, WRITE_TASK_COUNT=
{
WRITE_TASK_COUNT
}
, TABLE_COUNT=
{
TABLE_COUNT
}
, MAX_BATCH_SIZE=
{
MAX_BATCH_SIZE
}
"
)
...
@@ -248,5 +257,7 @@ def main():
...
@@ -248,5 +257,7 @@ def main():
database_monitor
.
stat_and_print
()
database_monitor
.
stat_and_print
()
# ANCHOR_END: main
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
main
()
main
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录