提交 21bcfbce 编写于 作者: D dingbo

docs: test

上级 658ba5c6
...@@ -182,14 +182,16 @@ class SQLWriter: ...@@ -182,14 +182,16 @@ class SQLWriter:
def run_read_task(task_id: int, task_queues: List[Queue]): def run_read_task(task_id: int, task_queues: List[Queue]):
table_count_per_task = TABLE_COUNT // READ_TASK_COUNT table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
data_source = MockDataSource(f"tb{task_id}", table_count_per_task) data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
for table_id, line in data_source: try:
i = table_id % len(task_queues) for table_id, line in data_source:
task_queues[i].put(line, block=True) i = table_id % len(task_queues)
task_queues[i].put(line, block=True)
except KeyboardInterrupt:
pass
# ANCHOR_END: read # ANCHOR_END: read
# ANCHOR: write # ANCHOR: write
def run_write_task(task_id: int, queue: Queue): def run_write_task(task_id: int, queue: Queue):
log = logging.getLogger(f"WriteTask-{task_id}") log = logging.getLogger(f"WriteTask-{task_id}")
...@@ -204,6 +206,8 @@ def run_write_task(task_id: int, queue: Queue): ...@@ -204,6 +206,8 @@ def run_write_task(task_id: int, queue: Queue):
writer.flush() writer.flush()
else: else:
time.sleep(0.01) time.sleep(0.01)
except KeyboardInterrupt:
pass
except BaseException as e: except BaseException as e:
msg = f"line={line}, buffer_count={writer.buffered_count}" msg = f"line={line}, buffer_count={writer.buffered_count}"
log.debug(msg) log.debug(msg)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册