30-python.mdx 32.3 KB
Newer Older
1
---
G
gccgdb1234 已提交
2
toc_max_heading_level: 4
3 4
sidebar_label: Python
title: TDengine Python Connector
5
description: "taospy 是 TDengine 的官方 Python 连接器。taospy 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。tasopy 对 TDengine 的原生接口和 REST 接口都进行了封装, 分别对应 tasopy 的两个子模块:taos 和 taosrest。除了对原生接口和 REST 接口的封装,taospy 还提供了符合 Python 数据访问规范(PEP 249)的编程接口。这使得 taospy 和很多第三方工具集成变得简单,比如 SQLAlchemy 和 pandas"
6 7 8 9 10
---

import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";

G
gccgdb1234 已提交
11
`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。
12 13
除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。

14 15 16
`taos-ws-py` 是使用 WebSocket 方式连接 TDengine 的 Python 连接器包。可以选装。

使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口或 WebSocket 接口与服务端建立的连接的方式下文中称为“REST 连接”或“WebSocket 连接”。
17 18 19 20 21

Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-connector-python)。

## 支持的平台

G
gccgdb1234 已提交
22
- 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。
23 24
- REST 连接支持所有能运行 Python 的平台。

S
sunpeng 已提交
25 26 27 28 29 30
### 支持的功能

- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。

## 历史版本
31 32 33

无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。

34 35 36 37 38 39 40 41 42 43
|Python Connector 版本|主要变化|
|:-------------------:|:----:|
|2.7.9|数据订阅支持获取消费进度和重置消费进度|
|2.7.8|新增 `execute_many`|

|Python Websocket Connector 版本|主要变化|
|:----------------------------:|:-----:|
|0.2.5|1. 数据订阅支持获取消费进度和重置消费进度 <br/> 2. 支持 schemaless <br/> 3. 支持 STMT|
|0.2.4|数据订阅新增取消订阅方法|

S
sunpeng 已提交
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
## 处理异常

Python 连接器可能会产生 4 种异常:

- Python 连接器本身的异常
- 原生连接方式的异常
- websocket 连接方式异常
- 数据订阅异常
- TDengine 其他功能模块的异常

|Error Type|Description|Suggested Actions|
|:--------:|:---------:|:---------------:|
|InterfaceError|taosc 版本太低,不支持所使用的接口|请检查 TDengine 客户端版本|
|ConnectionError|数据库链接错误|请检查 TDengine 服务端状态和连接参数|
|DatabaseError|数据库错误|请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版|
|OperationalError|操作错误|API 使用错误,请检查代码|
|ProgrammingError|||
|StatementError|stmt 相关异常||
|ResultError|||
|SchemalessError|schemaless 相关异常||
|TmqError|tmq 相关异常||

Python 中通常通过 try-expect 处理异常,异常处理相关请参考 [Python 错误和异常文档](https://docs.python.org/3/tutorial/errors.html)。

Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:

```python
{{#include docs/examples/python/handle_exception.py}}
```

S
sunpeng 已提交
74
## TDengine DataType 和 Python DataType
75

S
sunpeng 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:

|TDengine DataType|Python DataType|
|:---------------:|:-------------:|
|TIMESTAMP|datetime|
|INT|int|
|BIGINT|int|
|FLOAT|float|
|DOUBLE|int|
|SMALLINT|int|
|TINYINT|int|
|BOOL|bool|
|BINARY|str|
|NCHAR|str|
|JSON|str|
91

S
sunpeng 已提交
92
## 安装步骤
93

S
sunpeng 已提交
94
### 安装前准备
95

96
1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
97
2. 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。
98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
3. 如果使用原生连接,还需[安装客户端驱动](../#安装客户端驱动)。客户端软件包含了 TDengine 客户端动态链接库(libtaos.so 或 taos.dll) 和 TDengine CLI。

### 使用 pip 安装

#### 卸载旧版本

如果以前安装过旧版本的 Python 连接器, 请提前卸载。

```
pip3 uninstall taos taospy
```

:::note
较早的 TDengine 客户端软件包含了 Python 连接器。如果从客户端软件的安装目录安装了 Python 连接器,那么对应的 Python 包名是 `taos`。 所以上述卸载命令包含了 `taos`, 不存在也没关系。

:::

#### 安装 `taospy`

<Tabs>
<TabItem label="从 PyPI 安装" value="pypi">

安装最新版本

```
pip3 install taospy
```

也可以指定某个特定版本安装。

```
pip3 install taospy==2.3.0
```

</TabItem>
<TabItem label="从 GitHub 安装" value="github">

```
pip3 install git+https://github.com/taosdata/taos-connector-python.git
```

</TabItem>
</Tabs>

142
#### 安装 `taos-ws-py`(可选)
143

144
taos-ws-py 包提供了通过 WebSocket 连接 TDengine 的能力,可选安装 taos-ws-py 以获得 WebSocket 连接 TDengine 的能力。
145

146 147 148 149 150

##### 和 taospy 同时安装

```bash
pip3 install taospy[ws]
151
```
152 153 154 155

##### 单独安装

```bash
156 157 158
pip3 install taos-ws-py
```

159 160
### 安装验证

G
gccgdb1234 已提交
161
<Tabs defaultValue="rest">
162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178
<TabItem value="native" label="原生连接">

对于原生连接,需要验证客户端驱动和 Python 连接器本身是否都正确安装。如果能成功导入 `taos` 模块,则说明已经正确安装了客户端驱动和 Python 连接器。可在 Python 交互式 Shell 中输入:

```python
import taos
```

</TabItem>
<TabItem  value="rest" label="REST 连接">

对于 REST 连接,只需验证是否能成功导入 `taosrest` 模块。可在 Python 交互式 Shell 中输入:

```python
import taosrest
```

179 180 181 182 183 184 185 186 187
</TabItem>
<TabItem  value="ws" label="WebSocket 连接">

对于 WebSocket 连接,只需验证是否能成功导入 `taosws` 模块。可在 Python 交互式 Shell 中输入:

```python
import taosws
```

188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207
</TabItem>
</Tabs>

:::tip
如果系统上有多个版本的 Python,则可能有多个 `pip` 命令。要确保使用的 `pip` 命令路径是正确的。上面我们用 `pip3` 命令安装,排除了使用 Python 2.x 版本对应的 `pip` 的可能性。但是如果系统上有多个 Python 3.x 版本,仍需检查安装路径是否正确。最简单的验证方式是,在命令再次输入 `pip3 install taospy`, 就会打印出 `taospy` 的具体安装位置,比如在 Windows 上:

```
C:\> pip3 install taospy
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Requirement already satisfied: taospy in c:\users\username\appdata\local\programs\python\python310\lib\site-packages (2.3.0)
```

:::

## 建立连接

### 连通性测试

在用连接器建立连接之前,建议先测试本地 TDengine CLI 到 TDengine 集群的连通性。

G
gccgdb1234 已提交
208
<Tabs defaultValue="rest">
209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238
<TabItem value="native" label="原生连接">

请确保 TDengine 集群已经启动, 且集群中机器的 FQDN (如果启动的是单机版,FQDN 默认为 hostname)在本机能够解析, 可用 `ping` 命令进行测试:

```
ping <FQDN>
```

然后测试用 TDengine CLI 能否正常连接集群:

```
taos -h <FQDN> -p <PORT>
```

上面的 FQDN 可以为集群中任意一个 dnode 的 FQDN, PORT 为这个 dnode 对应的 serverPort。

</TabItem>
<TabItem value="rest" label="REST 连接" groupId="connect">

对于 REST 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:

```
curl -u root:taosdata http://<FQDN>:<PORT>/rest/sql -d "select server_version()"
```

上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。
如果测试成功,会输出服务器版本信息,比如:

```json
{
239 240 241 242 243 244 245 246 247 248 249 250 251
  "code": 0,
  "column_meta": [
    [
      "server_version()",
      "VARCHAR",
      7
    ]
  ],
  "data": [
    [
      "3.0.0.0"
    ]
  ],
252 253 254 255
  "rows": 1
}
```

256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276
</TabItem>
<TabItem value="ws" label="WebSocket 连接" groupId="connect">

对于 WebSocket 连接, 除了确保集群已经启动,还要确保 taosAdapter 组件已经启动。可以使用如下 curl 命令测试:

```
curl -i -N -d "show databases" -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -H "Connection: Upgrade" -H "Upgrade: websocket" -H "Host: <FQDN>:<PORT>" -H "Origin: http://<FQDN>:<PORT>" http://<FQDN>:<PORT>/rest/sql
```

上面的 FQDN 为运行 taosAdapter 的机器的 FQDN, PORT 为 taosAdapter 配置的监听端口, 默认为 6041。
如果测试成功,会输出服务器版本信息,比如:

```json
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Tue, 21 Mar 2023 09:29:17 GMT
Transfer-Encoding: chunked

{"status":"succ","head":["server_version()"],"column_meta":[["server_version()",8,8]],"data":[["2.6.0.27"]],"rows":1}
```

277 278 279
</TabItem>
</Tabs>

S
sunpeng 已提交
280
### 指定 Host 和 Properties 获取连接
281 282 283

以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。

G
gccgdb1234 已提交
284
<Tabs defaultValue="rest">
285 286 287
<TabItem value="native" label="原生连接" groupId="connect">

```python
D
dingbo 已提交
288
{{#include docs/examples/python/connect_native_reference.py}}
289 290 291 292 293 294 295 296
```

`connect` 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:

- `host` : 要连接的节点的 FQDN。 没有默认值。如果不同提供此参数,则会连接客户端配置文件中的 firstEP。
- `user` :TDengine 用户名。 默认值是 root。
- `password` : TDengine 用户密码。 默认值是 taosdata。
- `port` : 要连接的数据节点的起始端口,即 serverPort 配置。默认值是 6030。只有在提供了 host 参数的时候,这个参数才生效。
wafwerar's avatar
wafwerar 已提交
297
- `config` : 客户端配置文件路径。 在 Windows 系统上默认是 `C:\TDengine\cfg`。 在 Linux/macOS 系统上默认是 `/etc/taos/`。
298 299 300 301 302 303 304 305 306 307 308 309 310 311 312
- `timezone` : 查询结果中 TIMESTAMP 类型的数据,转换为 python 的 datetime 对象时使用的时区。默认为本地时区。

:::warning
`config` 和 `timezone` 都是进程级别的配置。建议一个进程建立的所有连接都使用相同的参数值。否则可能产生无法预知的错误。
:::

:::tip
`connect` 函数返回 `taos.TaosConnection` 实例。 在客户端多线程的场景下,推荐每个线程申请一个独立的连接实例,而不建议多线程共享一个连接。

:::

</TabItem>
<TabItem value="rest" label="REST 连接">

```python
D
dingbo 已提交
313
{{#include docs/examples/python/connect_rest_examples.py:connect}}
314 315 316 317
```

`connect()` 函数的所有参数都是可选的关键字参数。下面是连接参数的具体说明:

D
dingbo 已提交
318
- `url`: taosAdapter REST 服务的 URL。默认是 <http://localhost:6041>。
319 320
- `user`: TDengine 用户名。默认是 root。
- `password`: TDengine 用户密码。默认是 taosdata。
321 322
- `timeout`: HTTP 请求超时时间。单位为秒。默认为 `socket._GLOBAL_DEFAULT_TIMEOUT`。 一般无需配置。

323 324 325 326 327 328 329 330 331 332
</TabItem>

<TabItem value="websocket" label="WebSocket 连接">

```python
{{#include docs/examples/python/connect_websocket_examples.py:connect}}
```

`connect()` 函数参数为连接 url,协议为 `taosws` 或 `ws`

333 334 335
</TabItem>
</Tabs>

S
sunpeng 已提交
336 337 338 339 340 341 342
### 配置参数的优先级

如果配置参数在参数和客户端配置文件中有重复,则参数的优先级由高到低分别如下:

1. 连接参数
2. 使用原生连接时,TDengine 客户端驱动的配置文件 taos.cfg

S
sunpeng 已提交
343
## 使用示例
344

S
sunpeng 已提交
345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
### 创建数据库和表

<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">

```python
conn = taos.connect()
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```

</TabItem>

<TabItem value="rest" label="REST 连接">

```python
conn = taosrest.connect(url="http://localhost:6041")
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("USE test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```

</TabItem>

<TabItem value="websocket" label="WebSocket 连接">

```python
378
conn = taosws.connect("taosws://localhost:6041")
S
sunpeng 已提交
379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("USE test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```

</TabItem>
</Tabs>

### 插入数据

```python
conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
```

:::
now 为系统内部函数,默认为客户端所在计算机当前时间。 now + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。
:::

399 400
### 基本使用

G
gccgdb1234 已提交
401
<Tabs defaultValue="rest">
402 403 404 405 406 407 408
<TabItem value="native" label="原生连接">

##### TaosConnection 类的使用

`TaosConnection` 类既包含对 PEP249 Connection 接口的实现(如:`cursor`方法和 `close` 方法),也包含很多扩展功能(如: `execute`、 `query`、`schemaless_insert` 和 `subscribe` 方法。

```python title="execute 方法"
D
dingbo 已提交
409
{{#include docs/examples/python/connection_usage_native_reference.py:insert}}
410 411 412
```

```python title="query 方法"
D
dingbo 已提交
413
{{#include docs/examples/python/connection_usage_native_reference.py:query}}
414 415 416 417 418 419 420 421 422 423 424
```

:::tip
查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
:::

##### TaosResult 类的使用

上面 `TaosConnection` 类的使用示例中,我们已经展示了两种获取查询结果的方法: `fetch_all()` 和 `fetch_all_into_dict()`。除此之外 `TaosResult` 还提供了按行迭代(`rows_iter`)或按数据块迭代(`blocks_iter`)结果集的方法。在查询数据量较大的场景,使用这两个方法会更高效。

```python title="blocks_iter 方法"
D
dingbo 已提交
425
{{#include docs/examples/python/result_set_examples.py}}
426 427 428 429 430 431
```
##### TaosCursor 类的使用

`TaosConnection` 类和 `TaosResult` 类已经实现了原生接口的所有功能。如果你对 PEP249 规范中的接口比较熟悉也可以使用 `TaosCursor` 类提供的方法。

```python title="TaosCursor 的使用"
D
dingbo 已提交
432
{{#include docs/examples/python/cursor_usage_native_reference.py}}
433 434 435 436 437 438 439 440 441 442 443 444 445 446
```

:::note
TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。
:::

</TabItem>
<TabItem value="rest" label="REST 连接">

#####  TaosRestCursor 类的使用

`TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。

```python title="TaosRestCursor 的使用"
D
dingbo 已提交
447
{{#include docs/examples/python/connect_rest_examples.py:basic}}
448 449 450 451 452 453 454
```
- `cursor.execute` : 用来执行任意 SQL 语句。
- `cursor.rowcount`: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
- `cursor.description` : 返回字段的描述信息。关于描述信息的具体格式请参考[TaosRestCursor](https://docs.taosdata.com/api/taospy/taosrest/cursor.html)。

##### RestClient 类的使用

G
gccgdb1234 已提交
455
`RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。
456 457

```python title="RestClient 的使用"
D
dingbo 已提交
458
{{#include docs/examples/python/rest_client_example.py}}
459 460 461
```

对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
462 463
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
464

S
sunpeng 已提交
465
##### Connection 类的使用
S
sunpeng 已提交
466 467 468

`Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: execute、 query、schemaless_insert 和 subscribe 方法。

469 470 471
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
472

473 474
- `conn.execute`: 用来执行任意 SQL 语句,返回影响的行数
- `conn.query`: 用来执行查询 SQL 语句,返回查询结果
475 476 477 478

</TabItem>
</Tabs>

S
sunpeng 已提交
479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518
### 查询数据

<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">

`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。

```python
{{#include docs/examples/python/connection_usage_native_reference.py:query}}
```

:::tip
查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
:::

</TabItem>

<TabItem value="rest" label="REST 连接">

RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。

```python
{{#include docs/examples/python/rest_client_example.py}}
```

对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。

</TabItem>

<TabItem value="websocket" label="WebSocket 连接">

`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。

```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```

</TabItem>
</Tabs>

S
sunpeng 已提交
519
### 执行带有 reqId 的 SQL
520 521 522 523 524 525 526 527

使用可选的 req_id 参数,指定请求 id,可以用于 tracing

<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">

##### TaosConnection 类的使用

528
类似上文介绍的使用方法,增加 `req_id` 参数。
529 530 531 532 533 534 535 536 537 538 539

```python title="execute 方法"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
```

```python title="query 方法"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```

##### TaosResult 类的使用

540
类似上文介绍的使用方法,增加 `req_id` 参数。
541 542 543 544 545 546 547 548 549 550 551 552 553 554 555

```python title="blocks_iter 方法"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
```
##### TaosCursor 类的使用

`TaosConnection` 类和 `TaosResult` 类已经实现了原生接口的所有功能。如果你对 PEP249 规范中的接口比较熟悉也可以使用 `TaosCursor` 类提供的方法。

```python title="TaosCursor 的使用"
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```

</TabItem>
<TabItem value="rest" label="REST 连接">

556 557
类似上文介绍的使用方法,增加 `req_id` 参数。

558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578
#####  TaosRestCursor 类的使用

`TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。

```python title="TaosRestCursor 的使用"
{{#include docs/examples/python/connect_rest_with_req_id_examples.py:basic}}
```
- `cursor.execute` : 用来执行任意 SQL 语句。
- `cursor.rowcount`: 对于写入操作返回写入成功记录数。对于查询操作,返回结果集行数。
- `cursor.description` : 返回字段的描述信息。关于描述信息的具体格式请参考[TaosRestCursor](https://docs.taosdata.com/api/taospy/taosrest/cursor.html)。

##### RestClient 类的使用

`RestClient` 类是对于 [REST API](../rest-api) 的直接封装。它只包含一个 `sql()` 方法用于执行任意 SQL 语句, 并返回执行结果。

```python title="RestClient 的使用"
{{#include docs/examples/python/rest_client_with_req_id_example.py}}
```

对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
</TabItem>
579

580 581
<TabItem value="websocket" label="WebSocket 连接">

582 583
类似上文介绍的使用方法,增加 `req_id` 参数。

584 585 586 587 588 589 590 591 592 593
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```

- `conn.execute`: 用来执行任意 SQL 语句,返回影响的行数
- `conn.query`: 用来执行查询 SQL 语句,返回查询结果

</TabItem>
</Tabs>

594 595
### 与 pandas 一起使用

G
gccgdb1234 已提交
596
<Tabs defaultValue="rest">
597 598 599
<TabItem value="native" label="原生连接">

```python
D
dingbo 已提交
600
{{#include docs/examples/python/conn_native_pandas.py}}
601 602 603 604 605 606
```

</TabItem>
<TabItem value="rest" label="REST 连接">

```python
D
dingbo 已提交
607
{{#include docs/examples/python/conn_rest_pandas.py}}
608 609
```

610 611 612 613 614 615 616
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">

```python
{{#include docs/examples/python/conn_websocket_pandas.py}}
```

617 618 619
</TabItem>
</Tabs>

S
sunpeng 已提交
620
### 通过参数绑定写入数据
621

S
sunpeng 已提交
622
TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。
623

S
sunpeng 已提交
624
<Tabs>
625 626
<TabItem value="native" label="原生连接">

S
sunpeng 已提交
627
##### 创建 stmt
S
sunpeng 已提交
628

S
sunpeng 已提交
629
Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
S
sunpeng 已提交
630 631

```
S
sunpeng 已提交
632
import taos
S
sunpeng 已提交
633

S
sunpeng 已提交
634 635
conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
S
sunpeng 已提交
636 637
```

S
sunpeng 已提交
638
##### 参数绑定
S
sunpeng 已提交
639

S
sunpeng 已提交
640
调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。
S
sunpeng 已提交
641

S
sunpeng 已提交
642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659
```
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None])  # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
S
sunpeng 已提交
660 661
```

S
sunpeng 已提交
662
调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。
S
sunpeng 已提交
663

S
sunpeng 已提交
664 665
```
stmt.bind_param_batch(params)
S
sunpeng 已提交
666 667
```

S
sunpeng 已提交
668
##### 执行 sql
S
sunpeng 已提交
669

S
sunpeng 已提交
670
调用 stmt 的 `execute` 方法执行 sql
S
sunpeng 已提交
671

S
sunpeng 已提交
672 673
```
stmt.execute()
S
sunpeng 已提交
674 675
```

S
sunpeng 已提交
676
##### 关闭 stmt
S
sunpeng 已提交
677

S
sunpeng 已提交
678
最后需要关闭 stmt。
S
sunpeng 已提交
679 680

```
S
sunpeng 已提交
681
stmt.close()
682
```
683

S
sunpeng 已提交
684
##### 示例代码
S
sunpeng 已提交
685 686

```python
S
sunpeng 已提交
687
{{#include docs/examples/python/stmt_example.py}}
S
sunpeng 已提交
688
```
689 690
</TabItem>

691
<TabItem value="websocket" label="WebSocket 连接">
692

S
sunpeng 已提交
693
##### 创建 stmt
S
sunpeng 已提交
694

S
sunpeng 已提交
695
Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
S
sunpeng 已提交
696

S
sunpeng 已提交
697
```
S
sunpeng 已提交
698 699
import taosws

S
sunpeng 已提交
700 701
conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
S
sunpeng 已提交
702 703
```

S
sunpeng 已提交
704
##### 解析 sql
S
sunpeng 已提交
705

S
sunpeng 已提交
706
调用 stmt 的 `prepare` 方法来解析 insert 语句。
S
sunpeng 已提交
707

S
sunpeng 已提交
708 709
```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
S
sunpeng 已提交
710 711
```

S
sunpeng 已提交
712
##### 参数绑定
S
sunpeng 已提交
713

S
sunpeng 已提交
714
调用 stmt 的 `bind_param` 方法绑定参数。
S
sunpeng 已提交
715

S
sunpeng 已提交
716 717 718 719 720 721 722
```
stmt.bind_param([
    taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
    taosws.ints_to_column([1, 2, 3, 4]),
    taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
    taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
S
sunpeng 已提交
723 724
```

S
sunpeng 已提交
725
调用 stmt 的 `add_batch` 方法,将参数加入批处理。
S
sunpeng 已提交
726

S
sunpeng 已提交
727 728
```
stmt.add_batch()
S
sunpeng 已提交
729 730
```

S
sunpeng 已提交
731
##### 执行 sql
S
sunpeng 已提交
732

S
sunpeng 已提交
733
调用 stmt 的 `execute` 方法执行 sql
S
sunpeng 已提交
734

S
sunpeng 已提交
735 736
```
stmt.execute()
S
sunpeng 已提交
737 738
```

S
sunpeng 已提交
739
##### 关闭 stmt
S
sunpeng 已提交
740

S
sunpeng 已提交
741
最后需要关闭 stmt。
S
sunpeng 已提交
742 743

```
S
sunpeng 已提交
744
stmt.close()
745
```
746

S
sunpeng 已提交
747
##### 示例代码
S
sunpeng 已提交
748 749

```python
S
sunpeng 已提交
750
{{#include docs/examples/python/stmt_websocket_example.py}}
S
sunpeng 已提交
751
```
752 753 754
</TabItem>
</Tabs>

755 756 757 758 759 760 761
### 无模式写入

连接器支持无模式写入功能。

<Tabs defaultValue="list">
<TabItem value="list" label="List 写入">

S
sunpeng 已提交
762
##### 简单写入
763 764 765 766 767

```python
{{#include docs/examples/python/schemaless_insert.py}}
```

S
sunpeng 已提交
768
##### 带有 ttl 参数的写入
769 770 771 772 773

```python
{{#include docs/examples/python/schemaless_insert_ttl.py}}
```

S
sunpeng 已提交
774
##### 带有 req_id 参数的写入
775 776 777 778 779 780 781 782 783

```python
{{#include docs/examples/python/schemaless_insert_req_id.py}}
```

</TabItem>

<TabItem value="raw" label="Raw 写入">

S
sunpeng 已提交
784
##### 简单写入
785 786 787 788 789

```python
{{#include docs/examples/python/schemaless_insert_raw.py}}
```

S
sunpeng 已提交
790
##### 带有 ttl 参数的写入
791 792 793 794 795

```python
{{#include docs/examples/python/schemaless_insert_raw_ttl.py}}
```

S
sunpeng 已提交
796
##### 带有 req_id 参数的写入
797 798 799 800 801 802 803 804

```python
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
```

</TabItem>
</Tabs>

S
sunpeng 已提交
805
### 执行带有 reqId 的无模式写入
S
sunpeng 已提交
806

S
sunpeng 已提交
807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827
连接器的 `schemaless_insert` 和 `schemaless_insert_raw` 方法支持 `req_id` 可选参数,此 `req_Id` 可用于请求链路追踪。

```python
{{#include docs/examples/python/schemaless_insert_req_id.py}}
```

```python
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
```

### 数据订阅

连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。

#### 创建 Topic

创建 Topic 相关请参考 [数据订阅文档](../../develop/tmq/#创建-topic)。

#### 创建 Consumer

<Tabs defaultValue="native">
S
sunpeng 已提交
828 829 830

<TabItem value="native" label="原生连接">

S
sunpeng 已提交
831
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#创建消费者-consumer)。
S
sunpeng 已提交
832

S
sunpeng 已提交
833 834
```python
from taos.tmq import Consumer
S
sunpeng 已提交
835

S
sunpeng 已提交
836
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
S
sunpeng 已提交
837
```
S
sunpeng 已提交
838
</TabItem>
S
sunpeng 已提交
839

S
sunpeng 已提交
840
<TabItem value="websocket" label="WebSocket 连接">
S
sunpeng 已提交
841

S
sunpeng 已提交
842
除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。
S
sunpeng 已提交
843

S
sunpeng 已提交
844
taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
S
sunpeng 已提交
845

S
sunpeng 已提交
846 847 848 849
```python
import taosws

consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
S
sunpeng 已提交
850 851
```

S
sunpeng 已提交
852 853
</TabItem>
</Tabs>
S
sunpeng 已提交
854

S
sunpeng 已提交
855
#### 订阅 topics
S
sunpeng 已提交
856

S
sunpeng 已提交
857
<Tabs defaultValue="native">
S
sunpeng 已提交
858

S
sunpeng 已提交
859
<TabItem value="native" label="原生连接">
S
sunpeng 已提交
860

S
sunpeng 已提交
861 862 863 864
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。

```python
consumer.subscribe(['topic1', 'topic2'])
S
sunpeng 已提交
865 866
```

S
sunpeng 已提交
867 868
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
S
sunpeng 已提交
869

S
sunpeng 已提交
870
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
S
sunpeng 已提交
871

S
sunpeng 已提交
872 873
```python
consumer.subscribe(['topic1', 'topic2'])
S
sunpeng 已提交
874
```
S
sunpeng 已提交
875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898

</TabItem>
</Tabs>

#### 消费数据

<Tabs defaultValue="native">

<TabItem value="native" label="原生连接">

Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。

```python
while True:
    res = consumer.poll(1)
    if not res:
        continue
    err = res.error()
    if err is not None:
        raise err
    val = res.value()

    for block in val:
        print(block.fetchall())
S
sunpeng 已提交
899 900
```

S
sunpeng 已提交
901 902 903 904
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">

Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
S
sunpeng 已提交
905 906

```python
S
sunpeng 已提交
907 908 909 910 911 912 913 914 915 916
while True:
    res = consumer.poll(timeout=1.0)
    if not res:
        continue
    err = res.error()
    if err is not None:
        raise err
    for block in message:
        for row in block:
            print(row)
S
sunpeng 已提交
917
```
S
sunpeng 已提交
918

S
sunpeng 已提交
919
</TabItem>
S
sunpeng 已提交
920
</Tabs>
S
sunpeng 已提交
921

S
sunpeng 已提交
922
#### 获取消费进度
S
sunpeng 已提交
923

S
sunpeng 已提交
924
<Tabs defaultValue="native">
S
sunpeng 已提交
925

S
sunpeng 已提交
926 927 928
<TabItem value="native" label="原生连接">

Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
S
sunpeng 已提交
929

S
sunpeng 已提交
930 931
```python
assignments = consumer.assignment()
S
sunpeng 已提交
932 933
```

S
sunpeng 已提交
934 935 936 937 938
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。

```python
tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
S
sunpeng 已提交
939 940
```

S
sunpeng 已提交
941 942
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
S
sunpeng 已提交
943

S
sunpeng 已提交
944
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
S
sunpeng 已提交
945

S
sunpeng 已提交
946 947
```python
assignments = consumer.assignment()
S
sunpeng 已提交
948
```
S
sunpeng 已提交
949 950 951 952 953

Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。

```python
consumer.seek(topic='topic1', partition=0, offset=0)
S
sunpeng 已提交
954 955
```

S
sunpeng 已提交
956 957
</TabItem>
</Tabs>
S
sunpeng 已提交
958

S
sunpeng 已提交
959
#### 关闭订阅
S
sunpeng 已提交
960

S
sunpeng 已提交
961
<Tabs defaultValue="native">
S
sunpeng 已提交
962

S
sunpeng 已提交
963
<TabItem value="native" label="原生连接">
S
sunpeng 已提交
964

S
sunpeng 已提交
965 966 967 968 969
消费结束后,应当取消订阅,并关闭 Consumer。

```python
consumer.unsubscribe()
consumer.close()
S
sunpeng 已提交
970 971
```

S
sunpeng 已提交
972 973
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
S
sunpeng 已提交
974

S
sunpeng 已提交
975
消费结束后,应当取消订阅,并关闭 Consumer。
S
sunpeng 已提交
976

S
sunpeng 已提交
977 978 979
```python
consumer.unsubscribe()
consumer.close()
S
sunpeng 已提交
980 981
```

S
sunpeng 已提交
982 983
</TabItem>
</Tabs>
S
sunpeng 已提交
984

S
sunpeng 已提交
985 986 987
#### 完整示例

<Tabs defaultValue="native">
S
sunpeng 已提交
988

S
sunpeng 已提交
989 990 991 992
<TabItem value="native" label="原生连接">

```python
{{#include docs/examples/python/tmq_example.py}}
S
sunpeng 已提交
993
```
S
sunpeng 已提交
994 995 996

```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
S
sunpeng 已提交
997 998
```

S
sunpeng 已提交
999 1000
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
S
sunpeng 已提交
1001 1002

```python
S
sunpeng 已提交
1003 1004 1005 1006 1007
{{#include docs/examples/python/tmq_websocket_example.py}}
```

```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
S
sunpeng 已提交
1008
```
S
sunpeng 已提交
1009

S
sunpeng 已提交
1010 1011 1012 1013
</TabItem>
</Tabs>

### 更多示例程序
1014 1015 1016 1017 1018 1019 1020

| 示例程序链接                                                                                                  | 示例程序内容            |
| ------------------------------------------------------------------------------------------------------------- | ----------------------- |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py)           | 参数绑定, 一次绑定多行 |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py)               | 参数绑定,一次绑定一行  |
| [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py)       | InfluxDB 行协议写入     |
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py)               | 使用 JSON 类型的标签    |
1021
| [tmq_consumer.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/tmq_consumer.py)       | tmq 订阅              |
1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039

## 其它说明 

### 关于纳秒 (nanosecond)

由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。

1. https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
2. https://www.python.org/dev/peps/pep-0564/

## 重要更新

[**Release Notes**](https://github.com/taosdata/taos-connector-python/releases)

## API 参考

- [taos](https://docs.taosdata.com/api/taospy/taos/)
- [taosrest](https://docs.taosdata.com/api/taospy/taosrest)
1040 1041 1042 1043
  
## 常见问题

欢迎[提问或报告问题](https://github.com/taosdata/taos-connector-python/issues)。