Commit 744df1eb authored by dingbo

docs: test highvolume_example.py

Parent 3bbb38c9
highvolume_example.py
 import logging
 import sys
-from threading import Thread
 import time
 from multiprocessing import Queue, Process
 from queue import Empty
 from typing import List

 logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")

 READ_TASK_COUNT = 1
@@ -18,17 +16,6 @@ read_processes = []
 write_processes = []

-def get_connection():
-    import taos
-    return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
-
-def get_max_sql_length(conn):
-    rows = conn.query("SHOW variables").fetch_all()
-    for r in rows:
-        name = r[0]
-        if name == "maxSQLLength":
-            return int(r[1])

 # ANCHOR: DataBaseMonitor
 class DataBaseMonitor:
     """
@@ -36,10 +23,15 @@ class DataBaseMonitor:
     Prepare database and stable.
     Calculate the writing speed and print it every 10 seconds.
     """

     def __init__(self):
         self.process = Process(target=self.run)
         self.process.start()

+    def get_connection(self):
+        import taos
+        return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
+
     def prepare_database(self, conn):
         conn.execute("DROP DATABASE IF EXISTS test")
         conn.execute("CREATE DATABASE test")
@@ -52,7 +44,7 @@ class DataBaseMonitor:
     def run(self):
         log = logging.getLogger("DataBaseMonitor")
-        conn = get_connection()
+        conn = self.get_connection()
         self.prepare_database(conn)
         last_count = 0
         while True:
@@ -67,6 +59,7 @@
     def stop(self):
         self.process.terminate()
 # ANCHOR_END: DataBaseMonitor

 # ANCHOR: MockDataSource
@@ -104,90 +97,6 @@ class MockDataSource:
 # ANCHOR_END: MockDataSource
-# ANCHOR: SQLWriter
-class SQLWriter:
-    log = logging.getLogger("SQLWriter")
-
-    def __init__(self):
-        self._buffered_count = 0
-        self._tb_values = {}
-        self._tb_tags = {}
-        self._conn = get_connection()
-        self._max_sql_length = get_max_sql_length(self._conn)
-        self._conn.execute("USE test")
-
-    def process_line(self, line: str):
-        """
-        :param line: tbName,ts,current,voltage,phase,location,groupId
-        """
-        self._buffered_count += 1
-        ps = line.split(",")
-        table_name = ps[0]
-        value = '(' + ",".join(ps[1:-2]) + ') '
-        if table_name in self._tb_values:
-            self._tb_values[table_name] += value
-        else:
-            self._tb_values[table_name] = value
-        if table_name not in self._tb_tags:
-            location = ps[-2]
-            group_id = ps[-1]
-            tag_value = f"('{location}',{group_id})"
-            self._tb_tags[table_name] = tag_value
-        if self._buffered_count == MAX_BATCH_SIZE:
-            self.flush()
-
-    def flush(self):
-        """
-        Assemble an INSERT statement and execute it.
-        When the SQL length grows close to MAX_SQL_LENGTH, the SQL is executed immediately and a new INSERT statement is started.
-        In case of a "Table does not exist" error, the tables in the SQL are created and the SQL is re-executed.
-        """
-        sql = "INSERT INTO "
-        sql_len = len(sql)
-        buf = []
-        for tb_name, values in self._tb_values.items():
-            q = tb_name + " VALUES " + values
-            if sql_len + len(q) >= self._max_sql_length:
-                sql += " ".join(buf)
-                self.execute_sql(sql)
-                sql = "INSERT INTO "
-                sql_len = len(sql)
-                buf = []
-            buf.append(q)
-            sql_len += len(q)
-        sql += " ".join(buf)
-        self.execute_sql(sql)
-        self._tb_values.clear()
-        self._buffered_count = 0
-
-    def execute_sql(self, sql):
-        import taos
-        try:
-            self._conn.execute(sql)
-        except taos.Error as e:
-            error_code = e.errno & 0xffff
-            # Table does not exist
-            if error_code == 0x362 or error_code == 0x218:
-                self.create_tables()
-            else:
-                raise e
-
-    def create_tables(self):
-        sql = "CREATE TABLE "
-        for tb in self._tb_values.keys():
-            tag_values = self._tb_tags[tb]
-            sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
-        self._conn.execute(sql)
-
-    @property
-    def buffered_count(self):
-        return self._buffered_count
-# ANCHOR_END: SQLWriter
# ANCHOR: read
 def run_read_task(task_id: int, task_queues: List[Queue]):
     table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
@@ -204,8 +113,9 @@ def run_read_task(task_id: int, task_queues: List[Queue]):

 # ANCHOR: write
 def run_write_task(task_id: int, queue: Queue):
+    from sql_writer import SQLWriter
     log = logging.getLogger(f"WriteTask-{task_id}")
-    writer = SQLWriter()
+    writer = SQLWriter(MAX_BATCH_SIZE)
     try:
         while True:
             try:
@@ -230,16 +140,16 @@ def set_global_config():
     argc = len(sys.argv)
     if argc > 1:
         global READ_TASK_COUNT
-        READ_TASK_COUNT = sys.argv[1]
+        READ_TASK_COUNT = int(sys.argv[1])
     if argc > 2:
         global WRITE_TASK_COUNT
-        WRITE_TASK_COUNT = sys.argv[2]
+        WRITE_TASK_COUNT = int(sys.argv[2])
     if argc > 3:
         global TABLE_COUNT
-        TABLE_COUNT = sys.argv[3]
+        TABLE_COUNT = int(sys.argv[3])
     if argc > 4:
         global MAX_BATCH_SIZE
-        MAX_BATCH_SIZE = sys.argv[4]
+        MAX_BATCH_SIZE = int(sys.argv[4])

 # ANCHOR: main
@@ -248,7 +158,7 @@ def main():
     logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

     database_monitor = DataBaseMonitor()
     time.sleep(3)  # wait for database ready

     task_queues: List[Queue] = []
@@ -274,7 +184,6 @@
     [p.terminate() for p in write_processes]
-# ANCHOR_END: main

 if __name__ == '__main__':
     main()
+# ANCHOR_END: main
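For reference, `set_global_config` maps up to four positional command-line arguments onto the tunables, in the order READ_TASK_COUNT, WRITE_TASK_COUNT, TABLE_COUNT, MAX_BATCH_SIZE. A minimal invocation sketch (the argument values are illustrative, not from the commit):

```python
import subprocess

# One reader process, four writers, 1000 tables, batches of 3000 rows.
# Assumes highvolume_example.py is in the current directory and a TDengine
# server is reachable at localhost:6030.
subprocess.run(
    ["python3", "highvolume_example.py", "1", "4", "1000", "3000"],
    check=True,  # raise if the example exits with a non-zero status
)
```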
sql_writer.py (new file)
import logging
import taos


class SQLWriter:
    log = logging.getLogger("SQLWriter")

    def __init__(self, max_batch_size):
        self._buffered_count = 0
        self._max_batch_size = max_batch_size
        self._tb_values = {}
        self._tb_tags = {}
        self._conn = self.get_connection()
        self._max_sql_length = self.get_max_sql_length()
        self._conn.execute("USE test")
    def get_max_sql_length(self):
        rows = self._conn.query("SHOW variables").fetch_all()
        for r in rows:
            name = r[0]
            if name == "maxSQLLength":
                return int(r[1])
        # Assumed fallback: TDengine's documented 1 MB default, used when the
        # server does not report maxSQLLength.
        return 1024 * 1024

    def get_connection(self):
        return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
    def process_line(self, line: str):
        """
        :param line: tbName,ts,current,voltage,phase,location,groupId
        """
        self._buffered_count += 1
        ps = line.split(",")
        table_name = ps[0]
        value = '(' + ",".join(ps[1:-2]) + ') '
        if table_name in self._tb_values:
            self._tb_values[table_name] += value
        else:
            self._tb_values[table_name] = value
        if table_name not in self._tb_tags:
            location = ps[-2]
            group_id = ps[-1]
            tag_value = f"('{location}',{group_id})"
            self._tb_tags[table_name] = tag_value
        if self._buffered_count == self._max_batch_size:
            self.flush()
    def flush(self):
        """
        Assemble an INSERT statement and execute it.
        When the SQL length grows close to maxSQLLength, the SQL is executed
        immediately and a new INSERT statement is started.
        In case of a "Table does not exist" error, the tables in the SQL are
        created and the SQL is re-executed.
        """
        sql = "INSERT INTO "
        sql_len = len(sql)
        buf = []
        for tb_name, values in self._tb_values.items():
            q = tb_name + " VALUES " + values
            if sql_len + len(q) >= self._max_sql_length:
                sql += " ".join(buf)
                self.execute_sql(sql)
                sql = "INSERT INTO "
                sql_len = len(sql)
                buf = []
            buf.append(q)
            sql_len += len(q)
        sql += " ".join(buf)
        self.execute_sql(sql)
        self._tb_values.clear()
        self._buffered_count = 0
    def execute_sql(self, sql):
        try:
            self._conn.execute(sql)
        except taos.Error as e:
            error_code = e.errno & 0xffff
            # 0x362 / 0x218: table does not exist; create the missing tables,
            # then re-execute the INSERT as the flush() docstring describes.
            if error_code == 0x362 or error_code == 0x218:
                self.create_tables()
                self._conn.execute(sql)
            else:
                raise e
    def create_tables(self):
        sql = "CREATE TABLE "
        for tb in self._tb_values.keys():
            tag_values = self._tb_tags[tb]
            sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
        self._conn.execute(sql)

    @property
    def buffered_count(self):
        return self._buffered_count
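A minimal usage sketch for the extracted class, assuming a TDengine server at localhost:6030 and that DataBaseMonitor has already created the `test` database and the `meters` stable; the sample row below is illustrative:

```python
from sql_writer import SQLWriter

writer = SQLWriter(max_batch_size=3000)
# Input lines follow the format documented on process_line:
#   tbName,ts,current,voltage,phase,location,groupId
writer.process_line("d1001,1648432611249,10.3,219,0.31,California.SanFrancisco,2")
# Lines are buffered per table and flushed automatically once max_batch_size
# lines have accumulated; flush() forces out a partially filled batch.
writer.flush()
```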