Commit ca9a65d7 authored by: D dingbo

docs: mockdatasoruce.py

Parent 9fac17d6
@@ -9,6 +9,7 @@ import time
 import os
 from multiprocessing import Process
 from faster_fifo import Queue
+from mockdatasoruce import MockDataSource
 from queue import Empty
 from typing import List
@@ -40,57 +41,17 @@ def get_connection():
     return taos.connect(host=host, port=int(port), user=user, password=password)
-# ANCHOR: MockDataSource
-class MockDataSource:
-    location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
-    current = [8.8, 10.7, 9.9, 8.9, 9.4]
-    voltage = [119, 116, 111, 113, 118]
-    phase = [0.32, 0.34, 0.33, 0.329, 0.141]
-    max_rows_per_table = 10 ** 9
-
-    def __init__(self, tb_name_prefix, table_count):
-        self.table_name_prefix = tb_name_prefix
-        self.table_count = table_count
-        self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100
-
-    def __iter__(self):
-        self.row = 0
-        self.table_id = -1
-        return self
-
-    def __next__(self):
-        """
-        next 100 rows of current table
-        """
-        self.table_id += 1
-        if self.table_id == self.table_count:
-            self.table_id = 0
-        if self.row >= self.max_rows_per_table:
-            raise StopIteration
-        rows = []
-        while len(rows) < 100:
-            self.row += 1
-            ts = self.start_ms + 100 * self.row
-            group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
-            tb_name = self.table_name_prefix + '_' + str(self.table_id)
-            ri = self.row % 5
-            rows.append(f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}")
-        return self.table_id, rows
-# ANCHOR_END: MockDataSource
 # ANCHOR: read
 def run_read_task(task_id: int, task_queues: List[Queue]):
     table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
     data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
     try:
-        for table_id, rows in data_source:
-            # hash data to different queue
-            i = table_id % len(task_queues)
-            # block putting forever when the queue is full
-            task_queues[i].put_many(rows, block=True, timeout=-1)
+        for batch in data_source:
+            for table_id, row in batch:
+                # hash data to different queue
+                i = table_id % len(task_queues)
+                # block putting forever when the queue is full
+                task_queues[i].put(row, block=True, timeout=-1)
     except KeyboardInterrupt:
         pass
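
For context, the sketch below shows one way the refactored run_read_task could be wired to the queues. The example's actual main() is not part of this diff; QUEUE_COUNT and READ_TASK_COUNT here are assumed values, and TABLE_COUNT is expected to be defined in the same module as run_read_task.

# Illustrative sketch only, not part of this commit.
# Assumes it lives in the module that defines run_read_task and TABLE_COUNT.
from multiprocessing import Process
from faster_fifo import Queue

READ_TASK_COUNT = 1   # assumed
QUEUE_COUNT = 2       # assumed

if __name__ == '__main__':
    # one faster_fifo queue per writer; readers hash rows across them by table_id
    task_queues = [Queue() for _ in range(QUEUE_COUNT)]
    readers = [Process(target=run_read_task, args=(i, task_queues))
               for i in range(READ_TASK_COUNT)]
    for p in readers:
        p.start()
    # ... writer processes would drain task_queues and flush the rows to TDengine ...
    for p in readers:
        p.join()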
mockdatasoruce.py (new file)
import time


class MockDataSource:
    # sample values: location,groupId,current,voltage,phase
    samples = [
        "LosAngeles,0,8.8,119,0.32",
        "SanDiego,1,10.7,116,0.34",
        "Hollywood,2,9.9,111,0.33",
        "Compton,3,8.9,113,0.329",
        "San Francisco,4,9.4,118,0.141"
    ]

    def __init__(self, tb_name_prefix, table_count):
        self.table_name_prefix = tb_name_prefix + "_"
        self.table_count = table_count
        self.max_rows = 10000000
        self.start_ms = round(time.time() * 1000) - self.max_rows * 100
        self.data = self._init_data()

    def _init_data(self):
        lines = self.samples * (self.table_count // 5 + 1)
        data = []
        for i in range(self.table_count):
            table_name = self.table_name_prefix + str(i)
            # tbName,location,groupId,current,voltage,phase
            row = table_name + ',' + lines[i]
            data.append((i, row))  # tableId, row
        return data

    def __iter__(self):
        self.row = 0
        return self

    def __next__(self):
        """
        next row for each table.
        [(tableId, row), (tableId, row)]
        """
        self.row += 1
        ts = self.start_ms + 100 * self.row
        # just add timestamp to each row
        return map(lambda t: (t[0], str(ts) + "," + t[1]), self.data)


if __name__ == '__main__':
    """
    Test performance of MockDataSource
    """
    from threading import Thread

    count = 0

    def consume():
        global count
        for data in MockDataSource("1", 1000):
            # __next__ returns a lazy map object; materialize it to count the rows
            count += len(list(data))

    Thread(target=consume).start()

    while True:
        time.sleep(1)
        print(count)
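
To see the row format the new MockDataSource yields, a small sketch (not part of this commit; the module name follows the import above):

# Illustrative sketch only: print one batch from the iterator.
from mockdatasoruce import MockDataSource

# each element is (tableId, "ts,tbName,location,groupId,current,voltage,phase")
first_batch = next(iter(MockDataSource("d", 3)))
for table_id, row in first_batch:
    print(table_id, row)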