Commit c810b6b1 authored by dingbo

docs: python example

Parent 9ffd6678
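
This example demonstrates high-throughput writing into TDengine: reader processes generate mock meter readings and feed them through multiprocessing queues to writer processes, each of which batches rows into multi-table INSERT statements via SQLWriter, while DataBaseMonitor reports the ingest rate every 10 seconds. The four optional command-line arguments override READ_TASK_COUNT, WRITE_TASK_COUNT, TABLE_COUNT and MAX_BATCH_SIZE in that order; assuming the script is saved as fast_write_example.py (the file name is not shown here), a run might look like: python3 fast_write_example.py 1 3 1000 3000.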
import logging
import sys
import time
from multiprocessing import Queue, Process
from queue import Empty

import taos
from taos import TaosConnection
from typing import List

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")

READ_TASK_COUNT = 1
WRITE_TASK_COUNT = 3
TABLE_COUNT = 1000
MAX_BATCH_SIZE = 3000
MAX_SQL_LENGTH = 1024 * 1024
read_processes = []
write_processes = []

def get_connection():
    return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
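
# The connection parameters above are the TDengine defaults for a local
# single-node deployment; adjust host/user/password/port for your setup.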

class DataBaseMonitor:
    def __init__(self):
        self.conn: TaosConnection = get_connection()

    def prepare_database(self):
        self.conn.execute("DROP DATABASE IF EXISTS test")
        self.conn.execute("CREATE DATABASE test")
        self.conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")

    def get_count(self):
        res = self.conn.query("SELECT count(*) FROM test.meters")
        rows = res.fetch_all()
        return rows[0][0] if rows else 0

    def get_max_sql_length(self):
        rows = self.conn.query("SHOW variables").fetch_all()
        for r in rows:
            name = r[0]
            if name == "maxSQLLength":
                return int(r[1])
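
    # Note: maxSQLLength is a server-side TDengine variable (present in 2.x);
    # if SHOW variables does not report it, this method falls through and
    # returns None, in which case main() would set MAX_SQL_LENGTH to None and
    # a fallback such as the 1024 * 1024 default above would be needed.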

    def stat_and_print(self):
        try:
            last_count = 0
            while True:
                time.sleep(10)
                count = self.get_count()
                logging.info(f"count={count} speed={(count - last_count) / 10}")
                last_count = count
        except KeyboardInterrupt:
            # On Ctrl+C, terminate all reader and writer processes.
            [p.kill() for p in read_processes]
            [p.kill() for p in write_processes]

class MockDataSource:
    location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
    current = [8.8, 10.7, 9.9, 8.9, 9.4]
    voltage = [119, 116, 111, 113, 118]
    phase = [0.32, 0.34, 0.33, 0.329, 0.141]
    max_rows_per_table = 10 ** 9

    def __init__(self, tb_name_prefix, table_count):
        self.table_name_prefix = tb_name_prefix
        self.table_count = table_count
        self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100

    def __iter__(self):
        self.row = 0
        self.table_id = -1
        return self

    def __next__(self):
        self.table_id += 1
        if self.table_id == self.table_count:
            self.table_id = 0
            self.row += 1
        if self.row < self.max_rows_per_table:
            ts = self.start_ms + 100 * self.row
            group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
            tb_name = self.table_name_prefix + '_' + str(self.table_id)
            ri = self.row % 5
            return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}"
        else:
            raise StopIteration
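
# Illustrative output of MockDataSource (assuming prefix "tb1"): the first
# generated line is
#   tb1_0,<start_ms>,8.8,119,0.32,LosAngeles,0
# i.e. tbName,ts,current,voltage,phase,location,groupId, as consumed by
# SQLWriter.process_line() below.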

class SQLWriter:
    log = logging.getLogger("SQLWriter")

    def __init__(self):
        self._buffered_count = 0
        self._tb_values = {}
        self._tb_tags = {}
        self._conn = get_connection()
        self._conn.execute("USE test")

    def process_line(self, line: str):
        """
        :param line: tbName,ts,current,voltage,phase,location,groupId
        """
        self._buffered_count += 1
        ps = line.split(",")
        table_name = ps[0]
        value = '(' + ",".join(ps[1:-2]) + ') '
        if table_name in self._tb_values:
            self._tb_values[table_name] += value
        else:
            self._tb_values[table_name] = value
        if table_name not in self._tb_tags:
            location = ps[-2]
            group_id = ps[-1]
            tag_value = f"('{location}',{group_id})"
            self._tb_tags[table_name] = tag_value
        if self._buffered_count == MAX_BATCH_SIZE:
            self.flush()

    def flush(self):
        """
        Assemble an INSERT statement and execute it.
        When the SQL length grows close to MAX_SQL_LENGTH, the SQL is executed immediately and a new INSERT statement is started.
        In case of a "Table does not exist" exception, the tables in the SQL are created and the SQL is re-executed.
        """
        sql = "INSERT INTO "
        sql_len = len(sql)
        buf = []
        for tb_name, values in self._tb_values.items():
            q = tb_name + " VALUES " + values
            if sql_len + len(q) >= MAX_SQL_LENGTH:
                sql += " ".join(buf)
                self.execute_sql(sql)
                sql = "INSERT INTO "
                sql_len = len(sql)
                buf = []
            buf.append(q)
            sql_len += len(q)
        sql += " ".join(buf)
        self.execute_sql(sql)
        self._tb_values.clear()
        self._buffered_count = 0
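
    # Illustrative shape of the statement assembled by flush() (timestamps and
    # values hypothetical):
    #   INSERT INTO tb1_0 VALUES (1648791213000,8.8,119,0.32) (1648791213100,10.7,116,0.34) tb1_1 VALUES (...) ...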

    def execute_sql(self, sql):
        try:
            self._conn.execute(sql)
        except taos.Error as e:
            error_code = e.errno & 0xffff
            # Table does not exist
            if error_code == 0x362 or error_code == 0x218:
                self.create_tables()
                # Re-execute the INSERT once the missing tables exist,
                # as flush()'s docstring describes.
                self._conn.execute(sql)
            else:
                raise e

    def create_tables(self):
        sql = "CREATE TABLE "
        for tb in self._tb_values.keys():
            tag_values = self._tb_tags[tb]
            sql += "IF NOT EXISTS " + tb + " USING meters TAGS " + tag_values + " "
        self._conn.execute(sql)

    @property
    def buffered_count(self):
        return self._buffered_count
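
# Minimal SQLWriter usage sketch (assumes a reachable TDengine with the "test"
# database already created; the sample line is hypothetical):
#   writer = SQLWriter()
#   writer.process_line("tb_demo_0,1648791213000,10.3,219,0.31,LosAngeles,2")
#   writer.flush()  # creates tb_demo_0 via create_tables() if needed, then inserts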

def run_read_task(task_id: int, task_queues: List[Queue]):
    log = logging.getLogger(f"ReadTask-{task_id}")
    table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
    data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
    try:
        for table_id, line in data_source:
            i = table_id % len(task_queues)
            # Block if the target queue is full, providing natural back-pressure.
            task_queues[i].put(line, block=True)
    except KeyboardInterrupt:
        pass

def run_write_task(task_id: int, queue: Queue):
    log = logging.getLogger(f"WriteTask-{task_id}")
    writer = SQLWriter()
    line = None  # defined up front so the exception handler below can log it
    try:
        while True:
            try:
                line = queue.get(block=False)
                writer.process_line(line)
            except Empty:
                # Flush any partial batch, otherwise back off briefly.
                if writer.buffered_count > 0:
                    writer.flush()
                else:
                    time.sleep(0.01)
    except KeyboardInterrupt:
        pass
    except BaseException as e:
        msg = f"line={line}, buffer_count={writer.buffered_count}"
        log.debug(msg)
        raise e
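
# Design note: each writer drains its queue greedily; flush() is normally
# triggered inside process_line() once MAX_BATCH_SIZE rows have accumulated,
# so the Empty branch only flushes partial batches during lulls.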

def set_global_config():
    argc = len(sys.argv)
    # sys.argv values are strings; convert to int so the arithmetic in
    # run_read_task() and main() works.
    if argc > 1:
        global READ_TASK_COUNT
        READ_TASK_COUNT = int(sys.argv[1])
    if argc > 2:
        global WRITE_TASK_COUNT
        WRITE_TASK_COUNT = int(sys.argv[2])
    if argc > 3:
        global TABLE_COUNT
        TABLE_COUNT = int(sys.argv[3])
    if argc > 4:
        global MAX_BATCH_SIZE
        MAX_BATCH_SIZE = int(sys.argv[4])

def main():
    set_global_config()
    logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
    database_monitor = DataBaseMonitor()
    database_monitor.prepare_database()
    global MAX_SQL_LENGTH
    MAX_SQL_LENGTH = database_monitor.get_max_sql_length()
    logging.info(f"MAX_SQL_LENGTH={MAX_SQL_LENGTH}")

    task_queues: List[Queue] = []
    for i in range(WRITE_TASK_COUNT):
        queue = Queue(maxsize=10000000)
        task_queues.append(queue)
        p = Process(target=run_write_task, args=(i, queue))
        p.start()
        write_processes.append(p)

    for i in range(READ_TASK_COUNT):
        p = Process(target=run_read_task, args=(i, task_queues))
        p.start()
        read_processes.append(p)

    database_monitor.stat_and_print()
......