未验证 提交 6a3e702f 编写于 作者: W wade zhang 提交者: GitHub

Merge pull request #22049 from taosdata/docs/sunpeng/update-python-doc-catalog

docs: update python doc catalog
...@@ -87,9 +87,9 @@ TDengine currently supports timestamp, number, character, Boolean type, and the ...@@ -87,9 +87,9 @@ TDengine currently supports timestamp, number, character, Boolean type, and the
|NCHAR|str| |NCHAR|str|
|JSON|str| |JSON|str|
## Installation ## Installation Steps
### Preparation ### Pre-installation preparation
1. Install Python. The recent taospy package requires Python 3.6.2+. The earlier versions of taospy require Python 3.7+. The taos-ws-py package requires Python 3.7+. If Python is not available on your system, refer to the [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) to install it. 1. Install Python. The recent taospy package requires Python 3.6.2+. The earlier versions of taospy require Python 3.7+. The taos-ws-py package requires Python 3.7+. If Python is not available on your system, refer to the [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) to install it.
2. Install [pip](https://pypi.org/project/pip/). In most cases, the Python installer comes with the pip utility. If not, please refer to [pip documentation](https://pip.pypa.io/en/stable/installation/) to install it. 2. Install [pip](https://pypi.org/project/pip/). In most cases, the Python installer comes with the pip utility. If not, please refer to [pip documentation](https://pip.pypa.io/en/stable/installation/) to install it.
...@@ -275,7 +275,7 @@ Transfer-Encoding: chunked ...@@ -275,7 +275,7 @@ Transfer-Encoding: chunked
</TabItem> </TabItem>
</Tabs> </Tabs>
### Using connectors to establish connections ### Specify the Host and Properties to get the connection
The following example code assumes that TDengine is installed locally and that the default configuration is used for both FQDN and serverPort. The following example code assumes that TDengine is installed locally and that the default configuration is used for both FQDN and serverPort.
...@@ -331,7 +331,69 @@ The parameter of `connect()` is the url of TDengine, and the protocol is `taosws ...@@ -331,7 +331,69 @@ The parameter of `connect()` is the url of TDengine, and the protocol is `taosws
</TabItem> </TabItem>
</Tabs> </Tabs>
## Example program ### Priority of configuration parameters
If the configuration parameters are duplicated in the parameters or client configuration file, the priority of the parameters, from highest to lowest, are as follows:
1. Parameters in `connect` function.
2. the configuration file taos.cfg of the TDengine client driver when using a native connection.
## Usage examples
### Create database and tables
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
```python
conn = taos.connect()
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```
</TabItem>
<TabItem value="rest" label="REST connection">
```python
conn = taosrest.connect(url="http://localhost:6041")
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("USE test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
```python
conn = taosws.connect(url="ws://localhost:6041")
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("USE test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```
</TabItem>
</Tabs>
### Insert data
```python
conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
```
:::
now is an internal function. The default is the current time of the client's computer. now + 1s represents the current time of the client plus 1 second, followed by the number representing the unit of time: a (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks), n (months), y (years).
:::
### Basic Usage ### Basic Usage
...@@ -453,7 +515,7 @@ The `query` method of the `TaosConnection` class can be used to query data and r ...@@ -453,7 +515,7 @@ The `query` method of the `TaosConnection` class can be used to query data and r
</TabItem> </TabItem>
</Tabs> </Tabs>
### Usage with req_id ### Execute SQL with reqId
By using the optional req_id parameter, you can specify a request ID that can be used for tracing. By using the optional req_id parameter, you can specify a request ID that can be used for tracing.
...@@ -553,171 +615,138 @@ As the way to connect introduced above but add `req_id` argument. ...@@ -553,171 +615,138 @@ As the way to connect introduced above but add `req_id` argument.
</TabItem> </TabItem>
</Tabs> </Tabs>
### Subscription ### Writing data via parameter binding
Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../../develop/tmq/). The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound.
<Tabs defaultValue="native"> <Tabs>
<TabItem value="native" label="native connection"> <TabItem value="native" label="native connection">
The `consumer` in the connector contains the subscription api. ##### Create Stmt
##### Create Consumer
The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/).
```python Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
``` ```
import taos
##### Subscribe topics conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
The `subscribe` function is used to subscribe to a list of topics.
```python
consumer.subscribe(['topic1', 'topic2'])
``` ```
##### Consume ##### parameter binding
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
```python Call the `new_multi_binds` function to create the parameter list for parameter bindings.
while True:
res = consumer.poll(1)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
``` ```
params = new_multi_binds(16)
##### assignment params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
The `assignment` function is used to get the assignment of the topic. params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
```python params[4].smallint([3, None, 2])
assignments = consumer.assignment() params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
``` ```
##### Seek Call the `bind_param` (for a single row) method or the `bind_param_batch` (for multiple rows) method to set the values.
The `seek` function is used to reset the assignment of the topic.
```python ```
tp = TopicPartition(topic='topic1', partition=0, offset=0) stmt.bind_param_batch(params)
consumer.seek(tp)
``` ```
##### After consuming data ##### execute sql
You should unsubscribe to the topics and close the consumer after consuming. Call `execute` method to execute sql.
```python ```
consumer.unsubscribe() stmt.execute()
consumer.close()
``` ```
##### Tmq subscription example ##### Close Stmt
```python ```
{{#include docs/examples/python/tmq_example.py}} stmt.close()
``` ```
##### assignment and seek example ##### Example
```python ```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} {{#include docs/examples/python/stmt_example.py}}
``` ```
</TabItem> </TabItem>
<TabItem value="websocket" label="WebSocket connection"> <TabItem value="websocket" label="WebSocket connection">
In addition to native connections, the connector also supports subscriptions via websockets. ##### Create Stmt
##### Create Consumer
The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
```python ```
import taosws import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
``` ```
##### subscribe topics ##### Prepare sql
The `subscribe` function is used to subscribe to a list of topics. Call `prepare` method in stmt to prepare sql.
```python
consumer.subscribe(['topic1', 'topic2'])
``` ```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
##### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
```python
while True:
res = consumer.poll(timeout=1.0)
if not res:
continue
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
print(row)
``` ```
##### assignment ##### parameter binding
The `assignment` function is used to get the assignment of the topic. Call the `bind_param` method to bind parameters.
```python ```
assignments = consumer.assignment() stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
``` ```
##### Seek Call the `add_batch` method to add parameters to the batch.
The `seek` function is used to reset the assignment of the topic.
```python ```
consumer.seek(topic='topic1', partition=0, offset=0) stmt.add_batch()
``` ```
##### After consuming data ##### execute sql
You should unsubscribe to the topics and close the consumer after consuming. Call `execute` method to execute sql.
```python ```
consumer.unsubscribe() stmt.execute()
consumer.close()
``` ```
##### Subscription example ##### Close Stmt
```python ```
{{#include docs/examples/python/tmq_websocket_example.py}} stmt.close()
``` ```
##### Assignment and seek example ##### Example
```python ```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} {{#include docs/examples/python/stmt_websocket_example.py}}
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
### Schemaless Insert ### Schemaless Writing
Connector support schemaless insert. Connector support schemaless insert.
...@@ -767,134 +796,211 @@ Connector support schemaless insert. ...@@ -767,134 +796,211 @@ Connector support schemaless insert.
</TabItem> </TabItem>
</Tabs> </Tabs>
### Parameter Binding ### Schemaless with reqId
The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound. There is a optional parameter called `req_id` in `schemaless_insert` and `schemaless_insert_raw` method. This reqId can be used to request link tracing.
```python
{{#include docs/examples/python/schemaless_insert_req_id.py}}
```
```python
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
```
### Data Subscription
Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../../develop/tmq/).
#### Create a Topic
To create topic, please refer to [Data Subscription](../../../develop/tmq/#create-a-topic).
#### Create a Consumer
<Tabs defaultValue="native">
<Tabs>
<TabItem value="native" label="native connection"> <TabItem value="native" label="native connection">
##### Create Stmt The consumer in the connector contains the subscription api. The syntax for creating a consumer is consumer = Consumer(configs). For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer).
Call the `statement` method in `Connection` to create the `stmt` for parameter binding. ```python
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
``` ```
import taos </TabItem>
conn = taos.connect() <TabItem value="websocket" label="WebSocket connection">
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
##### parameter binding In addition to native connections, the connector also supports subscriptions via websockets.
Call the `new_multi_binds` function to create the parameter list for parameter bindings. The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer).
``` ```python
params = new_multi_binds(16) import taosws
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False)) consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
``` ```
Call the `bind_param` (for a single row) method or the `bind_param_batch` (for multiple rows) method to set the values. </TabItem>
</Tabs>
``` #### Subscribe to a Topic
stmt.bind_param_batch(params)
```
##### execute sql <Tabs defaultValue="native">
Call `execute` method to execute sql. <TabItem value="native" label="native connection">
``` The `subscribe` function is used to subscribe to a list of topics.
stmt.execute()
```python
consumer.subscribe(['topic1', 'topic2'])
``` ```
##### Close Stmt </TabItem>
<TabItem value="websocket" label="WebSocket connection">
The `subscribe` function is used to subscribe to a list of topics.
```python
consumer.subscribe(['topic1', 'topic2'])
``` ```
stmt.close()
</TabItem>
</Tabs>
#### Consume messages
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
```python
while True:
res = consumer.poll(1)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
``` ```
##### Example </TabItem>
<TabItem value="websocket" label="WebSocket connection">
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
```python ```python
{{#include docs/examples/python/stmt_example.py}} while True:
res = consumer.poll(timeout=1.0)
if not res:
continue
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
print(row)
``` ```
</TabItem> </TabItem>
</Tabs>
<TabItem value="websocket" label="WebSocket connection"> #### Assignment subscription Offset
##### Create Stmt <Tabs defaultValue="native">
Call the `statement` method in `Connection` to create the `stmt` for parameter binding. <TabItem value="native" label="native connection">
The `assignment` function is used to get the assignment of the topic.
```python
assignments = consumer.assignment()
``` ```
import taosws
conn = taosws.connect('taosws://localhost:6041/test') The `seek` function is used to reset the assignment of the topic.
stmt = conn.statement()
```python
tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
``` ```
##### Prepare sql </TabItem>
<TabItem value="websocket" label="WebSocket connection">
Call `prepare` method in stmt to prepare sql. The `assignment` function is used to get the assignment of the topic.
```python
assignments = consumer.assignment()
``` ```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
The `seek` function is used to reset the assignment of the topic.
```python
consumer.seek(topic='topic1', partition=0, offset=0)
``` ```
##### parameter binding </TabItem>
</Tabs>
Call the `bind_param` method to bind parameters. #### Close subscriptions
``` <Tabs defaultValue="native">
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
```
Call the `add_batch` method to add parameters to the batch. <TabItem value="native" label="native connection">
``` You should unsubscribe to the topics and close the consumer after consuming.
stmt.add_batch()
```python
consumer.unsubscribe()
consumer.close()
``` ```
##### execute sql </TabItem>
<TabItem value="websocket" label="WebSocket connection">
Call `execute` method to execute sql. You should unsubscribe to the topics and close the consumer after consuming.
``` ```python
stmt.execute() consumer.unsubscribe()
consumer.close()
``` ```
##### Close Stmt </TabItem>
</Tabs>
#### Full Sample Code
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
```python
{{#include docs/examples/python/tmq_example.py}}
``` ```
stmt.close()
```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
``` ```
##### Example </TabItem>
<TabItem value="websocket" label="WebSocket connection">
```python ```python
{{#include docs/examples/python/stmt_websocket_example.py}} {{#include docs/examples/python/tmq_websocket_example.py}}
```
```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
......
...@@ -71,7 +71,7 @@ Python Connector 的所有数据库操作如果出现异常,都会直接抛出 ...@@ -71,7 +71,7 @@ Python Connector 的所有数据库操作如果出现异常,都会直接抛出
{{#include docs/examples/python/handle_exception.py}} {{#include docs/examples/python/handle_exception.py}}
``` ```
TDengine DataType 和 Python DataType ## TDengine DataType 和 Python DataType
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下: TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:
...@@ -277,7 +277,7 @@ Transfer-Encoding: chunked ...@@ -277,7 +277,7 @@ Transfer-Encoding: chunked
</TabItem> </TabItem>
</Tabs> </Tabs>
### 使用连接器建立连接 ### 指定 Host 和 Properties 获取连接
以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。 以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。
...@@ -333,8 +333,69 @@ Transfer-Encoding: chunked ...@@ -333,8 +333,69 @@ Transfer-Encoding: chunked
</TabItem> </TabItem>
</Tabs> </Tabs>
### 配置参数的优先级
如果配置参数在参数和客户端配置文件中有重复,则参数的优先级由高到低分别如下:
1. 连接参数
2. 使用原生连接时,TDengine 客户端驱动的配置文件 taos.cfg
## 使用示例 ## 使用示例
### 创建数据库和表
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
```python
conn = taos.connect()
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
# change database. same as execute "USE db"
conn.select_db("test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```
</TabItem>
<TabItem value="rest" label="REST 连接">
```python
conn = taosrest.connect(url="http://localhost:6041")
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("USE test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
```python
conn = taosws.connect(url="ws://localhost:6041")
# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement.
conn.execute("DROP DATABASE IF EXISTS test")
conn.execute("CREATE DATABASE test")
conn.execute("USE test")
conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)")
```
</TabItem>
</Tabs>
### 插入数据
```python
conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)")
```
:::
now 为系统内部函数,默认为客户端所在计算机当前时间。 now + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。
:::
### 基本使用 ### 基本使用
<Tabs defaultValue="rest"> <Tabs defaultValue="rest">
...@@ -373,7 +434,6 @@ Transfer-Encoding: chunked ...@@ -373,7 +434,6 @@ Transfer-Encoding: chunked
:::note :::note
TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。 TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。
::: :::
</TabItem> </TabItem>
...@@ -456,7 +516,7 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方 ...@@ -456,7 +516,7 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方
</TabItem> </TabItem>
</Tabs> </Tabs>
### 与 req_id 一起使用 ### 执行带有 reqId 的 SQL
使用可选的 req_id 参数,指定请求 id,可以用于 tracing 使用可选的 req_id 参数,指定请求 id,可以用于 tracing
...@@ -557,171 +617,138 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方 ...@@ -557,171 +617,138 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方
</TabItem> </TabItem>
</Tabs> </Tabs>
### 数据订阅 ### 通过参数绑定写入数据
连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/) TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数
<Tabs defaultValue="native"> <Tabs>
<TabItem value="native" label="原生连接"> <TabItem value="native" label="原生连接">
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。 ##### 创建 stmt
##### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
```python Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
``` ```
import taos
##### 订阅 topics conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
```python
consumer.subscribe(['topic1', 'topic2'])
``` ```
##### 消费数据 ##### 参数绑定
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
```python 调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。
while True:
res = consumer.poll(1)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val: ```
print(block.fetchall()) params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
``` ```
##### 获取消费进度 调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
```python ```
assignments = consumer.assignment() stmt.bind_param_batch(params)
``` ```
##### 指定订阅 Offset ##### 执行 sql
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。 调用 stmt 的 `execute` 方法执行 sql
```python ```
tp = TopicPartition(topic='topic1', partition=0, offset=0) stmt.execute()
consumer.seek(tp)
``` ```
##### 关闭订阅 ##### 关闭 stmt
消费结束后,应当取消订阅,并关闭 Consumer 最后需要关闭 stmt
```python
consumer.unsubscribe()
consumer.close()
``` ```
stmt.close()
##### 完整示例
```python
{{#include docs/examples/python/tmq_example.py}}
``` ```
##### 获取和重置消费进度示例代码 ##### 示例代码
```python ```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} {{#include docs/examples/python/stmt_example.py}}
``` ```
</TabItem> </TabItem>
<TabItem value="websocket" label="WebSocket 连接"> <TabItem value="websocket" label="WebSocket 连接">
除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。 ##### 创建 stmt
taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。
##### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer) Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数
```python ```
import taosws import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
``` ```
##### 订阅 topics ##### 解析 sql
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic 调用 stmt 的 `prepare` 方法来解析 insert 语句
```python ```
consumer.subscribe(['topic1', 'topic2']) stmt.prepare("insert into t1 values (?, ?, ?, ?)")
``` ```
##### 消费数据 ##### 参数绑定
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息 调用 stmt 的 `bind_param` 方法绑定参数
```python ```
while True: stmt.bind_param([
res = consumer.poll(timeout=1.0) taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
if not res: taosws.ints_to_column([1, 2, 3, 4]),
continue taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
err = res.error() taosws.varchar_to_column(['a', 'b', 'c', 'd']),
if err is not None: ])
raise err
for block in message:
for row in block:
print(row)
``` ```
##### 获取消费进度 调用 stmt 的 `add_batch` 方法,将参数加入批处理。
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
```python ```
assignments = consumer.assignment() stmt.add_batch()
``` ```
##### 重置消费进度 ##### 执行 sql
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。 调用 stmt 的 `execute` 方法执行 sql
```python ```
consumer.seek(topic='topic1', partition=0, offset=0) stmt.execute()
``` ```
##### 结束消费 ##### 关闭 stmt
消费结束后,应当取消订阅,并关闭 Consumer 最后需要关闭 stmt
```python
consumer.unsubscribe()
consumer.close()
``` ```
stmt.close()
##### tmq 订阅示例代码
```python
{{#include docs/examples/python/tmq_websocket_example.py}}
``` ```
连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。 ##### 示例代码
##### 获取和重置消费进度示例代码
```python ```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} {{#include docs/examples/python/stmt_websocket_example.py}}
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
...@@ -775,138 +802,211 @@ consumer.close() ...@@ -775,138 +802,211 @@ consumer.close()
</TabItem> </TabItem>
</Tabs> </Tabs>
### 通过参数绑定写入数据 ### 执行带有 reqId 的无模式写入
TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。 连接器的 `schemaless_insert` 和 `schemaless_insert_raw` 方法支持 `req_id` 可选参数,此 `req_Id` 可用于请求链路追踪。
```python
{{#include docs/examples/python/schemaless_insert_req_id.py}}
```
```python
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
```
### 数据订阅
连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。
#### 创建 Topic
创建 Topic 相关请参考 [数据订阅文档](../../develop/tmq/#创建-topic)。
#### 创建 Consumer
<Tabs defaultValue="native">
<Tabs>
<TabItem value="native" label="原生连接"> <TabItem value="native" label="原生连接">
##### 创建 stmt `Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#创建消费者-consumer)。
Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 ```python
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
``` ```
import taos </TabItem>
conn = taos.connect() <TabItem value="websocket" label="WebSocket 连接">
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
##### 参数绑定 除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。
调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定 taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)
``` ```python
params = new_multi_binds(16) import taosws
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False)) consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
``` ```
调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。 </TabItem>
</Tabs>
``` #### 订阅 topics
stmt.bind_param_batch(params)
```
##### 执行 sql <Tabs defaultValue="native">
调用 stmt 的 `execute` 方法执行 sql <TabItem value="native" label="原生连接">
``` Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
stmt.execute()
```python
consumer.subscribe(['topic1', 'topic2'])
``` ```
##### 关闭 stmt </TabItem>
<TabItem value="websocket" label="WebSocket 连接">
最后需要关闭 stmt Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic
```python
consumer.subscribe(['topic1', 'topic2'])
``` ```
stmt.close()
</TabItem>
</Tabs>
#### 消费数据
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
```python
while True:
res = consumer.poll(1)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
``` ```
##### 示例代码 </TabItem>
<TabItem value="websocket" label="WebSocket 连接">
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
```python ```python
{{#include docs/examples/python/stmt_example.py}} while True:
res = consumer.poll(timeout=1.0)
if not res:
continue
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
print(row)
``` ```
</TabItem> </TabItem>
</Tabs>
<TabItem value="websocket" label="WebSocket 连接"> #### 获取消费进度
##### 创建 stmt <Tabs defaultValue="native">
Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 <TabItem value="native" label="原生连接">
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
```python
assignments = consumer.assignment()
``` ```
import taosws
conn = taosws.connect('taosws://localhost:6041/test') Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
stmt = conn.statement()
```python
tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
``` ```
##### 解析 sql </TabItem>
<TabItem value="websocket" label="WebSocket 连接">
调用 stmt 的 `prepare` 方法来解析 insert 语句 Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表
```python
assignments = consumer.assignment()
``` ```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。
```python
consumer.seek(topic='topic1', partition=0, offset=0)
``` ```
##### 参数绑定 </TabItem>
</Tabs>
调用 stmt 的 `bind_param` 方法绑定参数。 #### 关闭订阅
``` <Tabs defaultValue="native">
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
```
调用 stmt 的 `add_batch` 方法,将参数加入批处理。 <TabItem value="native" label="原生连接">
``` 消费结束后,应当取消订阅,并关闭 Consumer。
stmt.add_batch()
```python
consumer.unsubscribe()
consumer.close()
``` ```
##### 执行 sql </TabItem>
<TabItem value="websocket" label="WebSocket 连接">
调用 stmt 的 `execute` 方法执行 sql 消费结束后,应当取消订阅,并关闭 Consumer。
``` ```python
stmt.execute() consumer.unsubscribe()
consumer.close()
``` ```
##### 关闭 stmt </TabItem>
</Tabs>
最后需要关闭 stmt。 #### 完整示例
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
```python
{{#include docs/examples/python/tmq_example.py}}
``` ```
stmt.close()
```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
``` ```
##### 示例代码 </TabItem>
<TabItem value="websocket" label="WebSocket 连接">
```python ```python
{{#include docs/examples/python/stmt_websocket_example.py}} {{#include docs/examples/python/tmq_websocket_example.py}}
```
```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
``` ```
</TabItem> </TabItem>
</Tabs> </Tabs>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册