diff --git a/docs/examples/python/highvolume_faster_queue.py b/docs/examples/python/highvolume_faster_queue.py index ccdde705a450b2461a79c189c15dcba54365ce60..a08de041774894e28c6ccd878c2aceee2b2dbe51 100644 --- a/docs/examples/python/highvolume_faster_queue.py +++ b/docs/examples/python/highvolume_faster_queue.py @@ -47,11 +47,11 @@ def run_read_task(task_id: int, task_queues: List[Queue]): data_source = MockDataSource(f"tb{task_id}", table_count_per_task) try: for batch in data_source: - for table_id, row in batch: + for table_id, rows in batch: # hash data to different queue i = table_id % len(task_queues) # block putting forever when the queue is full - task_queues[i].put(row, block=True, timeout=-1) + task_queues[i].put_many(rows, block=True, timeout=-1) except KeyboardInterrupt: pass diff --git a/docs/examples/python/mockdatasoruce.py b/docs/examples/python/mockdatasoruce.py index c77dc36801afb633a16c4b83b1c76b34e77a36b4..6172fda7b399d9a6c570360baa568115698f0611 100644 --- a/docs/examples/python/mockdatasoruce.py +++ b/docs/examples/python/mockdatasoruce.py @@ -14,7 +14,8 @@ class MockDataSource: self.table_name_prefix = tb_name_prefix + "_" self.table_count = table_count self.max_rows = 10000000 - self.start_ms = round(time.time() * 1000) - self.max_rows * 100 + self.current_ts = round(time.time() * 1000) - self.max_rows * 100 + # [(tableId, tableName, values),] self.data = self._init_data() def _init_data(self): @@ -31,15 +32,21 @@ class MockDataSource: def __next__(self): """ - next row for each table. - [(tableId, row),(tableId, row)] + next 1000 rows for each table. + return: {tableId:[row,...]} """ - self.row += 1 - ts = self.start_ms + 100 * self.row - - # just add timestamp to each row - # (tableId, "tableName,ts,current,voltage,phase,location,groupId") - return map(lambda t: (t[0], t[1] + str(ts) + "," + t[2]), self.data) + # generate 1000 timestamps + ts = [] + for _ in range(1000): + self.current_ts += 100 + ts.append(self.current_ts) + # add timestamp to each row + # [(tableId, ["tableName,ts,current,voltage,phase,location,groupId"])] + result = [] + for table_id, table_name, values in self.data: + rows = [table_name + ',' + t + ',' + values for t in ts] + result.append((table_id, rows)) + return result if __name__ == '__main__':