Commit 3bbb38c9 authored by Bo Ding

docs: test highvolume_example.py

Parent 2fc88d32
 import logging
 import sys
-from threading import Thread
 import time
 from multiprocessing import Queue, Process
 from queue import Empty
 from typing import List
-import taos
-from taos import TaosConnection

 logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")

 READ_TASK_COUNT = 1
-WRITE_TASK_COUNT = 3
+WRITE_TASK_COUNT = 1
 TABLE_COUNT = 1000
 MAX_BATCH_SIZE = 3000
-MAX_SQL_LENGTH = 1024 * 1024

 read_processes = []
 write_processes = []


 def get_connection():
+    import taos
     return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
+
+
+def get_max_sql_length(conn):
+    rows = conn.query("SHOW variables").fetch_all()
+    for r in rows:
+        name = r[0]
+        if name == "maxSQLLength":
+            return int(r[1])
+    return 1024 * 1024  # fallback when the server does not report maxSQLLength
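For context, `SHOW variables` returns (name, value) rows, which is why the helper reads `r[0]` and `r[1]`. A minimal usage sketch, assuming a TDengine server is reachable with the connection parameters hardcoded in `get_connection()`:

    conn = get_connection()
    max_len = get_max_sql_length(conn)
    logging.info(f"maxSQLLength={max_len}")  # server-side cap on the length of a single SQL statement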
 # ANCHOR: DataBaseMonitor
 class DataBaseMonitor:
+    """
+    Start a child process.
+    Prepare database and stable.
+    Calculate the writing speed and print it every 10 seconds.
+    """
+
     def __init__(self):
-        self.conn: TaosConnection = get_connection()
+        self.process = Process(target=self.run)
+        self.process.start()

-    def prepare_database(self):
-        self.conn.execute("DROP DATABASE IF EXISTS test")
-        self.conn.execute("CREATE DATABASE test")
-        self.conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
+    def prepare_database(self, conn):
+        conn.execute("DROP DATABASE IF EXISTS test")
+        conn.execute("CREATE DATABASE test")
+        conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")

-    def get_count(self):
-        res = self.conn.query("SELECT count(*) FROM test.meters")
+    def get_count(self, conn):
+        res = conn.query("SELECT count(*) FROM test.meters")
         rows = res.fetch_all()
         return rows[0][0] if rows else 0

-    def get_max_sql_length(self):
-        rows = self.conn.query("SHOW variables").fetch_all()
-        for r in rows:
-            name = r[0]
-            if name == "maxSQLLength":
-                return int(r[1])
-
-    def stat_and_print(self):
+    def run(self):
+        log = logging.getLogger("DataBaseMonitor")
+        conn = get_connection()
+        self.prepare_database(conn)
         last_count = 0
         while True:
             time.sleep(10)
-            count = self.get_count()
-            logging.info(f"count={count} speed={(count - last_count) / 10}")
+            count = self.get_count(conn)
+            log.info(f"count={count} speed={(count - last_count) / 10}")
             last_count = count
+
+    def join(self):
+        self.process.join()
+
+    def stop(self):
+        self.process.terminate()
 # ANCHOR_END: DataBaseMonitor
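Note the lifecycle change: the monitor now prepares the database and runs the statistics loop entirely in a child process, started from the constructor. A minimal sketch using only the names defined above:

    monitor = DataBaseMonitor()  # constructing it forks the child process and starts run()
    try:
        monitor.join()           # block the caller until interrupted
    except KeyboardInterrupt:
        monitor.stop()           # terminate() the monitoring process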
...
@@ -101,6 +113,7 @@ class SQLWriter:
         self._tb_values = {}
         self._tb_tags = {}
         self._conn = get_connection()
+        self._max_sql_length = get_max_sql_length(self._conn)
         self._conn.execute("USE test")

     def process_line(self, line: str):
...
@@ -136,7 +149,7 @@ class SQLWriter:
         buf = []
         for tb_name, values in self._tb_values.items():
             q = tb_name + " VALUES " + values
-            if sql_len + len(q) >= MAX_SQL_LENGTH:
+            if sql_len + len(q) >= self._max_sql_length:
                 sql += " ".join(buf)
                 self.execute_sql(sql)
                 sql = "INSERT INTO "
...
@@ -150,7 +163,7 @@ class SQLWriter:
         self._buffered_count = 0

     def execute_sql(self, sql):
-        self.log.debug(sql)
+        import taos
         try:
             self._conn.execute(sql)
         except taos.Error as e:
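The body of the except branch falls outside this hunk. As an illustration only (the file's actual recovery logic is elided above), one common shape for such a handler logs the head of the failing statement, which may be close to a megabyte long, and re-raises:

    import logging

    def execute_sql_sketch(conn, sql):
        # Hypothetical handler body; not the actual code from this commit.
        import taos
        log = logging.getLogger("SQLWriter")
        try:
            conn.execute(sql)
        except taos.Error as e:
            log.error("failed statement head: %s", sql[:100])
            log.error(e)
            raise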
...
@@ -235,10 +248,7 @@ def main():
     logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")

     database_monitor = DataBaseMonitor()
-    database_monitor.prepare_database()
-    global MAX_SQL_LENGTH
-    MAX_SQL_LENGTH = database_monitor.get_max_sql_length()
-    logging.info(f"MAX_SQL_LENGTH={MAX_SQL_LENGTH}")
+    time.sleep(3)  # wait for database ready

     task_queues: List[Queue] = []
...
@@ -247,20 +257,21 @@ def main():
         task_queues.append(queue)
         p = Process(target=run_write_task, args=(i, queue))
         p.start()
+        logging.debug(f"WriteTask {i} started with pid {p.pid}")
         write_processes.append(p)

     for i in range(READ_TASK_COUNT):
         p = Process(target=run_read_task, args=(i, task_queues))
         p.start()
+        logging.debug(f"ReadTask {i} started with pid {p.pid}")
         read_processes.append(p)

     try:
-        database_monitor.stat_and_print()
+        database_monitor.join()
     except KeyboardInterrupt:
+        database_monitor.stop()
         [p.terminate() for p in read_processes]
         [p.terminate() for p in write_processes]
-        exit()
 # ANCHOR_END: main
...