Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3581dc3d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3581dc3d
编写于
7月 15, 2022
作者:
D
dingbo
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
docs: delete highvolume_mp_queue.py
上级
1180912b
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
0 addition
and
193 deletion
+0
-193
docs/examples/python/highvolume_mp_queue.py
docs/examples/python/highvolume_mp_queue.py
+0
-193
未找到文件。
docs/examples/python/highvolume_mp_queue.py
已删除
100644 → 0
浏览文件 @
1180912b
# import logging
# import sys
# import time
# from multiprocessing import Queue, Process
# from queue import Empty
# from typing import List
#
# logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format="%(asctime)s [%(name)s] - %(message)s")
#
# READ_TASK_COUNT = 1
# WRITE_TASK_COUNT = 1
# QUEUE_SIZE = 1000
# TABLE_COUNT = 1000
# MAX_BATCH_SIZE = 3000
#
# read_processes = []
# write_processes = []
#
#
# # ANCHOR: DataBaseMonitor
# class DataBaseMonitor:
# """
# Start a thread.
# Prepare database and stable.
# Statistic writing speed and print it every 10 seconds.
# """
#
# def __init__(self):
# self.process = Process(target=self.run)
# self.process.start()
#
# def get_connection(self):
# import taos
# return taos.connect(host="localhost", user="root", password="taosdata", port=6030)
#
# def prepare_database(self, conn):
# conn.execute("DROP DATABASE IF EXISTS test")
# conn.execute("CREATE DATABASE test")
# conn.execute("CREATE STABLE test.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)")
#
# def get_count(self, conn):
# res = conn.query("SELECT count(*) FROM test.meters")
# rows = res.fetch_all()
# return rows[0][0] if rows else 0
#
# def run(self):
# log = logging.getLogger("DataBaseMonitor")
# conn = self.get_connection()
# self.prepare_database(conn)
# last_count = 0
# while True:
# time.sleep(10)
# count = self.get_count(conn)
# log.info(f"count={count} speed={(count - last_count) / 10}")
# last_count = count
#
# def join(self):
# self.process.join()
#
# def stop(self):
# self.process.terminate()
#
#
# # ANCHOR_END: DataBaseMonitor
#
# # ANCHOR: MockDataSource
# class MockDataSource:
# location = ["LosAngeles", "SanDiego", "Hollywood", "Compton", "San Francisco"]
# current = [8.8, 10.7, 9.9, 8.9, 9.4]
# voltage = [119, 116, 111, 113, 118]
# phase = [0.32, 0.34, 0.33, 0.329, 0.141]
# max_rows_per_table = 10 ** 9
#
# def __init__(self, tb_name_prefix, table_count):
# self.table_name_prefix = tb_name_prefix
# self.table_count = table_count
# self.start_ms = round(time.time() * 1000) - self.max_rows_per_table * 100
#
# def __iter__(self):
# self.row = 0
# self.table_id = -1
# return self
#
# def __next__(self):
# self.table_id += 1
# if self.table_id == self.table_count:
# self.table_id = 0
# self.row += 1
# if self.row < self.max_rows_per_table:
# ts = self.start_ms + 100 * self.row
# group_id = self.table_id % 5 if self.table_id % 5 == 0 else self.table_id % 5 + 1
# tb_name = self.table_name_prefix + '_' + str(self.table_id)
# ri = self.row % 5
# return self.table_id, f"{tb_name},{ts},{self.current[ri]},{self.voltage[ri]},{self.phase[ri]},{self.location[ri]},{group_id}"
# else:
# raise StopIteration
#
#
# # ANCHOR_END: MockDataSource
#
# # ANCHOR: read
# def run_read_task(task_id: int, task_queues: List[Queue]):
# table_count_per_task = TABLE_COUNT // READ_TASK_COUNT
# data_source = MockDataSource(f"tb{task_id}", table_count_per_task)
# try:
# for table_id, line in data_source:
# i = table_id % len(task_queues)
# task_queues[i].put(line, block=True)
# except KeyboardInterrupt:
# pass
#
#
# # ANCHOR_END: read
#
# # ANCHOR: write
# def run_write_task(task_id: int, queue: Queue):
# from sql_writer import SQLWriter
# log = logging.getLogger(f"WriteTask-{task_id}")
# writer = SQLWriter(MAX_BATCH_SIZE)
# try:
# while True:
# try:
# line = queue.get(block=False)
# writer.process_line(line)
# except Empty:
# if writer.buffered_count > 0:
# writer.flush()
# else:
# time.sleep(0.01)
# except KeyboardInterrupt:
# pass
# except BaseException as e:
# msg = f"line={line}, buffer_count={writer.buffered_count}"
# log.debug(msg)
# raise e
#
#
# # ANCHOR_END: write
#
# def set_global_config():
# argc = len(sys.argv)
# if argc > 1:
# global READ_TASK_COUNT
# READ_TASK_COUNT = int(sys.argv[1])
# if argc > 2:
# global WRITE_TASK_COUNT
# WRITE_TASK_COUNT = int(sys.argv[2])
# if argc > 3:
# global QUEUE_SIZE
# QUEUE_SIZE = int(sys.argv[3])
# if argc > 4:
# global TABLE_COUNT
# TABLE_COUNT = int(sys.argv[4])
# if argc > 5:
# global MAX_BATCH_SIZE
# MAX_BATCH_SIZE = int(sys.argv[5])
#
#
# # ANCHOR: main
# def main():
# set_global_config()
# logging.info(f"READ_TASK_COUNT={READ_TASK_COUNT}, WRITE_TASK_COUNT={WRITE_TASK_COUNT}, QUEUE_SIZE={QUEUE_SIZE}, TABLE_COUNT={TABLE_COUNT}, MAX_BATCH_SIZE={MAX_BATCH_SIZE}")
#
# database_monitor = DataBaseMonitor()
# time.sleep(3) # wait for database ready
#
# task_queues: List[Queue] = []
#
# for i in range(WRITE_TASK_COUNT):
# queue = Queue(maxsize=QUEUE_SIZE)
# task_queues.append(queue)
# p = Process(target=run_write_task, args=(i, queue))
# p.start()
# logging.debug(f"WriteTask-{i} started with pid {p.pid}")
# write_processes.append(p)
#
# for i in range(READ_TASK_COUNT):
# p = Process(target=run_read_task, args=(i, task_queues))
# p.start()
# logging.debug(f"ReadTask-{i} started with pid {p.pid}")
# read_processes.append(p)
#
# try:
# database_monitor.join()
# except KeyboardInterrupt:
# database_monitor.stop()
# [p.terminate() for p in read_processes]
# [p.terminate() for p in write_processes]
#
#
# if __name__ == '__main__':
# main()
# # ANCHOR_END: main
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录