提交 2bf876f9 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

......@@ -105,6 +105,12 @@ class Consumer:
def poll(self, timeout: float = 1.0):
def assignment(self):
def poll(self, timeout: float = 1.0):
def close(self):
......@@ -453,6 +453,170 @@ As the way to connect introduced above but add `req_id` argument.
### Subscription
Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../../develop/tmq/).
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
The `consumer` in the connector contains the subscription api.
#### Create Consumer
The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/).
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": ""})
#### Subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
consumer.subscribe(['topic1', 'topic2'])
#### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
while True:
res = consumer.poll(1)
if not res:
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
#### assignment
The `assignment` function is used to get the assignment of the topic.
assignments = consumer.assignment()
#### Seek
The `seek` function is used to reset the assignment of the topic.
tp = TopicPartition(topic='topic1', partition=0, offset=0)
#### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
#### Tmq subscription example
{{#include docs/examples/python/tmq_example.py}}
#### assignment and seek example
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
<TabItem value="websocket" label="WebSocket connection">
In addition to native connections, the connector also supports subscriptions via websockets.
#### Create Consumer
The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer).
import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
#### subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
consumer.subscribe(['topic1', 'topic2'])
#### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
while True:
res = consumer.poll(timeout=1.0)
if not res:
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
#### assignment
The `assignment` function is used to get the assignment of the topic.
assignments = consumer.assignment()
#### Seek
The `seek` function is used to reset the assignment of the topic.
consumer.seek(topic='topic1', partition=0, offset=0)
#### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
#### Subscription example
{{#include docs/examples/python/tmq_websocket_example.py}}
#### Assignment and seek example
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
### Schemaless Insert
Connector support schemaless insert.
......@@ -507,7 +671,8 @@ Insert with req_id argument
| Example program links | Example program content |
| ------------------------------------------------------------------------------------------------------------- | ------------------- ---- |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding,
bind multiple rows at once |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py
| [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing |
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags |
......@@ -241,6 +241,8 @@ Input following content:
"poll.interval.ms": 1000,
"fetch.max.rows": 100,
"topic.per.stable": true,
"topic.ignore.db": false,
"out.format": "line",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
......@@ -356,8 +358,10 @@ The following configuration items apply to TDengine Sink Connector and TDengine
3. `timestamp.initial`: Data synchronization start time. The format is 'yyyy-MM-dd HH:mm:ss'. If it is not set, the data importing to Kafka will be started from the first/oldest row in the database.
4. `poll.interval.ms`: The time interval for checking newly created tables or removed tables, default value is 1000.
5. `fetch.max.rows`: The maximum number of rows retrieved when retrieving the database, default is 100.
6. `query.interval.ms`: The time range of reading data from TDengine each time, its unit is millisecond. It should be adjusted according to the data flow in rate, the default value is 1000.
7. `topic.per.stable`: If it's set to true, it means one super table in TDengine corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>-<stable.name>`; if it's set to false, it means the whole DB corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>`.
6. `query.interval.ms`: The time range of reading data from TDengine each time, its unit is millisecond. It should be adjusted according to the data flow in rate, the default value is 0, this means to get all the data to the latest time.
7. `out.format`: Result output format. `line` indicates that the output format is InfluxDB line protocol format, `json` indicates that the output format is json. The default is line.
8. `topic.per.stable`: If it's set to true, it means one super table in TDengine corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>-<stable.name>`; if it's set to false, it means the whole DB corresponds to a topic in Kafka, the topic naming rule is `<topic.prefix>-<connection.database>`.
9. `topic.ignore.db`: Whether the topic naming rule contains the database name: true indicates that the rule is `<topic.prefix>-<stable.name>`, false indicates that the rule is `<topic.prefix>-<connection.database>-<stable.name>`, and the default is false. Does not take effect when `topic.per.stable` is set to false.
## Other notes
import taos
from taos.tmq import Consumer
import taosws
def prepare():
conn = taos.connect()
conn.execute("drop topic if exists tmq_assignment_demo_topic")
conn.execute("drop database if exists tmq_assignment_demo_db")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600")
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
"create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')")
def taos_get_assignment_and_seek_demo():
consumer = Consumer(
"group.id": "0",
# should disable snapshot,
# otherwise it will cause invalid params error
"experimental.snapshot.enable": "false",
# get topic assignment
assignments = consumer.assignment()
for assignment in assignments:
# poll
# get topic assignment again
after_pool_assignments = consumer.assignment()
for assignment in after_pool_assignments:
# seek to the beginning
for assignment in assignments:
# now the assignment should be the same as before poll
assignments = consumer.assignment()
for assignment in assignments:
if __name__ == '__main__':
import taos
import taosws
def prepare():
conn = taos.connect()
conn.execute("drop topic if exists tmq_assignment_demo_topic")
conn.execute("drop database if exists tmq_assignment_demo_db")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600")
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
"create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')")
def taosws_get_assignment_and_seek_demo():
consumer = taosws.Consumer(conf={
"td.connect.websocket.scheme": "ws",
# should disable snapshot,
# otherwise it will cause invalid params error
"experimental.snapshot.enable": "false",
"group.id": "0",
# get topic assignment
assignments = consumer.assignment()
for assignment in assignments:
# poll
# get topic assignment again
after_poll_assignments = consumer.assignment()
for assignment in after_poll_assignments:
# seek to the beginning
for assignment in assignments:
for a in assignment.assignments():
consumer.seek(assignment.topic(), a.vg_id(), a.offset())
# now the assignment should be the same as before poll
assignments = consumer.assignment()
for assignment in assignments:
if __name__ == '__main__':
......@@ -105,6 +105,12 @@ class Consumer:
def poll(self, timeout: float = 1.0):
def assignment(self):
def seek(self, partition):
def close(self):
......@@ -456,27 +456,169 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
### 数据订阅
连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅](../../develop/tmq/)。
连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API,相关 API 定义请参考 [数据订阅文档](../../develop/tmq/#%E4%B8%BB%E8%A6%81%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%84%E5%92%8C-api)。
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。
#### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": ""})
#### 订阅 topics
Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
consumer.subscribe(['topic1', 'topic2'])
#### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
while True:
res = consumer.poll(1)
if not res:
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
#### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
assignments = consumer.assignment()
#### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
tp = TopicPartition(topic='topic1', partition=0, offset=0)
#### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
#### tmq 订阅示例代码
{{#include docs/examples/python/tmq_example.py}}
#### 获取和重置消费进度示例代码
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
<TabItem value="websocket" label="WebSocket 连接">
除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据。
除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。
taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。
#### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
#### 订阅 topics
Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
consumer.subscribe(['topic1', 'topic2'])
#### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
while True:
res = consumer.poll(timeout=1.0)
if not res:
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
#### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
assignments = consumer.assignment()
#### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。
consumer.seek(topic='topic1', partition=0, offset=0)
#### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
#### tmq 订阅示例代码
{{#include docs/examples/python/tmq_websocket_example.py}}
连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。
#### 获取和重置消费进度示例代码
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
......@@ -240,6 +240,8 @@ vi source-demo.json
"poll.interval.ms": 1000,
"fetch.max.rows": 100,
"topic.per.stable": true,
"topic.ignore.db": false,
"out.format": "line",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter"
......@@ -361,8 +363,10 @@ curl -X DELETE http://localhost:8083/connectors/TDengineSourceConnector
3. `timestamp.initial`: 数据同步起始时间。格式为'yyyy-MM-dd HH:mm:ss',若未指定则从指定 DB 中最早的一条记录开始。
4. `poll.interval.ms`: 检查是否有新建或删除的表的时间间隔,单位为 ms。默认为 1000。
5. `fetch.max.rows` : 检索数据库时最大检索条数。 默认为 100。
6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 1000.
7. `topic.per.stable`: 如果设置为true,表示一个超级表对应一个 Kafka topic,topic的命名规则 `<topic.prefix>-<connection.database>-<stable.name>`;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `<topic.prefix>-<connection.database>`
6. `query.interval.ms`: 从 TDengine 一次读取数据的时间跨度,需要根据表中的数据特征合理配置,避免一次查询的数据量过大或过小;在具体的环境中建议通过测试设置一个较优值,默认值为 0,即获取到当前最新时间的所有数据。
7. `out.format` : 结果集输出格式。`line` 表示输出格式为 InfluxDB Line 协议格式,`json` 表示输出格式是 json。默认为 line。
8. `topic.per.stable`: 如果设置为 true,表示一个超级表对应一个 Kafka topic,topic的命名规则 `<topic.prefix>-<connection.database>-<stable.name>`;如果设置为 false,则指定的 DB 中的所有数据进入一个 Kafka topic,topic 的命名规则为 `<topic.prefix>-<connection.database>`
9. `topic.ignore.db`: topic 命名规则是否包含 database 名称,true 表示规则为 `<topic.prefix>-<stable.name>`,false 表示规则为 `<topic.prefix>-<connection.database>-<stable.name>`,默认 false。在 `topic.per.stable` 设置为 false 时不生效。
## 其他说明
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册