From 3af153bae6aceb961e983a6a98beeb97059fa3bb Mon Sep 17 00:00:00 2001 From: sunpeng Date: Thu, 8 Jun 2023 14:43:07 +0800 Subject: [PATCH] docs: add docs for assignment and seek --- .../14-reference/03-connector/07-python.mdx | 41 ++++++++++++- .../examples/python/tmq_assignment_example.py | 58 +++++++++++++++++++ .../python/tmq_websocket_assgnment_example.py | 57 ++++++++++++++++++ docs/zh/08-connector/30-python.mdx | 12 ++++ 4 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 docs/examples/python/tmq_assignment_example.py create mode 100644 docs/examples/python/tmq_websocket_assgnment_example.py diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index b263af8ea6..ba322348cf 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -453,6 +453,44 @@ As the way to connect introduced above but add `req_id` argument. +### Subscription + +Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../develop/tmq/). + + + + +The `consumer` in the connector has the subscription api. For more subscription api parameters, please refer to [Data Subscription](../../develop/tmq/). + +```python +{{#include docs/examples/python/tmq_example.py}} +``` + +There is an `assignment` API in the connector, which can get the assignment of the topic. And there is a `seek` api to reset the assignment of the topic. + +```python +{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} +``` + + + + + +In addition to native connections, the connector also supports subscriptions via websockets. + +```python +{{#include docs/examples/python/tmq_websocket_example.py}} +``` + +There is an `assignment` API in the connector, which can get the assignment of the topic. And there is a `seek` api to reset the assignment of the topic. + +```python +{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} +``` + + + + ### Schemaless Insert Connector support schemaless insert. @@ -507,7 +545,8 @@ Insert with req_id argument | Example program links | Example program content | | ------------------------------------------------------------------------------------------------------------- | ------------------- ---- | -| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once | +| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, +bind multiple rows at once | | [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py | [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing | | [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags | diff --git a/docs/examples/python/tmq_assignment_example.py b/docs/examples/python/tmq_assignment_example.py new file mode 100644 index 0000000000..a07347a9b9 --- /dev/null +++ b/docs/examples/python/tmq_assignment_example.py @@ -0,0 +1,58 @@ +import taos +from taos.tmq import Consumer +import taosws + + +def prepare(): + conn = taos.connect() + conn.execute("drop topic if exists tmq_assignment_demo_topic") + conn.execute("drop database if exists tmq_assignment_demo_db") + conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600") + conn.select_db("tmq_assignment_demo_db") + conn.execute( + "create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") + conn.execute( + "create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')") + + +def taos_get_assignment_and_seek_demo(): + prepare() + consumer = Consumer( + { + "group.id": "0", + # should disable snapshot, + # otherwise it will cause invalid params error + "experimental.snapshot.enable": "false", + } + ) + consumer.subscribe(["tmq_assignment_demo_topic"]) + + # get topic assignment + assignments = consumer.assignment() + for assignment in assignments: + print(assignment) + + # poll + consumer.poll(1) + consumer.poll(1) + + # get topic assignment again + after_pool_assignments = consumer.assignment() + for assignment in after_pool_assignments: + print(assignment) + + # seek to the beginning + for assignment in assignments: + consumer.seek(assignment) + + # now the assignment should be the same as before poll + assignments = consumer.assignment() + for assignment in assignments: + print(assignment) + + +if __name__ == '__main__': + taosws_get_assignment_and_seek_demo() diff --git a/docs/examples/python/tmq_websocket_assgnment_example.py b/docs/examples/python/tmq_websocket_assgnment_example.py new file mode 100644 index 0000000000..0f8e4a2804 --- /dev/null +++ b/docs/examples/python/tmq_websocket_assgnment_example.py @@ -0,0 +1,57 @@ +import taos +import taosws + + +def prepare(): + conn = taos.connect() + conn.execute("drop topic if exists tmq_assignment_demo_topic") + conn.execute("drop database if exists tmq_assignment_demo_db") + conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600") + conn.select_db("tmq_assignment_demo_db") + conn.execute( + "create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") + conn.execute( + "create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')") + + +def taosws_get_assignment_and_seek_demo(): + prepare() + consumer = taosws.Consumer(conf={ + "td.connect.websocket.scheme": "ws", + # should disable snapshot, + # otherwise it will cause invalid params error + "experimental.snapshot.enable": "false", + "group.id": "0", + }) + consumer.subscribe(["tmq_assignment_demo_topic"]) + + # get topic assignment + assignments = consumer.assignment() + for assignment in assignments: + print(assignment.to_string()) + + # poll + consumer.poll(1) + consumer.poll(1) + + # get topic assignment again + after_poll_assignments = consumer.assignment() + for assignment in after_poll_assignments: + print(assignment.to_string()) + + # seek to the beginning + for assignment in assignments: + for a in assignment.assignments(): + consumer.seek(assignment.topic(), a.vg_id(), a.offset()) + + # now the assignment should be the same as before poll + assignments = consumer.assignment() + for assignment in assignments: + print(assignment.to_string()) + + +if __name__ == '__main__': + taosws_get_assignment_and_seek_demo() diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx index 1037d66f17..5c310ff79f 100644 --- a/docs/zh/08-connector/30-python.mdx +++ b/docs/zh/08-connector/30-python.mdx @@ -467,6 +467,12 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 {{#include docs/examples/python/tmq_example.py}} ``` +连接器提供了 `assignment`接口,用于获取 topic assignment 的功能,可以查询订阅的 assignment 的消费进度,并提供 `seek` 接口,用于重置 topic 的 assignment。 + +```python +{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} +``` + @@ -477,6 +483,12 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 {{#include docs/examples/python/tmq_websocket_example.py}} ``` +连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。 + +```python +{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} +``` + -- GitLab