diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx index d951923de5bca88f8ca605957f0a373b35263eb2..578f38e73d02efa0da04531986c037176d68482b 100644 --- a/docs/en/07-develop/07-tmq.mdx +++ b/docs/en/07-develop/07-tmq.mdx @@ -105,6 +105,12 @@ class Consumer: def poll(self, timeout: float = 1.0): pass + def assignment(self): + pass + + def poll(self, timeout: float = 1.0): + pass + def close(self): pass diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index b263af8ea6afcebe23726d5baa8dd4246e239963..461bdfbf162e696b430c1edb9b09ada70e086fb9 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -24,6 +24,36 @@ The source code for the Python connector is hosted on [GitHub](https://github.co We recommend using the latest version of `taospy`, regardless of the version of TDengine. +## Handling Exceptions + +There are 4 types of exception in python connector. + +- The exception of Python Connector itself. +- The exception of native library. +- The exception of websocket +- The exception of subscription. +- The exception of other TDengine function modules. + +|Error Type|Description|Suggested Actions| +|:--------:|:---------:|:---------------:| +|InterfaceError|the native library is too old that it cannot support the function|please check the TDengine client version| +|ConnectionError|connection error|please check TDengine's status and the connection params| +|DatabaseError|database error|please upgrade Python connector to latest| +|OperationalError|operation error|| +|ProgrammingError||| +|StatementError|the exception of stmt|| +|ResultError||| +|SchemalessError|the exception of stmt schemaless|| +|TmqError|the exception of stmt tmq|| + +It usually uses try-expect to handle exceptions in python. For exception handling, please refer to [Python Errors and Exceptions Documentation](https://docs.python.org/3/tutorial/errors.html). + +All exceptions from the Python Connector are thrown directly. Applications should handle these exceptions. For example: + +```python +{{#include docs/examples/python/handle_exception.py}} +``` + ## Supported features - Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing. @@ -343,6 +373,8 @@ For a more detailed description of the `sql()` method, please refer to [RestClie +The `Connection` class contains both an implementation of the PEP249 Connection interface (e.g., the `cursor()` method and the `close()` method) and many extensions (e.g., the `execute()`, `query()`, `schemaless_insert()`, and `subscribe()` methods). + ```python {{#include docs/examples/python/connect_websocket_examples.py:basic}} ``` @@ -353,6 +385,46 @@ For a more detailed description of the `sql()` method, please refer to [RestClie +### Querying Data + + + + +The `query` method of the `TaosConnection` class can be used to query data and return the result data of type `TaosResult`. + +```python +{{#include docs/examples/python/connection_usage_native_reference.py:query}} +``` + +:::tip +The queried results can only be fetched once. For example, only one of `fetch_all()` and `fetch_all_into_dict()` can be used in the example above. Repeated fetches will result in an empty list. +::: + + + + + +The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-api). It contains only a `sql()` method for executing arbitrary SQL statements and returning the result. + +```python +{{#include docs/examples/python/rest_client_example.py}} +``` + +For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html). + + + + + +The `query` method of the `TaosConnection` class can be used to query data and return the result data of type `TaosResult`. + +```python +{{#include docs/examples/python/connect_websocket_examples.py:basic}} +``` + + + + ### Usage with req_id By using the optional req_id parameter, you can specify a request ID that can be used for tracing. @@ -453,6 +525,170 @@ As the way to connect introduced above but add `req_id` argument. +### Subscription + +Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../../develop/tmq/). + + + + +The `consumer` in the connector contains the subscription api. + +#### Create Consumer + +The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/). + +```python +from taos.tmq import Consumer + +consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) +``` + +#### Subscribe topics + +The `subscribe` function is used to subscribe to a list of topics. + +```python +consumer.subscribe(['topic1', 'topic2']) +``` + +#### Consume + +The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. + +```python +while True: + res = consumer.poll(1) + if not res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) +``` + +#### assignment + +The `assignment` function is used to get the assignment of the topic. + +```python +assignments = consumer.assignment() +``` + +#### Seek + +The `seek` function is used to reset the assignment of the topic. + +```python +tp = TopicPartition(topic='topic1', partition=0, offset=0) +consumer.seek(tp) +``` + +#### After consuming data + +You should unsubscribe to the topics and close the consumer after consuming. + +```python +consumer.unsubscribe() +consumer.close() +``` + +#### Tmq subscription example + +```python +{{#include docs/examples/python/tmq_example.py}} +``` + +#### assignment and seek example + +```python +{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} +``` + + + + + +In addition to native connections, the connector also supports subscriptions via websockets. + +#### Create Consumer + +The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). + +```python +import taosws + +consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) +``` + +#### subscribe topics + +The `subscribe` function is used to subscribe to a list of topics. + +```python +consumer.subscribe(['topic1', 'topic2']) +``` + +#### Consume + +The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. + +```python +while True: + res = consumer.poll(timeout=1.0) + if not res: + continue + err = res.error() + if err is not None: + raise err + for block in message: + for row in block: + print(row) +``` + +#### assignment + +The `assignment` function is used to get the assignment of the topic. + +```python +assignments = consumer.assignment() +``` + +#### Seek + +The `seek` function is used to reset the assignment of the topic. + +```python +consumer.seek(topic='topic1', partition=0, offset=0) +``` + +#### After consuming data + +You should unsubscribe to the topics and close the consumer after consuming. + +```python +consumer.unsubscribe() +consumer.close() +``` + +#### Subscription example + +```python +{{#include docs/examples/python/tmq_websocket_example.py}} +``` + +#### Assignment and seek example + +```python +{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} +``` + + + + ### Schemaless Insert Connector support schemaless insert. @@ -503,11 +739,143 @@ Insert with req_id argument +### Parameter Binding + +The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound. + + + + +#### Create Stmt + +Call the `statement` method in `Connection` to create the `stmt` for parameter binding. + +``` +import taos + +conn = taos.connect() +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") +``` + +#### parameter binding + +Call the `new_multi_binds` function to create the parameter list for parameter bindings. + +``` +params = new_multi_binds(16) +params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) +params[1].bool((True, None, False)) +params[2].tinyint([-128, -128, None]) # -128 is tinyint null +params[3].tinyint([0, 127, None]) +params[4].smallint([3, None, 2]) +params[5].int([3, 4, None]) +params[6].bigint([3, 4, None]) +params[7].tinyint_unsigned([3, 4, None]) +params[8].smallint_unsigned([3, 4, None]) +params[9].int_unsigned([3, 4, None]) +params[10].bigint_unsigned([3, 4, None]) +params[11].float([3, None, 1]) +params[12].double([3, None, 1.2]) +params[13].binary(["abc", "dddafadfadfadfadfa", None]) +params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) +params[15].timestamp([None, None, 1626861392591]) +``` + +Call the `bind_param` (for a single row) method or the `bind_param_batch` (for multiple rows) method to set the values. + +``` +stmt.bind_param_batch(params) +``` + +#### execute sql + +Call `execute` method to execute sql. + +``` +stmt.execute() +``` + +#### Close Stmt + +``` +stmt.close() +``` + +#### Example + +```python +{{#include docs/examples/python/stmt_example.py}} +``` + + + + +#### Create Stmt + +Call the `statement` method in `Connection` to create the `stmt` for parameter binding. + +``` +import taosws + +conn = taosws.connect('taosws://localhost:6041/test') +stmt = conn.statement() +``` + +#### Prepare sql + +Call `prepare` method in stmt to prepare sql. + +``` +stmt.prepare("insert into t1 values (?, ?, ?, ?)") +``` + +#### parameter binding + +Call the `bind_param` method to bind parameters. + +``` +stmt.bind_param([ + taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), + taosws.ints_to_column([1, 2, 3, 4]), + taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), + taosws.varchar_to_column(['a', 'b', 'c', 'd']), +]) +``` + +Call the `add_batch` method to add parameters to the batch. + +``` +stmt.add_batch() +``` + +#### execute sql + +Call `execute` method to execute sql. + +``` +stmt.execute() +``` + +#### Close Stmt + +``` +stmt.close() +``` + +#### Example + +```python +{{#include docs/examples/python/stmt_websocket_example.py}} +``` + + + ### Other sample programs | Example program links | Example program content | | ------------------------------------------------------------------------------------------------------------- | ------------------- ---- | -| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once | +| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, +bind multiple rows at once | | [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py | [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing | | [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags | @@ -515,14 +883,6 @@ Insert with req_id argument ## Other notes -### Exception handling - -All errors from database operations are thrown directly as exceptions and the error message from the database is passed up the exception stack. The application is responsible for exception handling. For example: - -```python -{{#include docs/examples/python/handle_exception.py}} -``` - ### About nanoseconds Due to the current imperfection of Python's nanosecond support (see link below), the current implementation returns integers at nanosecond precision instead of the `datetime` type produced by `ms` and `us`, which application developers will need to handle on their own. And it is recommended to use pandas' to_datetime(). The Python Connector may modify the interface in the future if Python officially supports nanoseconds in full. diff --git a/docs/examples/python/stmt_example.py b/docs/examples/python/stmt_example.py new file mode 100644 index 0000000000000000000000000000000000000000..83197a777ab6419962212e7f1106fba7b78e884e --- /dev/null +++ b/docs/examples/python/stmt_example.py @@ -0,0 +1,82 @@ +#! + +import taosws + +import taos + +db_name = 'test_ws_stmt' + + +def before(): + taos_conn = taos.connect() + taos_conn.execute("drop database if exists %s" % db_name) + taos_conn.execute("create database %s" % db_name) + taos_conn.select_db(db_name) + taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))") + taos_conn.execute( + "create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))") + taos_conn.close() + + +def stmt_insert(): + before() + + conn = taosws.connect('taosws://root:taosdata@localhost:6041/%s' % db_name) + + while True: + try: + stmt = conn.statement() + stmt.prepare("insert into t1 values (?, ?, ?, ?)") + + stmt.bind_param([ + taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), + taosws.ints_to_column([1, 2, 3, 4]), + taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), + taosws.varchar_to_column(['a', 'b', 'c', 'd']), + ]) + + stmt.add_batch() + rows = stmt.execute() + print(rows) + stmt.close() + except Exception as e: + if 'Retry needed' in e.args[0]: # deal with [0x0125] Retry needed + continue + else: + raise e + + break + + +def stmt_insert_into_stable(): + before() + + conn = taosws.connect("taosws://root:taosdata@localhost:6041/%s" % db_name) + + while True: + try: + stmt = conn.statement() + stmt.prepare("insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)") + stmt.set_tbname('stb1_1') + stmt.set_tags([ + taosws.int_to_tag(1), + taosws.varchar_to_tag('aaa'), + ]) + stmt.bind_param([ + taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), + taosws.ints_to_column([1, 2, 3, 4]), + taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), + taosws.varchar_to_column(['a', 'b', 'c', 'd']), + ]) + + stmt.add_batch() + rows = stmt.execute() + print(rows) + stmt.close() + except Exception as e: + if 'Retry needed' in e.args[0]: # deal with [0x0125] Retry needed + continue + else: + raise e + + break diff --git a/docs/examples/python/stmt_websocket_example.py b/docs/examples/python/stmt_websocket_example.py new file mode 100644 index 0000000000000000000000000000000000000000..d0824cfa9f29320a6613eb49ee58d108fc61cfd7 --- /dev/null +++ b/docs/examples/python/stmt_websocket_example.py @@ -0,0 +1,78 @@ +#! +import time + +import taosws + +import taos + + +def before_test(db_name): + taos_conn = taos.connect() + taos_conn.execute("drop database if exists %s" % db_name) + taos_conn.execute("create database %s" % db_name) + taos_conn.select_db(db_name) + taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))") + taos_conn.execute( + "create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))") + taos_conn.close() + + +def after_test(db_name): + taos_conn = taos.connect() + taos_conn.execute("drop database if exists %s" % db_name) + taos_conn.close() + + +def stmt_insert(): + db_name = 'test_ws_stmt_{}'.format(int(time.time())) + before_test(db_name) + + conn = taosws.connect('taosws://root:taosdata@localhost:6041/%s' % db_name) + + stmt = conn.statement() + stmt.prepare("insert into t1 values (?, ?, ?, ?)") + + stmt.bind_param([ + taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), + taosws.ints_to_column([1, 2, 3, 4]), + taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), + taosws.varchar_to_column(['a', 'b', 'c', 'd']), + ]) + + stmt.add_batch() + rows = stmt.execute() + assert rows == 4 + stmt.close() + after_test(db_name) + + +def stmt_insert_into_stable(): + db_name = 'test_ws_stmt_{}'.format(int(time.time())) + before_test(db_name) + + conn = taosws.connect("taosws://root:taosdata@localhost:6041/%s" % db_name) + + stmt = conn.statement() + stmt.prepare("insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)") + stmt.set_tbname('stb1_1') + stmt.set_tags([ + taosws.int_to_tag(1), + taosws.varchar_to_tag('aaa'), + ]) + stmt.bind_param([ + taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), + taosws.ints_to_column([1, 2, 3, 4]), + taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), + taosws.varchar_to_column(['a', 'b', 'c', 'd']), + ]) + + stmt.add_batch() + rows = stmt.execute() + assert rows == 4 + stmt.close() + after_test(db_name) + + +if __name__ == '__main__': + stmt_insert() + stmt_insert_into_stable() diff --git a/docs/examples/python/tmq_assignment_example.py b/docs/examples/python/tmq_assignment_example.py new file mode 100644 index 0000000000000000000000000000000000000000..a07347a9b9523f2040895c2973e27ccf31799000 --- /dev/null +++ b/docs/examples/python/tmq_assignment_example.py @@ -0,0 +1,58 @@ +import taos +from taos.tmq import Consumer +import taosws + + +def prepare(): + conn = taos.connect() + conn.execute("drop topic if exists tmq_assignment_demo_topic") + conn.execute("drop database if exists tmq_assignment_demo_db") + conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600") + conn.select_db("tmq_assignment_demo_db") + conn.execute( + "create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") + conn.execute( + "create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')") + + +def taos_get_assignment_and_seek_demo(): + prepare() + consumer = Consumer( + { + "group.id": "0", + # should disable snapshot, + # otherwise it will cause invalid params error + "experimental.snapshot.enable": "false", + } + ) + consumer.subscribe(["tmq_assignment_demo_topic"]) + + # get topic assignment + assignments = consumer.assignment() + for assignment in assignments: + print(assignment) + + # poll + consumer.poll(1) + consumer.poll(1) + + # get topic assignment again + after_pool_assignments = consumer.assignment() + for assignment in after_pool_assignments: + print(assignment) + + # seek to the beginning + for assignment in assignments: + consumer.seek(assignment) + + # now the assignment should be the same as before poll + assignments = consumer.assignment() + for assignment in assignments: + print(assignment) + + +if __name__ == '__main__': + taosws_get_assignment_and_seek_demo() diff --git a/docs/examples/python/tmq_websocket_assgnment_example.py b/docs/examples/python/tmq_websocket_assgnment_example.py new file mode 100644 index 0000000000000000000000000000000000000000..0f8e4a28042e484029068816fdfebd3b0b27a587 --- /dev/null +++ b/docs/examples/python/tmq_websocket_assgnment_example.py @@ -0,0 +1,57 @@ +import taos +import taosws + + +def prepare(): + conn = taos.connect() + conn.execute("drop topic if exists tmq_assignment_demo_topic") + conn.execute("drop database if exists tmq_assignment_demo_db") + conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600") + conn.select_db("tmq_assignment_demo_db") + conn.execute( + "create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)") + conn.execute( + "create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')") + conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')") + + +def taosws_get_assignment_and_seek_demo(): + prepare() + consumer = taosws.Consumer(conf={ + "td.connect.websocket.scheme": "ws", + # should disable snapshot, + # otherwise it will cause invalid params error + "experimental.snapshot.enable": "false", + "group.id": "0", + }) + consumer.subscribe(["tmq_assignment_demo_topic"]) + + # get topic assignment + assignments = consumer.assignment() + for assignment in assignments: + print(assignment.to_string()) + + # poll + consumer.poll(1) + consumer.poll(1) + + # get topic assignment again + after_poll_assignments = consumer.assignment() + for assignment in after_poll_assignments: + print(assignment.to_string()) + + # seek to the beginning + for assignment in assignments: + for a in assignment.assignments(): + consumer.seek(assignment.topic(), a.vg_id(), a.offset()) + + # now the assignment should be the same as before poll + assignments = consumer.assignment() + for assignment in assignments: + print(assignment.to_string()) + + +if __name__ == '__main__': + taosws_get_assignment_and_seek_demo() diff --git a/docs/zh/07-develop/07-tmq.mdx b/docs/zh/07-develop/07-tmq.mdx index bfea926f53d7287b98f4d1467f7a1022b9049a38..a87a1f64f80223a8b19b21bd277973952cf8dfc8 100644 --- a/docs/zh/07-develop/07-tmq.mdx +++ b/docs/zh/07-develop/07-tmq.mdx @@ -105,6 +105,12 @@ class Consumer: def poll(self, timeout: float = 1.0): pass + def assignment(self): + pass + + def seek(self, partition): + pass + def close(self): pass diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx index 1037d66f17e619e9b01688447320f981f3679604..8752dc214565c7834cdc6903f5247cd4c64194a2 100644 --- a/docs/zh/08-connector/30-python.mdx +++ b/docs/zh/08-connector/30-python.mdx @@ -25,6 +25,36 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con 无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。 +## 处理异常 + +Python 连接器可能会产生 4 种异常: + +- Python 连接器本身的异常 +- 原生连接方式的异常 +- websocket 连接方式异常 +- 数据订阅异常 +- TDengine 其他功能模块的异常 + +|Error Type|Description|Suggested Actions| +|:--------:|:---------:|:---------------:| +|InterfaceError|taosc 版本太低,不支持所使用的接口|请检查 TDengine 客户端版本| +|ConnectionError|数据库链接错误|请检查 TDengine 服务端状态和连接参数| +|DatabaseError|数据库错误|请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版| +|OperationalError|操作错误|API 使用错误,请检查代码| +|ProgrammingError||| +|StatementError|stmt 相关异常|| +|ResultError||| +|SchemalessError|schemaless 相关异常|| +|TmqError|tmq 相关异常|| + +Python 中通常通过 try-expect 处理异常,异常处理相关请参考 [Python 错误和异常文档](https://docs.python.org/3/tutorial/errors.html)。 + +Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如: + +```python +{{#include docs/examples/python/handle_exception.py}} +``` + ## 支持的功能 - 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。 @@ -32,7 +62,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con ## 安装 -### 准备 +### 安装前准备 1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。 2. 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。 @@ -274,7 +304,7 @@ Transfer-Encoding: chunked -## 示例程序 +## 使用示例 ### 基本使用 @@ -343,6 +373,10 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 +#### Connection 类的使用 + +`Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: execute、 query、schemaless_insert 和 subscribe 方法。 + ```python {{#include docs/examples/python/connect_websocket_examples.py:basic}} ``` @@ -353,6 +387,46 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 +### 查询数据 + + + + +`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。 + +```python +{{#include docs/examples/python/connection_usage_native_reference.py:query}} +``` + +:::tip +查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。 +::: + + + + + +RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。 + +```python +{{#include docs/examples/python/rest_client_example.py}} +``` + +对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。 + + + + + +`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。 + +```python +{{#include docs/examples/python/connect_websocket_examples.py:basic}} +``` + + + + ### 与 req_id 一起使用 使用可选的 req_id 参数,指定请求 id,可以用于 tracing @@ -456,27 +530,169 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 ### 数据订阅 -连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅](../../develop/tmq/)。 +连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。 -`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API,相关 API 定义请参考 [数据订阅文档](../../develop/tmq/#%E4%B8%BB%E8%A6%81%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%84%E5%92%8C-api)。 +`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。 + +#### 创建 Consumer + +创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 + +```python +from taos.tmq import Consumer + +consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) +``` + +#### 订阅 topics + +Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 + +```python +consumer.subscribe(['topic1', 'topic2']) +``` + +#### 消费数据 + +Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 + +```python +while True: + res = consumer.poll(1) + if not res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) +``` + +#### 获取消费进度 + +Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 + +```python +assignments = consumer.assignment() +``` + +#### 重置消费进度 + +Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。 + +```python +tp = TopicPartition(topic='topic1', partition=0, offset=0) +consumer.seek(tp) +``` + +#### 结束消费 + +消费结束后,应当取消订阅,并关闭 Consumer。 + +```python +consumer.unsubscribe() +consumer.close() +``` + +#### tmq 订阅示例代码 ```python {{#include docs/examples/python/tmq_example.py}} ``` +#### 获取和重置消费进度示例代码 + +```python +{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} +``` + -除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据。 +除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。 + +taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。 + +#### 创建 Consumer + +创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 + +```python +import taosws + +consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) +``` + +#### 订阅 topics + +Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 + +```python +consumer.subscribe(['topic1', 'topic2']) +``` + +#### 消费数据 + +Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 + +```python +while True: + res = consumer.poll(timeout=1.0) + if not res: + continue + err = res.error() + if err is not None: + raise err + for block in message: + for row in block: + print(row) +``` + +#### 获取消费进度 + +Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 + +```python +assignments = consumer.assignment() +``` + +#### 重置消费进度 + +Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。 + +```python +consumer.seek(topic='topic1', partition=0, offset=0) +``` + +#### 结束消费 + +消费结束后,应当取消订阅,并关闭 Consumer。 + +```python +consumer.unsubscribe() +consumer.close() +``` + +#### tmq 订阅示例代码 ```python {{#include docs/examples/python/tmq_websocket_example.py}} ``` +连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。 + +#### 获取和重置消费进度示例代码 + +```python +{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} +``` + @@ -530,7 +746,142 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 -### 其它示例程序 +### 通过参数绑定写入数据 + +TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。 + + + + +#### 创建 stmt + +Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 + +``` +import taos + +conn = taos.connect() +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") +``` + +#### 参数绑定 + +调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。 + +``` +params = new_multi_binds(16) +params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) +params[1].bool((True, None, False)) +params[2].tinyint([-128, -128, None]) # -128 is tinyint null +params[3].tinyint([0, 127, None]) +params[4].smallint([3, None, 2]) +params[5].int([3, 4, None]) +params[6].bigint([3, 4, None]) +params[7].tinyint_unsigned([3, 4, None]) +params[8].smallint_unsigned([3, 4, None]) +params[9].int_unsigned([3, 4, None]) +params[10].bigint_unsigned([3, 4, None]) +params[11].float([3, None, 1]) +params[12].double([3, None, 1.2]) +params[13].binary(["abc", "dddafadfadfadfadfa", None]) +params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) +params[15].timestamp([None, None, 1626861392591]) +``` + +调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。 + +``` +stmt.bind_param_batch(params) +``` + +#### 执行 sql + +调用 stmt 的 `execute` 方法执行 sql + +``` +stmt.execute() +``` + +#### 关闭 stmt + +最后需要关闭 stmt。 + +``` +stmt.close() +``` + +#### 示例代码 + +```python +{{#include docs/examples/python/stmt_example.py}} +``` + + + + +#### 创建 stmt + +Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 + +``` +import taosws + +conn = taosws.connect('taosws://localhost:6041/test') +stmt = conn.statement() +``` + +#### 解析 sql + +调用 stmt 的 `prepare` 方法来解析 insert 语句。 + +``` +stmt.prepare("insert into t1 values (?, ?, ?, ?)") +``` + +#### 参数绑定 + +调用 stmt 的 `bind_param` 方法绑定参数。 + +``` +stmt.bind_param([ + taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), + taosws.ints_to_column([1, 2, 3, 4]), + taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), + taosws.varchar_to_column(['a', 'b', 'c', 'd']), +]) +``` + +调用 stmt 的 `add_batch` 方法,将参数加入批处理。 + +``` +stmt.add_batch() +``` + +#### 执行 sql + +调用 stmt 的 `execute` 方法执行 sql + +``` +stmt.execute() +``` + +#### 关闭 stmt + +最后需要关闭 stmt。 + +``` +stmt.close() +``` + +#### 示例代码 + +```python +{{#include docs/examples/python/stmt_websocket_example.py}} +``` + + + +### 更多示例程序 | 示例程序链接 | 示例程序内容 | | ------------------------------------------------------------------------------------------------------------- | ----------------------- | @@ -542,14 +893,6 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线 ## 其它说明 -### 异常处理 - -所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如: - -```python -{{#include docs/examples/python/handle_exception.py}} -``` -`` ### 关于纳秒 (nanosecond) 由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。