提交 812a40f9 编写于 作者: D dingbo

docs: mockdatasource.py

上级 b2037f7a
......@@ -47,11 +47,11 @@ def run_read_task(task_id: int, task_queues: List[Queue]):
data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
try:
for batch in data_source:
for table_id, row in batch:
for table_id, rows in batch:
# hash data to different queue
i = table_id % len(task_queues)
# block putting forever when the queue is full
task_queues[i].put(row, block=True, timeout=-1)
task_queues[i].put_many(rows, block=True, timeout=-1)
except KeyboardInterrupt:
pass
......
......@@ -14,7 +14,8 @@ class MockDataSource:
self.table_name_prefix = tb_name_prefix + "_"
self.table_count = table_count
self.max_rows = 10000000
self.start_ms = round(time.time() * 1000) - self.max_rows * 100
self.current_ts = round(time.time() * 1000) - self.max_rows * 100
# [(tableId, tableName, values),]
self.data = self._init_data()
def _init_data(self):
......@@ -31,15 +32,21 @@ class MockDataSource:
def __next__(self):
    """
    Return the next batch: 1000 rows for each table.

    Every generated row advances ``self.current_ts`` by 100, so one call
    moves the clock forward by 1000 steps and consecutive batches never
    reuse a timestamp.

    return: [(tableId, [row, ...]), ...] -- one tuple per entry of
    ``self.data``; each row is the string "tableName,ts,values".
    """
    # generate 1000 timestamps
    # Convert to str once here: the timestamps are concatenated into the
    # row strings below, and str + int would raise TypeError.
    ts = []
    for _ in range(1000):
        self.current_ts += 100
        ts.append(str(self.current_ts))
    # add timestamp to each row
    # [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])]
    result = []
    for table_id, table_name, values in self.data:
        rows = [table_name + ',' + t + ',' + values for t in ts]
        result.append((table_id, rows))
    return result
if __name__ == '__main__':
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册