提交 818e5ddf 编写于 作者: S sunpeng

docs: upgrade docs from python subscription

上级 3af153ba
...@@ -105,6 +105,12 @@ class Consumer: ...@@ -105,6 +105,12 @@ class Consumer:
def poll(self, timeout: float = 1.0): def poll(self, timeout: float = 1.0):
pass pass
def assignment(self):
pass
def poll(self, timeout: float = 1.0):
pass
def close(self): def close(self):
pass pass
......
...@@ -460,13 +460,77 @@ Connector support data subscription. For more information about subscroption, pl ...@@ -460,13 +460,77 @@ Connector support data subscription. For more information about subscroption, pl
<Tabs defaultValue="native"> <Tabs defaultValue="native">
<TabItem value="native" label="native connection"> <TabItem value="native" label="native connection">
The `consumer` in the connector has the subscription api. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/). The `consumer` in the connector contains the subscription api.
#### Create Consumer
The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/).
```python
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
#### Subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
```python
consumer.subscribe(['topic1', 'topic2'])
```
#### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
```python
while True:
res = consumer.poll(1)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
```
#### assignment
The `assignment` function is used to get the assignment of the topic.
```python
assignments = consumer.assignment()
```
#### Seek
The `seek` function is used to reset the assignment of the topic.
```python
tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
```
#### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
```python
consumer.unsubscribe()
consumer.close()
```
#### Tmq subscription example
```python ```python
{{#include docs/examples/python/tmq_example.py}} {{#include docs/examples/python/tmq_example.py}}
``` ```
There is an `assignment` API in the connector, which can get the assignment of the topic. And there is a `seek` api to reset the assignment of the topic. #### assignment and seek example
```python ```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} {{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
...@@ -478,11 +542,73 @@ There is an `assignment` API in the connector, which can get the assignment of t ...@@ -478,11 +542,73 @@ There is an `assignment` API in the connector, which can get the assignment of t
In addition to native connections, the connector also supports subscriptions via websockets. In addition to native connections, the connector also supports subscriptions via websockets.
#### Create Consumer
The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/#create-a-consumer).
```python
import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```
#### subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
```python
consumer.subscribe(['topic1', 'topic2'])
```
#### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
```python
while True:
res = consumer.poll(timeout=1.0)
if not res:
continue
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
print(row)
```
#### assignment
The `assignment` function is used to get the assignment of the topic.
```python
assignments = consumer.assignment()
```
#### Seek
The `seek` function is used to reset the assignment of the topic.
```python
consumer.seek(topic='topic1', partition=0, offset=0)
```
#### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
```python
consumer.unsubscribe()
consumer.close()
```
#### Subscription example
```python ```python
{{#include docs/examples/python/tmq_websocket_example.py}} {{#include docs/examples/python/tmq_websocket_example.py}}
``` ```
There is an `assignment` API in the connector, which can get the assignment of the topic. And there is a `seek` api to reset the assignment of the topic. #### Assignment and seek example
```python ```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} {{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
......
...@@ -105,6 +105,12 @@ class Consumer: ...@@ -105,6 +105,12 @@ class Consumer:
def poll(self, timeout: float = 1.0): def poll(self, timeout: float = 1.0):
pass pass
def assignment(self):
pass
def seek(self, partition):
pass
def close(self): def close(self):
pass pass
......
...@@ -456,18 +456,82 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 ...@@ -456,18 +456,82 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
### 数据订阅 ### 数据订阅
连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅](../../develop/tmq/)。 连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。
<Tabs defaultValue="native"> <Tabs defaultValue="native">
<TabItem value="native" label="原生连接"> <TabItem value="native" label="原生连接">
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API,相关 API 定义请参考 [数据订阅文档](../../develop/tmq/#%E4%B8%BB%E8%A6%81%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%84%E5%92%8C-api)。 `Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。
#### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
```python
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
#### 订阅 topics
Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
```python
consumer.subscribe(['topic1', 'topic2'])
```
#### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
```python
while True:
res = consumer.poll(1)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
```
#### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
```python
assignments = consumer.assignment()
```
#### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
```python
tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
```
#### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
```python
consumer.unsubscribe()
consumer.close()
```
#### tmq 订阅示例代码
```python ```python
{{#include docs/examples/python/tmq_example.py}} {{#include docs/examples/python/tmq_example.py}}
``` ```
连接器提供了 `assignment`接口,用于获取 topic assignment 的功能,可以查询订阅的 assignment 的消费进度,并提供 `seek` 接口,用于重置 topic 的 assignment。 #### 获取和重置消费进度示例代码
```python ```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} {{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
...@@ -477,7 +541,71 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 ...@@ -477,7 +541,71 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
<TabItem value="websocket" label="WebSocket 连接"> <TabItem value="websocket" label="WebSocket 连接">
除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据。 除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。
taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。
#### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
```python
import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```
#### 订阅 topics
Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
```python
consumer.subscribe(['topic1', 'topic2'])
```
#### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
```python
while True:
res = consumer.poll(timeout=1.0)
if not res:
continue
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
print(row)
```
#### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
```python
assignments = consumer.assignment()
```
#### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。
```python
consumer.seek(topic='topic1', partition=0, offset=0)
```
#### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
```python
consumer.unsubscribe()
consumer.close()
```
#### tmq 订阅示例代码
```python ```python
{{#include docs/examples/python/tmq_websocket_example.py}} {{#include docs/examples/python/tmq_websocket_example.py}}
...@@ -485,6 +613,8 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 ...@@ -485,6 +613,8 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。 连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。
#### 获取和重置消费进度示例代码
```python ```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} {{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
``` ```
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册