From ca9a65d712e9e7319d229da8061908fbf1708f64 Mon Sep 17 00:00:00 2001 From: dingbo Date: Thu, 14 Jul 2022 14:47:40 +0800 Subject: [PATCH] docs: mockdatasoruce.py --- .../python/highvolume_faster_queue.py | 53 +++------------- docs/examples/python/mockdatasoruce.py | 63 +++++++++++++++++++ 2 files changed, 70 insertions(+), 46 deletions(-) create mode 100644 docs/examples/python/mockdatasoruce.py diff --git a/docs/examples/python/highvolume_faster_queue.py b/docs/examples/python/highvolume_faster_queue.py index 14aebc67ee..ebd4115e21 100644 --- a/docs/examples/python/highvolume_faster_queue.py +++ b/docs/examples/python/highvolume_faster_queue.py @@ -9,6 +9,7 @@ import time import os from multiprocessing import Process from faster_fifo import Queue +from mockdatasoruce import MockDataSource from queue import Empty from typing import List @@ -40,57 +41,17 @@ def get_connection(): return taos.connect(host=host, port=int(port), user=user, password=password) -# ANCHOR: MockDataSource -class MockDataSource: - location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"] - current = [8.8, 10.7, 9.9, 8.9, 9.4] - voltage = [119, 116, 111, 113, 118] - phase = [0.32, 0.34, 0.33, 0.329, 0.141] - max_rows_per_table = 10 ** 9 - - def __init__(self, tb_name_prefix, table_count): - self.table_name_prefix = tb_name_prefix - self.table_count = table_count - self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100 - - def __iter__(self): - self.row = 0 - self.table_id = -1 - return self - - def __next__(self): - """ - next 100 rows of current table - """ - self.table_id += 1 - if self.table_id == self.table_count: - self.table_id = 0 - if self.row >= self.max_rows_per_table: - raise StopIteration - rows = [] - - while len(rows) < 100: - self.row += 1 - ts = self.start_ms + 100 * self.row - group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1 - tb_name = self.table_name_prefix + '_' + str(self.table_id) - ri = self.row % 
5 - rows.append(f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}") - return self.table_id, rows - - -# ANCHOR_END: MockDataSource - # ANCHOR: read def run_read_task(task_id: int, task_queues: List[Queue]): table_count_per_task = TABLE_COUNT // READ_TASK_COUNT data_source = MockDataSource(f"tb{task_id}", table_count_per_task) try: - for table_id, rows in data_source: - # hash data to different queue - i = table_id % len(task_queues) - # block putting forever when the queue is full - task_queues[i].put_many(rows, block=True, timeout=-1) + for batch in data_source: + for table_id, row in batch: + # hash data to different queue + i = table_id % len(task_queues) + # block putting forever when the queue is full + task_queues[i].put(row, block=True, timeout=-1) except KeyboardInterrupt: pass diff --git a/docs/examples/python/mockdatasoruce.py b/docs/examples/python/mockdatasoruce.py new file mode 100644 index 0000000000..cf4d8db98a --- /dev/null +++ b/docs/examples/python/mockdatasoruce.py @@ -0,0 +1,63 @@ +import time + + +class MockDataSource: + samples = [ + "LosAngeles,0,8.8,119,0.32", + "SanDiego,1,10.7,116,0.34", + "Hollywood,2,9.9,111,0.33", + "Compton,3,8.9,113,0.329", + "San Francisco,4,9.4,118,0.141" + ] + + def __init__(self, tb_name_prefix, table_count): + self.table_name_prefix = tb_name_prefix + "_" + self.table_count = table_count + self.max_rows = 10000000 + self.start_ms = round(time.time() * 1000) - self.max_rows * 100 + self.data = self._init_data() + + def _init_data(self): + lines = self.samples * (self.table_count // 5 + 1) + data = [] + for i in range(self.table_count): + table_name = self.table_name_prefix + str(i) + # tbName,location,groupId,current,voltage,phase + row = table_name + ',' + lines[i] + data.append((i, row)) # tableId, row + return data + + def __iter__(self): + self.row = 0 + return self + + def __next__(self): + """ + next row for each table. 
+ [(tableId, row),(tableId, row)] + """ + self.row += 1 + ts = self.start_ms + 100 * self.row + # just add timestamp to each row + return [(t[0], str(ts) + "," + t[1]) for t in self.data] + + +if __name__ == '__main__': + """ + Test performance of MockDataSource + """ + from threading import Thread + + count = 0 + + + def consume(): + global count + for data in MockDataSource("1", 1000): + count += len(data) + + + Thread(target=consume).start() + while True: + time.sleep(1) + print(count) -- GitLab