taosdata / TDengine

Commit 6e4d3cf4
Authored July 12, 2022 by Bo Ding

docs: test highvolume_faster_queue.py

Parent: 09b6aac0
Showing 3 changed files with 275 additions and 297 deletions.
docs/examples/python/highvolume_faster_queue.py   +63 −77
docs/examples/python/highvolume_mp_queue.py       +193 −193
docs/examples/python/sql_writer.py                +19 −27
docs/examples/python/highvolume_faster_queue.py @ 6e4d3cf4
 # install dependencies:
-# python >= 3.8
+# recommend python >= 3.8
 # pip3 install faster-fifo
 #
 import logging
 import sys
 import time
 from multiprocessing import Process
+from faster_fifo import Queue
 from queue import Empty
 from typing import List
@@ -13,60 +15,14 @@ logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s
 READ_TASK_COUNT = 1
 WRITE_TASK_COUNT = 1
-QUEUE_SIZE = 1000
 TABLE_COUNT = 1000
+QUEUE_SIZE = 1000000
 MAX_BATCH_SIZE = 3000
 
 read_processes = []
 write_processes = []
 
-# ANCHOR: DataBaseMonitor
-class DataBaseMonitor:
-    """
-    Start a thread.
-    Prepare database and stable.
-    Statistic writing speed and print it every 10 seconds.
-    """
-
-    def __init__(self):
-        self.process = Process(target=self.run)
-        self.process.start()
-
-    def get_connection(self):
-        import taos
-        return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
-
-    def prepare_database(self, conn):
-        conn.execute("DROP DATABASE IF EXISTS test")
-        conn.execute("CREATE DATABASE test")
-        conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
-
-    def get_count(self, conn):
-        res = conn.query("SELECT count(*) FROM test.meters")
-        rows = res.fetch_all()
-        return rows[0][0] if rows else 0
-
-    def run(self):
-        log = logging.getLogger("DataBaseMonitor")
-        conn = self.get_connection()
-        self.prepare_database(conn)
-        last_count = 0
-        while True:
-            time.sleep(10)
-            count = self.get_count(conn)
-            log.info(f"count={count} speed={(count - last_count) / 10}")
-            last_count = count
-
-    def join(self):
-        self.process.join()
-
-    def stop(self):
-        self.process.terminate()
-
-# ANCHOR_END: DataBaseMonitor
 
 # ANCHOR: MockDataSource
 class MockDataSource:
     location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
@@ -86,18 +42,25 @@ class MockDataSource:
         return self
 
     def __next__(self):
+        """
+        next 100 rows of current table
+        """
         self.table_id += 1
         if self.table_id == self.table_count:
             self.table_id = 0
-        self.row += 1
-        if self.row < self.max_rows_per_table:
-            ts = self.start_ms + 100 * self.row
-            group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
-            tb_name = self.table_name_prefix + '_' + str(self.table_id)
-            ri = self.row % 5
-            return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}"
-        else:
-            raise StopIteration
+        if self.row >= self.max_rows_per_table:
+            raise StopIteration
+        rows = []
+        while len(rows) < 100:
+            self.row += 1
+            if self.row < self.max_rows_per_table:
+                ts = self.start_ms + 100 * self.row
+                group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
+                tb_name = self.table_name_prefix + '_' + str(self.table_id)
+                ri = self.row % 5
+                rows.append(f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}")
+            else:
+                raise StopIteration
+        return self.table_id, rows
 
 # ANCHOR_END: MockDataSource
@@ -107,9 +70,9 @@ def run_read_task(task_id: int, task_queues: List[Queue]):
     table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
     data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
     try:
-        for table_id, line in data_source:
+        for table_id, rows in data_source:
             i = table_id % len(task_queues)
-            task_queues[i].put(line, block=True)
+            task_queues[i].put_many(rows, block=True, timeout=-1)
     except KeyboardInterrupt:
         pass
@@ -120,27 +83,23 @@ def run_read_task(task_id: int, task_queues: List[Queue]):
 def run_write_task(task_id: int, queue: Queue):
     from sql_writer import SQLWriter
     log = logging.getLogger(f"WriteTask-{task_id}")
-    writer = SQLWriter(MAX_BATCH_SIZE)
+    writer = SQLWriter()
+    lines = None
     try:
         while True:
             try:
-                line = queue.get(block=False)
-                writer.process_line(line)
+                lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
+                writer.process_lines(lines)
             except Empty:
-                if writer.buffered_count > 0:
-                    writer.flush()
-                else:
-                    time.sleep(0.01)
+                time.sleep(0.01)
     except KeyboardInterrupt:
         pass
     except BaseException as e:
-        msg = f"line={line}, buffer_count={writer.buffered_count}"
-        log.debug(msg)
+        log.debug(f"lines={lines}")
         raise e
 
 # ANCHOR_END: write
 
 def set_global_config():
     argc = len(sys.argv)
     if argc > 1:
@@ -150,34 +109,60 @@ def set_global_config():
         global WRITE_TASK_COUNT
         WRITE_TASK_COUNT = int(sys.argv[2])
     if argc > 3:
-        global QUEUE_SIZE
-        QUEUE_SIZE = int(sys.argv[3])
-    if argc > 4:
-        global TABLE_COUNT
-        TABLE_COUNT = int(sys.argv[4])
+        global TABLE_COUNT
+        TABLE_COUNT = int(sys.argv[3])
+    if argc > 4:
+        global QUEUE_SIZE
+        QUEUE_SIZE = int(sys.argv[4])
     if argc > 5:
         global MAX_BATCH_SIZE
         MAX_BATCH_SIZE = int(sys.argv[5])
 
 # ANCHOR: main
+def run_monitor_process():
+    import taos
+    log = logging.getLogger("DataBaseMonitor")
+    conn = taos.connect(host="localhost", user="root", password="taosdata", port=6030)
+    conn.execute("DROP DATABASE IF EXISTS test")
+    conn.execute("CREATE DATABASE test")
+    conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
+                 "TAGS (location BINARY(64), groupId INT)")
+
+    def get_count():
+        res = conn.query("SELECT count(*) FROM test.meters")
+        rows = res.fetch_all()
+        return rows[0][0] if rows else 0
+
+    last_count = 0
+    while True:
+        time.sleep(10)
+        count = get_count()
+        log.info(f"count={count} speed={(count - last_count) / 10}")
+        last_count = count
+
 def main():
     set_global_config()
-    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
+    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
+                 f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
 
-    database_monitor = DataBaseMonitor()
-    time.sleep(3)  # wait for database ready
+    monitor_process = Process(target=run_monitor_process)
+    monitor_process.start()
+    time.sleep(3)
 
     task_queues: List[Queue] = []
+    # create task queues
     for i in range(WRITE_TASK_COUNT):
-        queue = Queue(maxsize=QUEUE_SIZE)
+        queue = Queue(max_size_bytes=QUEUE_SIZE)
         task_queues.append(queue)
-        p = Process(target=run_write_task, args=(i, queue))
+
+    # create write processes
+    for i in range(WRITE_TASK_COUNT):
+        p = Process(target=run_write_task, args=(i, task_queues[i]))
         p.start()
         logging.debug(f"WriteTask-{i} started with pid {p.pid}")
         write_processes.append(p)
 
+    # create read processes
     for i in range(READ_TASK_COUNT):
         p = Process(target=run_read_task, args=(i, task_queues))
         p.start()
@@ -185,11 +170,12 @@ def main():
...
@@ -185,11 +170,12 @@ def main():
read_processes
.
append
(
p
)
read_processes
.
append
(
p
)
try
:
try
:
database_monitor
.
join
()
monitor_process
.
join
()
except
KeyboardInterrupt
:
except
KeyboardInterrupt
:
database_monitor
.
stop
()
monitor_process
.
terminate
()
[
p
.
terminate
()
for
p
in
read_processes
]
[
p
.
terminate
()
for
p
in
read_processes
]
[
p
.
terminate
()
for
p
in
write_processes
]
[
p
.
terminate
()
for
p
in
write_processes
]
[
q
.
close
()
for
q
in
task_queues
]
if
__name__
==
'__main__'
:
if
__name__
==
'__main__'
:
...
...
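The substance of this file's change is the move from per-message queue operations to faster-fifo's batch API: the reader enqueues 100-row batches with put_many, and the writer drains up to MAX_BATCH_SIZE messages per get_many call. Below is a minimal standalone sketch of that API as the new code relies on it, assuming faster-fifo is installed (`pip3 install faster-fifo`); note that the queue capacity is given in bytes (max_size_bytes), not in items.

from queue import Empty

from faster_fifo import Queue

q = Queue(max_size_bytes=1000 * 1000)    # capacity is bytes, not item count

q.put_many(["row-1", "row-2", "row-3"])  # enqueue a whole batch in one call

try:
    # drain up to 3000 messages at once; with block=False an empty queue
    # raises queue.Empty, which is how run_write_task above detects idleness
    batch = q.get_many(block=False, max_messages_to_get=3000)
    print(batch)
except Empty:
    pass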
docs/examples/python/highvolume_mp_queue.py @ 6e4d3cf4
This commit comments out the entire file (every line `X` becomes `# X`, hence +193 −193), disabling the multiprocessing-queue variant of the example. Its previous content, for reference:

import logging
import sys
import time
from multiprocessing import Queue, Process
from queue import Empty
from typing import List

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")

READ_TASK_COUNT = 1
WRITE_TASK_COUNT = 1
QUEUE_SIZE = 1000
TABLE_COUNT = 1000
MAX_BATCH_SIZE = 3000

read_processes = []
write_processes = []


# ANCHOR: DataBaseMonitor
class DataBaseMonitor:
    """
    Start a thread.
    Prepare database and stable.
    Statistic writing speed and print it every 10 seconds.
    """

    def __init__(self):
        self.process = Process(target=self.run)
        self.process.start()

    def get_connection(self):
        import taos
        return taos.connect(host="localhost", user="root", password="taosdata", port=6030)

    def prepare_database(self, conn):
        conn.execute("DROP DATABASE IF EXISTS test")
        conn.execute("CREATE DATABASE test")
        conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")

    def get_count(self, conn):
        res = conn.query("SELECT count(*) FROM test.meters")
        rows = res.fetch_all()
        return rows[0][0] if rows else 0

    def run(self):
        log = logging.getLogger("DataBaseMonitor")
        conn = self.get_connection()
        self.prepare_database(conn)
        last_count = 0
        while True:
            time.sleep(10)
            count = self.get_count(conn)
            log.info(f"count={count} speed={(count - last_count) / 10}")
            last_count = count

    def join(self):
        self.process.join()

    def stop(self):
        self.process.terminate()


# ANCHOR_END: DataBaseMonitor

# ANCHOR: MockDataSource
class MockDataSource:
    location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
    current = [8.8, 10.7, 9.9, 8.9, 9.4]
    voltage = [119, 116, 111, 113, 118]
    phase = [0.32, 0.34, 0.33, 0.329, 0.141]
    max_rows_per_table = 10 ** 9

    def __init__(self, tb_name_prefix, table_count):
        self.table_name_prefix = tb_name_prefix
        self.table_count = table_count
        self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100

    def __iter__(self):
        self.row = 0
        self.table_id = -1
        return self

    def __next__(self):
        self.table_id += 1
        if self.table_id == self.table_count:
            self.table_id = 0
        self.row += 1
        if self.row < self.max_rows_per_table:
            ts = self.start_ms + 100 * self.row
            group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
            tb_name = self.table_name_prefix + '_' + str(self.table_id)
            ri = self.row % 5
            return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}"
        else:
            raise StopIteration


# ANCHOR_END: MockDataSource

# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
    table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
    data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
    try:
        for table_id, line in data_source:
            i = table_id % len(task_queues)
            task_queues[i].put(line, block=True)
    except KeyboardInterrupt:
        pass


# ANCHOR_END: read

# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
    from sql_writer import SQLWriter
    log = logging.getLogger(f"WriteTask-{task_id}")
    writer = SQLWriter(MAX_BATCH_SIZE)
    try:
        while True:
            try:
                line = queue.get(block=False)
                writer.process_line(line)
            except Empty:
                if writer.buffered_count > 0:
                    writer.flush()
                else:
                    time.sleep(0.01)
    except KeyboardInterrupt:
        pass
    except BaseException as e:
        msg = f"line={line}, buffer_count={writer.buffered_count}"
        log.debug(msg)
        raise e


# ANCHOR_END: write

def set_global_config():
    argc = len(sys.argv)
    if argc > 1:
        global READ_TASK_COUNT
        READ_TASK_COUNT = int(sys.argv[1])
    if argc > 2:
        global WRITE_TASK_COUNT
        WRITE_TASK_COUNT = int(sys.argv[2])
    if argc > 3:
        global QUEUE_SIZE
        QUEUE_SIZE = int(sys.argv[3])
    if argc > 4:
        global TABLE_COUNT
        TABLE_COUNT = int(sys.argv[4])
    if argc > 5:
        global MAX_BATCH_SIZE
        MAX_BATCH_SIZE = int(sys.argv[5])


# ANCHOR: main
def main():
    set_global_config()
    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

    database_monitor = DataBaseMonitor()
    time.sleep(3)  # wait for database ready

    task_queues: List[Queue] = []

    for i in range(WRITE_TASK_COUNT):
        queue = Queue(maxsize=QUEUE_SIZE)
        task_queues.append(queue)
        p = Process(target=run_write_task, args=(i, queue))
        p.start()
        logging.debug(f"WriteTask-{i} started with pid {p.pid}")
        write_processes.append(p)

    for i in range(READ_TASK_COUNT):
        p = Process(target=run_read_task, args=(i, task_queues))
        p.start()
        logging.debug(f"ReadTask-{i} started with pid {p.pid}")
        read_processes.append(p)

    try:
        database_monitor.join()
    except KeyboardInterrupt:
        database_monitor.stop()
        [p.terminate() for p in read_processes]
        [p.terminate() for p in write_processes]


if __name__ == '__main__':
    main()
# ANCHOR_END: main
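For reference, the constructor difference between the two queue implementations is also why QUEUE_SIZE jumps from 1000 to 1000000 in the faster-queue variant: multiprocessing.Queue is bounded by item count, while faster_fifo.Queue is bounded by total payload bytes. A small illustrative snippet, assuming both packages are installed:

# Illustrative only: the two constructors used by the two example files.
from multiprocessing import Queue as MpQueue   # highvolume_mp_queue.py
from faster_fifo import Queue as FfQueue       # highvolume_faster_queue.py

mp_q = MpQueue(maxsize=1000)             # bounded by number of buffered items
ff_q = FfQueue(max_size_bytes=1000000)   # bounded by total bytes of payload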
docs/examples/python/sql_writer.py @ 6e4d3cf4
@@ -5,9 +5,7 @@ import taos
 class SQLWriter:
     log = logging.getLogger("SQLWriter")
 
-    def __init__(self, max_batch_size):
-        self._buffered_count = 0
-        self._max_batch_size = max_batch_size
+    def __init__(self):
         self._tb_values = {}
         self._tb_tags = {}
         self._conn = self.get_connection()
@@ -21,30 +19,29 @@ class SQLWriter:
         if name == "maxSQLLength":
             return int(r[1])
 
-    def get_connection(self):
+    @staticmethod
+    def get_connection():
         return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
 
-    def process_line(self, line: str):
+    def process_lines(self, lines: str):
         """
-        :param line: tbName,ts,current,voltage,phase,location,groupId
+        :param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
         """
-        self._buffered_count += 1
-        ps = line.split(",")
-        table_name = ps[0]
-        value = '(' + ",".join(ps[1:-2]) + ') '
-        if table_name in self._tb_values:
-            self._tb_values[table_name] += value
-        else:
-            self._tb_values[table_name] = value
-        if table_name not in self._tb_tags:
-            location = ps[-2]
-            group_id = ps[-1]
-            tag_value = f"('{location}', {group_id})"
-            self._tb_tags[table_name] = tag_value
-        if self._buffered_count == self._max_batch_size:
-            self.flush()
+        for line in lines:
+            ps = line.split(",")
+            table_name = ps[0]
+            value = '(' + ",".join(ps[1:-2]) + ') '
+            if table_name in self._tb_values:
+                self._tb_values[table_name] += value
+            else:
+                self._tb_values[table_name] = value
+            if table_name not in self._tb_tags:
+                location = ps[-2]
+                group_id = ps[-1]
+                tag_value = f"('{location}', {group_id})"
+                self._tb_tags[table_name] = tag_value
+        self.flush()
 
     def flush(self):
         """
@@ -68,7 +65,6 @@ class SQLWriter:
         sql += " ".join(buf)
         self.execute_sql(sql)
         self._tb_values.clear()
-        self._buffered_count = 0
 
     def execute_sql(self, sql):
         try:
@@ -87,7 +83,3 @@ class SQLWriter:
             tag_values = self._tb_tags[tb]
             sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
         self._conn.execute(sql)
-
-    @property
-    def buffered_count(self):
-        return self._buffered_count
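The rework moves batching out of SQLWriter: instead of counting lines against an internal max_batch_size, the caller hands over a whole batch and process_lines flushes once at the end. A hypothetical driver showing the new call shape; the sample rows follow the docstring format (tbName,ts,current,voltage,phase,location,groupId), the timestamps are made up, and a TDengine server reachable with the hard-coded localhost:6030 credentials is assumed:

# Hypothetical usage sketch for the reworked SQLWriter.
from sql_writer import SQLWriter

writer = SQLWriter()  # no max_batch_size argument anymore; batching is the caller's job
lines = [
    # tbName,ts,current,voltage,phase,location,groupId (timestamps made up)
    "tb_0,1657600000000,8.8,119,0.32,LosAngeles,0",
    "tb_0,1657600000100,10.7,116,0.34,LosAngeles,0",
    "tb_1,1657600000000,9.9,111,0.33,SanDiego,2",
]
# buffers values and tags per table, then flushes one batched INSERT
writer.process_lines(lines)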