未验证 提交 889019b2 编写于 作者: S sunpeng 提交者: GitHub

build: add python demo to ci (#19699)

* build: add python demo to ci

* build: fix python demo
上级 ac9cdf6c
module goexample
go 1.17
require github.com/taosdata/driver-go/v3 3.0
import pandas
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
engine = create_engine("taos://root:taosdata@localhost:6030/power")
df = pandas.read_sql("SELECT * FROM meters", engine)
conn = engine.connect()
df = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
conn.close()
# print index
print(df.index)
......
import pandas
from sqlalchemy import create_engine
from sqlalchemy import create_engine, text
engine = create_engine("taosrest://root:taosdata@localhost:6041")
df: pandas.DataFrame = pandas.read_sql("SELECT * FROM power.meters", engine)
conn = engine.connect()
df: pandas.DataFrame = pandas.read_sql(text("SELECT * FROM power.meters"), conn)
conn.close()
# print index
print(df.index)
......
# ANCHOR: connect
from taosrest import connect, TaosRestConnection, TaosRestCursor
conn: TaosRestConnection = connect(url="http://localhost:6041",
conn = connect(url="http://localhost:6041",
user="root",
password="taosdata",
timeout=30)
......@@ -9,16 +9,17 @@ conn: TaosRestConnection = connect(url="http://localhost:6041",
# ANCHOR_END: connect
# ANCHOR: basic
# create STable
cursor: TaosRestCursor = conn.cursor()
cursor = conn.cursor()
cursor.execute("DROP DATABASE IF EXISTS power")
cursor.execute("CREATE DATABASE power")
cursor.execute("CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
cursor.execute(
"CREATE STABLE power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
# insert data
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS(California.SanFrancisco, 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS(California.SanFrancisco, 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS(California.LosAngeles, 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS(California.LosAngeles, 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
cursor.execute("""INSERT INTO power.d1001 USING power.meters TAGS('California.SanFrancisco', 2) VALUES ('2018-10-03 14:38:05.000', 10.30000, 219, 0.31000) ('2018-10-03 14:38:15.000', 12.60000, 218, 0.33000) ('2018-10-03 14:38:16.800', 12.30000, 221, 0.31000)
power.d1002 USING power.meters TAGS('California.SanFrancisco', 3) VALUES ('2018-10-03 14:38:16.650', 10.30000, 218, 0.25000)
power.d1003 USING power.meters TAGS('California.LosAngeles', 2) VALUES ('2018-10-03 14:38:05.500', 11.80000, 221, 0.28000) ('2018-10-03 14:38:16.600', 13.40000, 223, 0.29000)
power.d1004 USING power.meters TAGS('California.LosAngeles', 3) VALUES ('2018-10-03 14:38:05.000', 10.80000, 223, 0.29000) ('2018-10-03 14:38:06.500', 11.50000, 221, 0.35000)""")
print("inserted row count:", cursor.rowcount)
# query data
......@@ -28,7 +29,7 @@ print("queried row count:", cursor.rowcount)
# get column names from cursor
column_names = [meta[0] for meta in cursor.description]
# get rows
data: list[tuple] = cursor.fetchall()
data = cursor.fetchall()
print(column_names)
for row in data:
print(row)
......
......@@ -8,7 +8,7 @@ conn.execute("CREATE DATABASE test")
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
affected_row: int = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m 24.4)")
affected_row = conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
print("affected_row", affected_row)
# output:
# affected_row 3
......@@ -16,10 +16,10 @@ print("affected_row", affected_row)
# ANCHOR: query
# Execute a sql and get its result set. It's useful for SELECT statement
result: taos.TaosResult = conn.query("SELECT * from weather")
result = conn.query("SELECT * from weather")
# Get fields from result
fields: taos.field.TaosFields = result.fields
fields = result.fields
for field in fields:
print(field) # {name: ts, type: 9, bytes: 8}
......
# install dependencies:
# recommend python >= 3.8
# pip3 install faster-fifo
#
import logging
import math
import multiprocessing
import sys
import time
import os
from multiprocessing import Process
from faster_fifo import Queue
from multiprocessing import Process, Queue
from mockdatasource import MockDataSource
from queue import Empty
from typing import List
......@@ -22,8 +21,7 @@ TABLE_COUNT = 1000
QUEUE_SIZE = 1000000
MAX_BATCH_SIZE = 3000
read_processes = []
write_processes = []
_DONE_MESSAGE = '__DONE__'
def get_connection():
......@@ -44,41 +42,64 @@ def get_connection():
# ANCHOR: read
def run_read_task(task_id: int, task_queues: List[Queue]):
def run_read_task(task_id: int, task_queues: List[Queue], infinity):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
data_source = MockDataSource(f"tb{task_id}", table_count_per_task, infinity)
try:
for batch in data_source:
if isinstance(batch, tuple):
batch = [batch]
for table_id, rows in batch:
# hash data to different queue
i = table_id % len(task_queues)
# block putting forever when the queue is full
task_queues[i].put_many(rows, block=True, timeout=-1)
for row in rows:
task_queues[i].put(row)
if not infinity:
for queue in task_queues:
queue.put(_DONE_MESSAGE)
except KeyboardInterrupt:
pass
finally:
logging.info('read task over')
# ANCHOR_END: read
# ANCHOR: write
def run_write_task(task_id: int, queue: Queue):
def run_write_task(task_id: int, queue: Queue, done_queue: Queue):
from sql_writer import SQLWriter
log = logging.getLogger(f"WriteTask-{task_id}")
writer = SQLWriter(get_connection)
lines = None
try:
while True:
over = False
lines = []
for _ in range(MAX_BATCH_SIZE):
try:
# get as many as possible
lines = queue.get_many(block=False, max_messages_to_get=MAX_BATCH_SIZE)
writer.process_lines(lines)
line = queue.get_nowait()
if line == _DONE_MESSAGE:
over = True
break
if line:
lines.append(line)
except Empty:
time.sleep(0.01)
time.sleep(0.1)
if len(lines) > 0:
writer.process_lines(lines)
if over:
done_queue.put(_DONE_MESSAGE)
break
except KeyboardInterrupt:
pass
except BaseException as e:
log.debug(f"lines={lines}")
raise e
finally:
writer.close()
log.debug('write task over')
# ANCHOR_END: write
......@@ -103,13 +124,11 @@ def set_global_config():
# ANCHOR: monitor
def run_monitor_process():
def run_monitor_process(done_queue: Queue):
log = logging.getLogger("DataBaseMonitor")
conn = None
try:
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
def get_count():
res = conn.query("SELECT count(*) FROM test.meters")
......@@ -118,32 +137,51 @@ def run_monitor_process():
last_count = 0
while True:
try:
done = done_queue.get_nowait()
if done == _DONE_MESSAGE:
break
except Empty:
pass
time.sleep(10)
count = get_count()
log.info(f"count={count} speed={(count - last_count) / 10}")
last_count = count
finally:
conn.close()
# ANCHOR_END: monitor
# ANCHOR: main
def main():
def main(infinity):
set_global_config()
logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, "
f"TABLE_COUNT={TABLE_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
monitor_process = Process(target=run_monitor_process)
conn = get_connection()
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE IF NOT EXISTS test")
conn.execute("CREATE STABLE IF NOT EXISTS test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) "
"TAGS (location BINARY(64), groupId INT)")
conn.close()
done_queue = Queue()
monitor_process = Process(target=run_monitor_process, args=(done_queue,))
monitor_process.start()
time.sleep(3) # waiting for database ready.
logging.debug(f"monitor task started with pid {monitor_process.pid}")
task_queues: List[Queue] = []
write_processes = []
read_processes = []
# create task queues
for i in range(WRITE_TASK_COUNT):
queue = Queue(max_size_bytes=QUEUE_SIZE)
queue = Queue()
task_queues.append(queue)
# create write processes
for i in range(WRITE_TASK_COUNT):
p = Process(target=run_write_task, args=(i, task_queues[i]))
p = Process(target=run_write_task, args=(i, task_queues[i], done_queue))
p.start()
logging.debug(f"WriteTask-{i} started with pid {p.pid}")
write_processes.append(p)
......@@ -151,13 +189,19 @@ def main():
# create read processes
for i in range(READ_TASK_COUNT):
queues = assign_queues(i, task_queues)
p = Process(target=run_read_task, args=(i, queues))
p = Process(target=run_read_task, args=(i, queues, infinity))
p.start()
logging.debug(f"ReadTask-{i} started with pid {p.pid}")
read_processes.append(p)
try:
monitor_process.join()
for p in read_processes:
p.join()
for p in write_processes:
p.join()
time.sleep(1)
return
except KeyboardInterrupt:
monitor_process.terminate()
[p.terminate() for p in read_processes]
......@@ -176,5 +220,6 @@ def assign_queues(read_task_id, task_queues):
if __name__ == '__main__':
main()
multiprocessing.set_start_method('spawn')
main(False)
# ANCHOR_END: main
......@@ -26,7 +26,8 @@ class Consumer(object):
'bath_consume': True,
'batch_size': 1000,
'async_model': True,
'workers': 10
'workers': 10,
'testing': False
}
LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
......@@ -46,6 +47,7 @@ class Consumer(object):
def __init__(self, **configs):
self.config: dict = self.DEFAULT_CONFIGS
self.config.update(configs)
if not self.config.get('testing'):
self.consumer = KafkaConsumer(
self.config.get('kafka_topic'), # topic
bootstrap_servers=self.config.get('kafka_brokers'),
......@@ -60,7 +62,7 @@ class Consumer(object):
)
if self.config.get('async_model'):
self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
self.tasks: list[Future] = []
self.tasks = []
# tags and table mapping # key: {location}_{groupId} value:
self.tag_table_mapping = {}
i = 0
......@@ -115,14 +117,14 @@ class Consumer(object):
if self.taos is not None:
self.taos.close()
def _run(self, f: Callable[[ConsumerRecord], bool]):
def _run(self, f):
for message in self.consumer:
if self.config.get('async_model'):
self.pool.submit(f(message))
else:
f(message)
def _run_batch(self, f: Callable[[list[list[ConsumerRecord]]], None]):
def _run_batch(self, f):
while True:
messages = self.consumer.poll(timeout_ms=500, max_records=self.config.get('batch_size'))
if messages:
......@@ -140,7 +142,7 @@ class Consumer(object):
logging.info('## insert sql %s', sql)
return self.taos.execute(sql=sql) == 1
def _to_taos_batch(self, messages: list[list[ConsumerRecord]]):
def _to_taos_batch(self, messages):
sql = self._build_sql_batch(messages=messages)
if len(sql) == 0: # decode error, skip
return
......@@ -162,7 +164,7 @@ class Consumer(object):
table_name = self._get_table_name(location=location, group_id=group_id)
return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
def _build_sql_batch(self, messages: list[list[ConsumerRecord]]) -> str:
def _build_sql_batch(self, messages) -> str:
sql_list = []
for partition_messages in messages:
for message in partition_messages:
......@@ -186,7 +188,55 @@ def _get_location_and_group(key: str) -> (str, int):
return fields[0], fields[1]
def test_to_taos(consumer: Consumer):
msg = {
'location': 'California.SanFrancisco',
'groupId': 1,
'ts': '2022-12-06 15:13:38.643',
'current': 3.41,
'voltage': 105,
'phase': 0.02027,
}
record = ConsumerRecord(checksum=None, headers=None, offset=1, key=None, value=json.dumps(msg), partition=1,
topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None)
assert consumer._to_taos(message=record)
def test_to_taos_batch(consumer: Consumer):
records = [
[
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value=json.dumps({'location': 'California.SanFrancisco',
'groupId': 1,
'ts': '2022-12-06 15:13:38.643',
'current': 3.41,
'voltage': 105,
'phase': 0.02027, }),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value=json.dumps({'location': 'California.LosAngles',
'groupId': 2,
'ts': '2022-12-06 15:13:39.643',
'current': 3.41,
'voltage': 102,
'phase': 0.02027, }),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
]
]
consumer._to_taos_batch(messages=records)
if __name__ == '__main__':
consumer = Consumer(async_model=True)
consumer = Consumer(async_model=True, testing=True)
# init env
consumer.init_env()
consumer.consume()
\ No newline at end of file
# consumer.consume()
# test build sql
# test build sql batch
test_to_taos(consumer)
test_to_taos_batch(consumer)
\ No newline at end of file
......@@ -10,13 +10,14 @@ class MockDataSource:
"9.4,118,0.141,California.SanFrancisco,4"
]
def __init__(self, tb_name_prefix, table_count):
def __init__(self, tb_name_prefix, table_count, infinity=True):
self.table_name_prefix = tb_name_prefix + "_"
self.table_count = table_count
self.max_rows = 10000000
self.current_ts = round(time.time() * 1000) - self.max_rows * 100
# [(tableId, tableName, values),]
self.data = self._init_data()
self.infinity = infinity
def _init_data(self):
lines = self.samples * (self.table_count // 5 + 1)
......@@ -28,6 +29,9 @@ class MockDataSource:
def __iter__(self):
self.row = 0
if not self.infinity:
return iter(self._iter_data())
else:
return self
def __next__(self):
......@@ -35,7 +39,9 @@ class MockDataSource:
next 1000 rows for each table.
return: {tableId:[row,...]}
"""
# generate 1000 timestamps
return self._iter_data()
def _iter_data(self):
ts = []
for _ in range(1000):
self.current_ts += 100
......@@ -47,3 +53,10 @@ class MockDataSource:
rows = [table_name + ',' + t + ',' + values for t in ts]
result.append((table_id, rows))
return result
if __name__ == '__main__':
datasource = MockDataSource('t', 10, False)
for data in datasource:
print(data)
\ No newline at end of file
......@@ -10,6 +10,7 @@ class SQLWriter:
self._tb_tags = {}
self._conn = get_connection_func()
self._max_sql_length = self.get_max_sql_length()
self._conn.execute("create database if not exists test")
self._conn.execute("USE test")
def get_max_sql_length(self):
......@@ -20,7 +21,7 @@ class SQLWriter:
return int(r[1])
return 1024 * 1024
def process_lines(self, lines: str):
def process_lines(self, lines: [str]):
"""
:param lines: [[tbName,ts,current,voltage,phase,location,groupId]]
"""
......@@ -60,6 +61,7 @@ class SQLWriter:
buf.append(q)
sql_len += len(q)
sql += " ".join(buf)
self.create_tables()
self.execute_sql(sql)
self._tb_values.clear()
......@@ -88,3 +90,23 @@ class SQLWriter:
except BaseException as e:
self.log.error("Execute SQL: %s", sql)
raise e
def close(self):
if self._conn:
self._conn.close()
if __name__ == '__main__':
def get_connection_func():
conn = taos.connect()
return conn
writer = SQLWriter(get_connection_func=get_connection_func)
writer.execute_sql(
"create stable if not exists meters (ts timestamp, current float, voltage int, phase float) "
"tags (location binary(64), groupId int)")
writer.execute_sql(
"INSERT INTO d21001 USING meters TAGS ('California.SanFrancisco', 2) "
"VALUES ('2021-07-13 14:06:32.272', 10.2, 219, 0.32)")
\ No newline at end of file
from taos.tmq import Consumer
import taos
from taos.tmq import *
conn = taos.connect()
print("init")
conn.execute("drop topic if exists topic_ctb_column")
conn.execute("drop database if exists py_tmq")
conn.execute("create database if not exists py_tmq vgroups 2")
conn.select_db("py_tmq")
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
)
conn.execute("create table if not exists tb1 using stb1 tags(1)")
conn.execute("create table if not exists tb2 using stb1 tags(2)")
conn.execute("create table if not exists tb3 using stb1 tags(3)")
print("create topic")
conn.execute(
"create topic if not exists topic_ctb_column as select ts, c1, c2, c3 from stb1"
)
print("build consumer")
conf = TaosTmqConf()
conf.set("group.id", "tg2")
conf.set("td.connect.user", "root")
conf.set("td.connect.pass", "taosdata")
conf.set("enable.auto.commit", "true")
def tmq_commit_cb_print(tmq, resp, offset, param=None):
print(f"commit: {resp}, tmq: {tmq}, offset: {offset}, param: {param}")
conf.set_auto_commit_cb(tmq_commit_cb_print, None)
tmq = conf.new_consumer()
print("build topic list")
topic_list = TaosTmqList()
topic_list.append("topic_ctb_column")
print("basic consume loop")
tmq.subscribe(topic_list)
sub_list = tmq.subscription()
print("subscribed topics: ", sub_list)
while 1:
res = tmq.poll(1000)
if res:
topic = res.get_topic_name()
vg = res.get_vgroup_id()
db = res.get_db_name()
print(f"topic: {topic}\nvgroup id: {vg}\ndb: {db}")
for row in res:
print(row)
def init_tmq_env(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
conn.execute("create database if not exists {}".format(db))
conn.select_db(db)
conn.execute(
"create stable if not exists stb1 (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))")
conn.execute("create table if not exists tb1 using stb1 tags(1, 't1')")
conn.execute("create table if not exists tb2 using stb1 tags(2, 't2')")
conn.execute("create table if not exists tb3 using stb1 tags(3, 't3')")
conn.execute("create topic if not exists {} as select ts, c1, c2, c3 from stb1".format(topic))
conn.execute("insert into tb1 values (now, 1, 1.0, 'tmq test')")
conn.execute("insert into tb2 values (now, 2, 2.0, 'tmq test')")
conn.execute("insert into tb3 values (now, 3, 3.0, 'tmq test')")
def cleanup(db, topic):
conn = taos.connect()
conn.execute("drop topic if exists {}".format(topic))
conn.execute("drop database if exists {}".format(db))
if __name__ == '__main__':
init_tmq_env("tmq_test", "tmq_test_topic") # init env
consumer = Consumer(
{
"group.id": "tg2",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
consumer.subscribe(["tmq_test_topic"])
try:
while True:
res = consumer.poll(1)
if not res:
break
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
finally:
consumer.unsubscribe()
consumer.close()
cleanup("tmq_test", "tmq_test_topic")
\ No newline at end of file
......@@ -44,4 +44,43 @@ taos -s "drop database test"
python3 json_protocol_example.py
# 10
# python3 subscribe_demo.py
pip install SQLAlchemy
pip install pandas
taosBenchmark -y -d power -t 10 -n 10
python3 conn_native_pandas.py
python3 conn_rest_pandas.py
taos -s "drop database if exists power"
# 11
taos -s "create database if not exists test"
python3 connect_native_reference.py
# 12
python3 connect_rest_examples.py
# 13
python3 handle_exception.py
# 14
taosBenchmark -y -d power -t 2 -n 10
python3 rest_client_example.py
taos -s "drop database if exists power"
# 15
python3 result_set_examples.py
# 16
python3 tmq_example.py
# 17
python3 sql_writer.py
# 18
python3 mockdatasource.py
# 19
python3 fast_write_example.py
# 20
pip3 install kafka-python
python3 kafka_example.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册