--- sidebar_label: Python title: TDengine Python Connector description: "taospy 是 TDengine 的官方 Python 连接器。taospy 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。tasopy 对 TDengine 的原生接口和 REST 接口都进行了封装, 分别对应 tasopy 的两个子模块:taos 和 taosrest。除了对原生接口和 REST 接口的封装,taospy 还提供了符合 Python 数据访问规范(PEP 249)的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemy 和 pandas" --- import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; `taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。 除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。 `taos-ws-py` 是使用 WebSocket 方式连接 TDengine 的 Python 连接器包。可以选装。 使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口或 WebSocket 接口与服务端建立的连接的方式下文中称为“REST 连接”或“WebSocket 连接”。 Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-python)。 ## 支持的平台 - 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。 - REST 连接支持所有能运行 Python 的平台。 ## 版本选择 无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。 ## 支持的功能 - 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。 - REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。 ## 安装 ### 准备 1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。 2. 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。 3. 如果使用原生连接,还需[安装客户端驱动](../#安装客户端驱动)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。 ### 使用 pip 安装 #### 卸载旧版本 如果以前安装过旧版本的 Python 连接器, 请提前卸载。 ``` pip3 uninstall taos taospy ``` :::note 较早的 TDengine 客户端软件包含了 Python 连接器。如果从客户端软件的安装目录安装了 Python 连接器,那么对应的 Python 包名是 `taos`。 所以上述卸载命令包含了 `taos`, 不存在也没关系。 ::: #### 安装 `taospy` 安装最新版本 ``` pip3 install taospy ``` 也可以指定某个特定版本安装。 ``` pip3 install taospy==2.3.0 ``` ``` pip3 install git+https://github.com/taosdata/taos-connector-python.git ``` #### 安装 `taos-ws-py`(可选) taos-ws-py 包提供了通过 WebSocket 连接 TDengine 的能力,可选安装 taos-ws-py 以获得 WebSocket 连接 TDengine 的能力。 ##### 和 taospy 同时安装 ```bash pip3 install taospy[ws] ``` ##### 单独安装 ```bash pip3 install taos-ws-py ``` ### 安装验证 对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入: ```python import taos ``` 对于 REST 连接,只需验证是否能成功导入 `taosrest` 模块。可在 Python 交互式 Shell 中输入: ```python import taosrest ``` 对于 WebSocket 连接,只需验证是否能成功导入 `taosws` 模块。可在 Python 交互式 Shell 中输入: ```python import taosws ``` :::tip 如果系统上有多个版本的 Python,则可能有多个 `pip` 命令。要确保使用的 `pip` 命令路径是正确的。上面我们用 `pip3` 命令安装,排除了使用 Python 2.x 版本对应的 `pip` 的可能性。但是如果系统上有多个 Python 3.x 版本,仍需检查安装路径是否正确。最简单的验证方式是,在命令再次输入 `pip3 install taospy`, 就会打印出 `taospy` 的具体安装位置,比如在 Windows 上: ``` C:\> pip3 install taospy Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple Requirement already satisfied: taospy in c:\users\username\appdata\local\programs\python\python310\lib\site-packages (2.3.0) ``` ::: ## 建立连接 ### 连通性测试 在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。 请确保 TDengine 集群已经启动, 且集群中机器的 FQDN (如果启动的是单机版,FQDN 默认为 hostname)在本机能够解析, 可用 `ping` 命令进行测试: ``` ping ``` 然后测试用 TDengine CLI 能否正常连接集群: ``` taos -h -p ``` 上面的 FQDN 可以为集群中任意一个 dnode 的 FQDN, PORT 为这个 dnode 对应的 serverPort。 对于 REST 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试: ``` curl -u root:taosdata http://:/rest/sql -d "select server_version()" ``` 上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。 如果测试成功,会输出服务器版本信息,比如: ```json { "code": 0, "column_meta": [ [ "server_version()", "VARCHAR", 7 ] ], "data": [ [ "3.0.0.0" ] ], "rows": 1 } ``` 对于 WebSocket 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试: ``` curl -i -N -d "show databases" -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -H "Connection: Upgrade" -H "Upgrade: websocket" -H "Host: :" -H "Origin: http://:" http://:/rest/sql ``` 上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。 如果测试成功,会输出服务器版本信息,比如: ```json HTTP/1.1 200 OK Content-Type: application/json; charset=utf-8 Date: Tue, 21 Mar 2023 09:29:17 GMT Transfer-Encoding: chunked {"status":"succ","head":["server_version()"],"column_meta":[["server_version()",8,8]],"data":[["2.6.0.27"]],"rows":1} ``` ### 使用连接器建立连接 以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。 ```python {{#include docs/examples/python/connect_native_reference.py}} ``` `connect` 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明: - `host` : 要连接的节点的 FQDN。 没有默认值。如果不同提供此参数,则会连接客户端配置文件中的 firstEP。 - `user` :TDengine 用户名。 默认值是 root。 - `password` : TDengine 用户密码。 默认值是 taosdata。 - `port` : 要连接的数据节点的起始端口,即 serverPort 配置。默认值是 6030。只有在提供了 host 参数的时候,这个参数才生效。 - `config` : 客户端配置文件路径。 在 Windows 系统上默认是 `C:\TDengine\cfg`。 在 Linux/macOS 系统上默认是 `/etc/taos/`。 - `timezone` : 查询结果中 TIMESTAMP 类型的数据,转换为 python 的 datetime 对象时使用的时区。默认为本地时区。 :::warning `config` 和 `timezone` 都是进程级别的配置。建议一个进程建立的所有连接都使用相同的参数值。否则可能产生无法预知的错误。 ::: :::tip `connect` 函数返回 `taos.TaosConnection` 实例。 在客户端多线程的场景下,推荐每个线程申请一个独立的连接实例,而不建议多线程共享一个连接。 ::: ```python {{#include docs/examples/python/connect_rest_examples.py:connect}} ``` `connect()` 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明: - `url`: taosAdapter REST 服务的 URL。默认是 。 - `user`: TDengine 用户名。默认是 root。 - `password`: TDengine 用户密码。默认是 taosdata。 - `timeout`: HTTP 请求超时时间。单位为秒。默认为 `socket._GLOBAL_DEFAULT_TIMEOUT`。 一般无需配置。 ```python {{#include docs/examples/python/connect_websocket_examples.py:connect}} ``` `connect()` 函数参数为连接 url,协议为 `taosws` 或 `ws` ## 示例程序 ### 基本使用 ##### TaosConnection 类的使用 `TaosConnection` 类既包含对 PEP249 Connection 接口的实现(如:`cursor`方法和 `close` 方法),也包含很多扩展功能(如: `execute`、 `query`、`schemaless_insert` 和 `subscribe` 方法。 ```python title="execute 方法" {{#include docs/examples/python/connection_usage_native_reference.py:insert}} ``` ```python title="query 方法" {{#include docs/examples/python/connection_usage_native_reference.py:query}} ``` :::tip 查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。 ::: ##### TaosResult 类的使用 上面 `TaosConnection` 类的使用示例中,我们已经展示了两种获取查询结果的方法: `fetch_all()` 和 `fetch_all_into_dict()`。除此之外 `TaosResult` 还提供了按行迭代(`rows_iter`)或按数据块迭代(`blocks_iter`)结果集的方法。在查询数据量较大的场景,使用这两个方法会更高效。 ```python title="blocks_iter 方法" {{#include docs/examples/python/result_set_examples.py}} ``` ##### TaosCursor 类的使用 `TaosConnection` 类和 `TaosResult` 类已经实现了原生接口的所有功能。如果你对 PEP249 规范中的接口比较熟悉也可以使用 `TaosCursor` 类提供的方法。 ```python title="TaosCursor 的使用" {{#include docs/examples/python/cursor_usage_native_reference.py}} ``` :::note TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。 ::: ##### TaosRestCursor 类的使用 `TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。 ```python title="TaosRestCursor 的使用" {{#include docs/examples/python/connect_rest_examples.py:basic}} ``` - `cursor.execute` : 用来执行任意 SQL 语句。 - `cursor.rowcount`: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。 - `cursor.description` : 返回字段的描述信息。关于描述信息的具体格式请参考[TaosRestCursor](https://docs.taosdata.com/api/taospy/taosrest/cursor.html)。 ##### RestClient 类的使用 `RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。 ```python title="RestClient 的使用" {{#include docs/examples/python/rest_client_example.py}} ``` 对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。 ```python {{#include docs/examples/python/connect_websocket_examples.py:basic}} ``` - `conn.execute`: 用来执行任意 SQL 语句,返回影响的行数 - `conn.query`: 用来执行查询 SQL 语句,返回查询结果 ### 与 req_id 一起使用 使用可选的 req_id 参数,指定请求 id,可以用于 tracing ##### TaosConnection 类的使用 类似上文介绍的使用方法,增加 `req_id` 参数。 ```python title="execute 方法" {{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}} ``` ```python title="query 方法" {{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}} ``` ##### TaosResult 类的使用 类似上文介绍的使用方法,增加 `req_id` 参数。 ```python title="blocks_iter 方法" {{#include docs/examples/python/result_set_with_req_id_examples.py}} ``` ##### TaosCursor 类的使用 `TaosConnection` 类和 `TaosResult` 类已经实现了原生接口的所有功能。如果你对 PEP249 规范中的接口比较熟悉也可以使用 `TaosCursor` 类提供的方法。 ```python title="TaosCursor 的使用" {{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}} ``` 类似上文介绍的使用方法,增加 `req_id` 参数。 ##### TaosRestCursor 类的使用 `TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。 ```python title="TaosRestCursor 的使用" {{#include docs/examples/python/connect_rest_with_req_id_examples.py:basic}} ``` - `cursor.execute` : 用来执行任意 SQL 语句。 - `cursor.rowcount`: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。 - `cursor.description` : 返回字段的描述信息。关于描述信息的具体格式请参考[TaosRestCursor](https://docs.taosdata.com/api/taospy/taosrest/cursor.html)。 ##### RestClient 类的使用 `RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。 ```python title="RestClient 的使用" {{#include docs/examples/python/rest_client_with_req_id_example.py}} ``` 对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。 类似上文介绍的使用方法,增加 `req_id` 参数。 ```python {{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}} ``` - `conn.execute`: 用来执行任意 SQL 语句,返回影响的行数 - `conn.query`: 用来执行查询 SQL 语句,返回查询结果 ### 与 pandas 一起使用 ```python {{#include docs/examples/python/conn_native_pandas.py}} ``` ```python {{#include docs/examples/python/conn_rest_pandas.py}} ``` ```python {{#include docs/examples/python/conn_websocket_pandas.py}} ``` ### 数据订阅 连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。 `Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。 #### 创建 Consumer 创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 ```python from taos.tmq import Consumer consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` #### 订阅 topics Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 ```python consumer.subscribe(['topic1', 'topic2']) ``` #### 消费数据 Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 ```python while True: res = consumer.poll(1) if not res: continue err = res.error() if err is not None: raise err val = res.value() for block in val: print(block.fetchall()) ``` #### 获取消费进度 Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 ```python assignments = consumer.assignment() ``` #### 重置消费进度 Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。 ```python tp = TopicPartition(topic='topic1', partition=0, offset=0) consumer.seek(tp) ``` #### 结束消费 消费结束后,应当取消订阅,并关闭 Consumer。 ```python consumer.unsubscribe() consumer.close() ``` #### tmq 订阅示例代码 ```python {{#include docs/examples/python/tmq_example.py}} ``` #### 获取和重置消费进度示例代码 ```python {{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} ``` 除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。 taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。 #### 创建 Consumer 创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 ```python import taosws consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) ``` #### 订阅 topics Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 ```python consumer.subscribe(['topic1', 'topic2']) ``` #### 消费数据 Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 ```python while True: res = consumer.poll(timeout=1.0) if not res: continue err = res.error() if err is not None: raise err for block in message: for row in block: print(row) ``` #### 获取消费进度 Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 ```python assignments = consumer.assignment() ``` #### 重置消费进度 Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。 ```python consumer.seek(topic='topic1', partition=0, offset=0) ``` #### 结束消费 消费结束后,应当取消订阅,并关闭 Consumer。 ```python consumer.unsubscribe() consumer.close() ``` #### tmq 订阅示例代码 ```python {{#include docs/examples/python/tmq_websocket_example.py}} ``` 连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。 #### 获取和重置消费进度示例代码 ```python {{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} ``` ### 无模式写入 连接器支持无模式写入功能。 简单写入 ```python {{#include docs/examples/python/schemaless_insert.py}} ``` 带有 ttl 参数的写入 ```python {{#include docs/examples/python/schemaless_insert_ttl.py}} ``` 带有 req_id 参数的写入 ```python {{#include docs/examples/python/schemaless_insert_req_id.py}} ``` 简单写入 ```python {{#include docs/examples/python/schemaless_insert_raw.py}} ``` 带有 ttl 参数的写入 ```python {{#include docs/examples/python/schemaless_insert_raw_ttl.py}} ``` 带有 req_id 参数的写入 ```python {{#include docs/examples/python/schemaless_insert_raw_req_id.py}} ``` ### 其它示例程序 | 示例程序链接 | 示例程序内容 | | ------------------------------------------------------------------------------------------------------------- | ----------------------- | | [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | 参数绑定, 一次绑定多行 | | [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | 参数绑定,一次绑定一行 | | [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB 行协议写入 | | [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | 使用 JSON 类型的标签 | | [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py) | tmq 订阅 | ## 其它说明 ### 异常处理 所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如: ```python {{#include docs/examples/python/handle_exception.py}} ``` `` ### 关于纳秒 (nanosecond) 由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。 1. https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds 2. https://www.python.org/dev/peps/pep-0564/ ## 重要更新 [**Release Notes**](https://github.com/taosdata/taos-connector-python/releases) ## API 参考 - [taos](https://docs.taosdata.com/api/taospy/taos/) - [taosrest](https://docs.taosdata.com/api/taospy/taosrest) ## 常见问题 欢迎[提问或报告问题](https://github.com/taosdata/taos-connector-python/issues)。