diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx index 2a6cd9ecf77febdcc56528f34112944dc25f0aec..f0a59842fecbe783fb2353f62e0ecb2bc59e2d6d 100644 --- a/docs/en/14-reference/03-connector/07-python.mdx +++ b/docs/en/14-reference/03-connector/07-python.mdx @@ -87,9 +87,9 @@ TDengine currently supports timestamp, number, character, Boolean type, and the |NCHAR|str| |JSON|str| -## Installation +## Installation Steps -### Preparation +### Pre-installation preparation 1. Install Python. The recent taospy package requires Python 3.6.2+. The earlier versions of taospy require Python 3.7+. The taos-ws-py package requires Python 3.7+. If Python is not available on your system, refer to the [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) to install it. 2. Install [pip](https://pypi.org/project/pip/). In most cases, the Python installer comes with the pip utility. If not, please refer to [pip documentation](https://pip.pypa.io/en/stable/installation/) to install it. @@ -275,7 +275,7 @@ Transfer-Encoding: chunked -### Using connectors to establish connections +### Specify the Host and Properties to get the connection The following example code assumes that TDengine is installed locally and that the default configuration is used for both FQDN and serverPort. @@ -331,7 +331,69 @@ The parameter of `connect()` is the url of TDengine, and the protocol is `taosws -## Example program +### Priority of configuration parameters + +If the configuration parameters are duplicated in the parameters or client configuration file, the priority of the parameters, from highest to lowest, are as follows: + +1. Parameters in `connect` function. +2. the configuration file taos.cfg of the TDengine client driver when using a native connection. + +## Usage examples + +### Create database and tables + + + + +```python +conn = taos.connect() +# Execute a sql, ignore the result set, just get affected rows. 
It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +# change database. same as execute "USE db" +conn.select_db("test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + + +```python +conn = taosrest.connect(url="http://localhost:6041") +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +conn.execute("USE test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + + +```python +conn = taosws.connect(url="ws://localhost:6041") +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +conn.execute("USE test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + +### Insert data + +```python +conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)") +``` + +::: +now is an internal function. The default is the current time of the client's computer. now + 1s represents the current time of the client plus 1 second, followed by the number representing the unit of time: a (milliseconds), s (seconds), m (minutes), h (hours), d (days), w (weeks), n (months), y (years). +::: + ### Basic Usage @@ -453,7 +515,7 @@ The `query` method of the `TaosConnection` class can be used to query data and r -### Usage with req_id +### Execute SQL with reqId By using the optional req_id parameter, you can specify a request ID that can be used for tracing. @@ -553,171 +615,138 @@ As the way to connect introduced above but add `req_id` argument. -### Subscription +### Writing data via parameter binding -Connector support data subscription. 
For more information about subscroption, please refer to [Data Subscription](../../../develop/tmq/). +The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound. - + -The `consumer` in the connector contains the subscription api. - -##### Create Consumer - -The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/). +##### Create Stmt -```python -from taos.tmq import Consumer +Call the `statement` method in `Connection` to create the `stmt` for parameter binding. -consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` +import taos -##### Subscribe topics - -The `subscribe` function is used to subscribe to a list of topics. - -```python -consumer.subscribe(['topic1', 'topic2']) +conn = taos.connect() +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") ``` -##### Consume - -The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. +##### parameter binding -```python -while True: - res = consumer.poll(1) - if not res: - continue - err = res.error() - if err is not None: - raise err - val = res.value() +Call the `new_multi_binds` function to create the parameter list for parameter bindings. - for block in val: - print(block.fetchall()) ``` - -##### assignment - -The `assignment` function is used to get the assignment of the topic. 
- -```python -assignments = consumer.assignment() +params = new_multi_binds(16) +params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) +params[1].bool((True, None, False)) +params[2].tinyint([-128, -128, None]) # -128 is tinyint null +params[3].tinyint([0, 127, None]) +params[4].smallint([3, None, 2]) +params[5].int([3, 4, None]) +params[6].bigint([3, 4, None]) +params[7].tinyint_unsigned([3, 4, None]) +params[8].smallint_unsigned([3, 4, None]) +params[9].int_unsigned([3, 4, None]) +params[10].bigint_unsigned([3, 4, None]) +params[11].float([3, None, 1]) +params[12].double([3, None, 1.2]) +params[13].binary(["abc", "dddafadfadfadfadfa", None]) +params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) +params[15].timestamp([None, None, 1626861392591]) ``` -##### Seek - -The `seek` function is used to reset the assignment of the topic. +Call the `bind_param` (for a single row) method or the `bind_param_batch` (for multiple rows) method to set the values. -```python -tp = TopicPartition(topic='topic1', partition=0, offset=0) -consumer.seek(tp) +``` +stmt.bind_param_batch(params) ``` -##### After consuming data +##### execute sql -You should unsubscribe to the topics and close the consumer after consuming. +Call `execute` method to execute sql. -```python -consumer.unsubscribe() -consumer.close() +``` +stmt.execute() ``` -##### Tmq subscription example +##### Close Stmt -```python -{{#include docs/examples/python/tmq_example.py}} +``` +stmt.close() ``` -##### assignment and seek example +##### Example ```python -{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} +{{#include docs/examples/python/stmt_example.py}} ``` - -In addition to native connections, the connector also supports subscriptions via websockets. - -##### Create Consumer +##### Create Stmt -The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". 
You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). +Call the `statement` method in `Connection` to create the `stmt` for parameter binding. -```python +``` import taosws -consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) +conn = taosws.connect('taosws://localhost:6041/test') +stmt = conn.statement() ``` -##### subscribe topics +##### Prepare sql -The `subscribe` function is used to subscribe to a list of topics. +Call `prepare` method in stmt to prepare sql. -```python -consumer.subscribe(['topic1', 'topic2']) ``` - -##### Consume - -The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. - -```python -while True: - res = consumer.poll(timeout=1.0) - if not res: - continue - err = res.error() - if err is not None: - raise err - for block in message: - for row in block: - print(row) +stmt.prepare("insert into t1 values (?, ?, ?, ?)") ``` -##### assignment +##### parameter binding -The `assignment` function is used to get the assignment of the topic. +Call the `bind_param` method to bind parameters. -```python -assignments = consumer.assignment() +``` +stmt.bind_param([ + taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), + taosws.ints_to_column([1, 2, 3, 4]), + taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), + taosws.varchar_to_column(['a', 'b', 'c', 'd']), +]) ``` -##### Seek - -The `seek` function is used to reset the assignment of the topic. +Call the `add_batch` method to add parameters to the batch. 
-```python -consumer.seek(topic='topic1', partition=0, offset=0) +``` +stmt.add_batch() ``` -##### After consuming data +##### execute sql -You should unsubscribe to the topics and close the consumer after consuming. +Call `execute` method to execute sql. -```python -consumer.unsubscribe() -consumer.close() +``` +stmt.execute() ``` -##### Subscription example +##### Close Stmt -```python -{{#include docs/examples/python/tmq_websocket_example.py}} +``` +stmt.close() ``` -##### Assignment and seek example +##### Example ```python -{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} +{{#include docs/examples/python/stmt_websocket_example.py}} ``` - -### Schemaless Insert +### Schemaless Writing Connector support schemaless insert. @@ -767,134 +796,211 @@ Connector support schemaless insert. -### Parameter Binding +### Schemaless with reqId -The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound. +There is an optional parameter called `req_id` in the `schemaless_insert` and `schemaless_insert_raw` methods. This `req_id` can be used for request link tracing. + +```python +{{#include docs/examples/python/schemaless_insert_req_id.py}} +``` + +```python +{{#include docs/examples/python/schemaless_insert_raw_req_id.py}} +``` + +### Data Subscription + +The connector supports data subscription. For more information about subscription, please refer to [Data Subscription](../../../develop/tmq/). + +#### Create a Topic + +To create a topic, please refer to [Data Subscription](../../../develop/tmq/#create-a-topic). + +#### Create a Consumer + + - -##### Create Stmt +The consumer in the connector contains the subscription api. The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). 
-Call the `statement` method in `Connection` to create the `stmt` for parameter binding. +```python +from taos.tmq import Consumer +consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` -import taos + -conn = taos.connect() -stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") -``` + -##### parameter binding +In addition to native connections, the connector also supports subscriptions via websockets. -Call the `new_multi_binds` function to create the parameter list for parameter bindings. +The syntax for creating a consumer is `consumer = Consumer(conf=configs)`. You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer). -``` -params = new_multi_binds(16) -params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) -params[1].bool((True, None, False)) -params[2].tinyint([-128, -128, None]) # -128 is tinyint null -params[3].tinyint([0, 127, None]) -params[4].smallint([3, None, 2]) -params[5].int([3, 4, None]) -params[6].bigint([3, 4, None]) -params[7].tinyint_unsigned([3, 4, None]) -params[8].smallint_unsigned([3, 4, None]) -params[9].int_unsigned([3, 4, None]) -params[10].bigint_unsigned([3, 4, None]) -params[11].float([3, None, 1]) -params[12].double([3, None, 1.2]) -params[13].binary(["abc", "dddafadfadfadfadfa", None]) -params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) -params[15].timestamp([None, None, 1626861392591]) +```python +import taosws + +consumer = taosws.Consumer(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) ``` -Call the `bind_param` (for a single row) method or the `bind_param_batch` (for multiple rows) method to set the values. + + -``` -stmt.bind_param_batch(params) -``` +#### Subscribe to a Topic -##### execute sql + -Call `execute` method to execute sql. 
+ -``` -stmt.execute() +The `subscribe` function is used to subscribe to a list of topics. + +```python +consumer.subscribe(['topic1', 'topic2']) ``` -##### Close Stmt + + + +The `subscribe` function is used to subscribe to a list of topics. +```python +consumer.subscribe(['topic1', 'topic2']) ``` -stmt.close() + + + + +#### Consume messages + + + + + +The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. + +```python +while True: + res = consumer.poll(1) + if not res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) ``` -##### Example + + + +The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data. ```python -{{#include docs/examples/python/stmt_example.py}} +while True: + res = consumer.poll(timeout=1.0) + if not res: + continue + err = res.error() + if err is not None: + raise err + for block in message: + for row in block: + print(row) ``` + + - +#### Assignment subscription Offset -##### Create Stmt + -Call the `statement` method in `Connection` to create the `stmt` for parameter binding. + +The `assignment` function is used to get the assignment of the topic. + +```python +assignments = consumer.assignment() ``` -import taosws -conn = taosws.connect('taosws://localhost:6041/test') -stmt = conn.statement() +The `seek` function is used to reset the assignment of the topic. + +```python +tp = TopicPartition(topic='topic1', partition=0, offset=0) +consumer.seek(tp) ``` -##### Prepare sql + + -Call `prepare` method in stmt to prepare sql. 
+The `assignment` function is used to get the assignment of the topic. +```python +assignments = consumer.assignment() ``` -stmt.prepare("insert into t1 values (?, ?, ?, ?)") + +The `seek` function is used to reset the assignment of the topic. + +```python +consumer.seek(topic='topic1', partition=0, offset=0) ``` -##### parameter binding + + -Call the `bind_param` method to bind parameters. +#### Close subscriptions -``` -stmt.bind_param([ - taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), - taosws.ints_to_column([1, 2, 3, 4]), - taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), - taosws.varchar_to_column(['a', 'b', 'c', 'd']), -]) -``` + -Call the `add_batch` method to add parameters to the batch. + -``` -stmt.add_batch() +You should unsubscribe to the topics and close the consumer after consuming. + +```python +consumer.unsubscribe() +consumer.close() ``` -##### execute sql + + -Call `execute` method to execute sql. +You should unsubscribe to the topics and close the consumer after consuming. 
-``` -stmt.execute() +```python +consumer.unsubscribe() +consumer.close() ``` -##### Close Stmt + + + +#### Full Sample Code + + + + +```python +{{#include docs/examples/python/tmq_example.py}} ``` -stmt.close() + +```python +{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} ``` -##### Example + + ```python -{{#include docs/examples/python/stmt_websocket_example.py}} +{{#include docs/examples/python/tmq_websocket_example.py}} +``` + +```python +{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} ``` + diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx index 0b9f2d75a7779feb267f7a6eb4110ee3308957a0..ec1ec4b7c7a0c27f78fd4b4de8ab943f698abe2f 100644 --- a/docs/zh/08-connector/30-python.mdx +++ b/docs/zh/08-connector/30-python.mdx @@ -70,7 +70,7 @@ Python Connector 的所有数据库操作如果出现异常,都会直接抛出 {{#include docs/examples/python/handle_exception.py}} ``` -TDengine DataType 和 Python DataType +## TDengine DataType 和 Python DataType TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下: @@ -276,7 +276,7 @@ Transfer-Encoding: chunked -### 使用连接器建立连接 +### 指定 Host 和 Properties 获取连接 以下示例代码假设 TDengine 安装在本机, 且 FQDN 和 serverPort 都使用了默认配置。 @@ -332,8 +332,69 @@ Transfer-Encoding: chunked +### 配置参数的优先级 + +如果配置参数在参数和客户端配置文件中有重复,则参数的优先级由高到低分别如下: + +1. 连接参数 +2. 使用原生连接时,TDengine 客户端驱动的配置文件 taos.cfg + ## 使用示例 +### 创建数据库和表 + + + + +```python +conn = taos.connect() +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +# change database. same as execute "USE db" +conn.select_db("test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + + +```python +conn = taosrest.connect(url="http://localhost:6041") +# Execute a sql, ignore the result set, just get affected rows. 
It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +conn.execute("USE test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + + +```python +conn = taosws.connect(url="ws://localhost:6041") +# Execute a sql, ignore the result set, just get affected rows. It's useful for DDL and DML statement. +conn.execute("DROP DATABASE IF EXISTS test") +conn.execute("CREATE DATABASE test") +conn.execute("USE test") +conn.execute("CREATE STABLE weather(ts TIMESTAMP, temperature FLOAT) TAGS (location INT)") +``` + + + + +### 插入数据 + +```python +conn.execute("INSERT INTO t1 USING weather TAGS(1) VALUES (now, 23.5) (now+1m, 23.5) (now+2m, 24.4)") +``` + +::: +now 为系统内部函数,默认为客户端所在计算机当前时间。 now + 1s 代表客户端当前时间往后加 1 秒,数字后面代表时间单位:a(毫秒),s(秒),m(分),h(小时),d(天),w(周),n(月),y(年)。 +::: + ### 基本使用 @@ -372,7 +433,6 @@ Transfer-Encoding: chunked :::note TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。 - ::: @@ -455,7 +515,7 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方 -### 与 req_id 一起使用 +### 执行带有 reqId 的 SQL 使用可选的 req_id 参数,指定请求 id,可以用于 tracing @@ -556,171 +616,138 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方 -### 数据订阅 +### 通过参数绑定写入数据 -连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。 +TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。 - + -`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。 - -##### 创建 Consumer - -创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 +##### 创建 stmt -```python -from taos.tmq import Consumer +Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 -consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` +import taos -##### 订阅 topics - -Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 - -```python 
-consumer.subscribe(['topic1', 'topic2']) +conn = taos.connect() +stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") ``` -##### 消费数据 - -Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 +##### 参数绑定 -```python -while True: - res = consumer.poll(1) - if not res: - continue - err = res.error() - if err is not None: - raise err - val = res.value() +调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。 - for block in val: - print(block.fetchall()) +``` +params = new_multi_binds(16) +params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) +params[1].bool((True, None, False)) +params[2].tinyint([-128, -128, None]) # -128 is tinyint null +params[3].tinyint([0, 127, None]) +params[4].smallint([3, None, 2]) +params[5].int([3, 4, None]) +params[6].bigint([3, 4, None]) +params[7].tinyint_unsigned([3, 4, None]) +params[8].smallint_unsigned([3, 4, None]) +params[9].int_unsigned([3, 4, None]) +params[10].bigint_unsigned([3, 4, None]) +params[11].float([3, None, 1]) +params[12].double([3, None, 1.2]) +params[13].binary(["abc", "dddafadfadfadfadfa", None]) +params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) +params[15].timestamp([None, None, 1626861392591]) ``` -##### 获取消费进度 - -Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 +调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。 -```python -assignments = consumer.assignment() +``` +stmt.bind_param_batch(params) ``` -##### 指定订阅 Offset +##### 执行 sql -Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。 +调用 stmt 的 `execute` 方法执行 sql -```python -tp = TopicPartition(topic='topic1', partition=0, offset=0) -consumer.seek(tp) +``` +stmt.execute() ``` -##### 关闭订阅 +##### 关闭 stmt -消费结束后,应当取消订阅,并关闭 Consumer。 +最后需要关闭 stmt。 -```python -consumer.unsubscribe() -consumer.close() ``` - -##### 完整示例 - 
-```python -{{#include docs/examples/python/tmq_example.py}} +stmt.close() ``` -##### 获取和重置消费进度示例代码 +##### 示例代码 ```python -{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} +{{#include docs/examples/python/stmt_example.py}} ``` - -除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。 - -taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。 - -##### 创建 Consumer +##### 创建 stmt -创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 +Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 -```python +``` import taosws -consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) +conn = taosws.connect('taosws://localhost:6041/test') +stmt = conn.statement() ``` -##### 订阅 topics +##### 解析 sql -Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 +调用 stmt 的 `prepare` 方法来解析 insert 语句。 -```python -consumer.subscribe(['topic1', 'topic2']) +``` +stmt.prepare("insert into t1 values (?, ?, ?, ?)") ``` -##### 消费数据 +##### 参数绑定 -Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 +调用 stmt 的 `bind_param` 方法绑定参数。 -```python -while True: - res = consumer.poll(timeout=1.0) - if not res: - continue - err = res.error() - if err is not None: - raise err - for block in message: - for row in block: - print(row) +``` +stmt.bind_param([ + taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), + taosws.ints_to_column([1, 2, 3, 4]), + taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), + taosws.varchar_to_column(['a', 'b', 'c', 'd']), +]) ``` -##### 获取消费进度 - -Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 
TopicPartition 列表。 +调用 stmt 的 `add_batch` 方法,将参数加入批处理。 -```python -assignments = consumer.assignment() +``` +stmt.add_batch() ``` -##### 重置消费进度 +##### 执行 sql -Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。 +调用 stmt 的 `execute` 方法执行 sql -```python -consumer.seek(topic='topic1', partition=0, offset=0) +``` +stmt.execute() ``` -##### 结束消费 +##### 关闭 stmt -消费结束后,应当取消订阅,并关闭 Consumer。 +最后需要关闭 stmt。 -```python -consumer.unsubscribe() -consumer.close() ``` - -##### tmq 订阅示例代码 - -```python -{{#include docs/examples/python/tmq_websocket_example.py}} +stmt.close() ``` -连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。 - -##### 获取和重置消费进度示例代码 +##### 示例代码 ```python -{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} +{{#include docs/examples/python/stmt_websocket_example.py}} ``` - @@ -774,138 +801,211 @@ consumer.close() -### 通过参数绑定写入数据 +### 执行带有 reqId 的无模式写入 -TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。 +连接器的 `schemaless_insert` 和 `schemaless_insert_raw` 方法支持 `req_id` 可选参数,此 `req_Id` 可用于请求链路追踪。 + +```python +{{#include docs/examples/python/schemaless_insert_req_id.py}} +``` + +```python +{{#include docs/examples/python/schemaless_insert_raw_req_id.py}} +``` + +### 数据订阅 + +连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。 + +#### 创建 Topic + +创建 Topic 相关请参考 [数据订阅文档](../../develop/tmq/#创建-topic)。 + +#### 创建 Consumer + + - -##### 创建 stmt +`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#创建消费者-consumer)。 -Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 +```python +from taos.tmq import Consumer +consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"}) ``` -import taos + -conn = taos.connect() -stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)") -``` + -##### 参数绑定 
+除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。 -调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。 +taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。 -``` -params = new_multi_binds(16) -params[0].timestamp((1626861392589, 1626861392590, 1626861392591)) -params[1].bool((True, None, False)) -params[2].tinyint([-128, -128, None]) # -128 is tinyint null -params[3].tinyint([0, 127, None]) -params[4].smallint([3, None, 2]) -params[5].int([3, 4, None]) -params[6].bigint([3, 4, None]) -params[7].tinyint_unsigned([3, 4, None]) -params[8].smallint_unsigned([3, 4, None]) -params[9].int_unsigned([3, 4, None]) -params[10].bigint_unsigned([3, 4, None]) -params[11].float([3, None, 1]) -params[12].double([3, None, 1.2]) -params[13].binary(["abc", "dddafadfadfadfadfa", None]) -params[14].nchar(["涛思数据", None, "a long string with 中文字符"]) -params[15].timestamp([None, None, 1626861392591]) +```python +import taosws + +consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"}) ``` -调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。 + + -``` -stmt.bind_param_batch(params) -``` +#### 订阅 topics -##### 执行 sql + -调用 stmt 的 `execute` 方法执行 sql + -``` -stmt.execute() +Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 + +```python +consumer.subscribe(['topic1', 'topic2']) ``` -##### 关闭 stmt + + -最后需要关闭 stmt。 +Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。 +```python +consumer.subscribe(['topic1', 'topic2']) ``` -stmt.close() + + + + +#### 消费数据 + + + + + +Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 + +```python +while True: + res = consumer.poll(1) + if not 
res: + continue + err = res.error() + if err is not None: + raise err + val = res.value() + + for block in val: + print(block.fetchall()) ``` -##### 示例代码 + + + +Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。 ```python -{{#include docs/examples/python/stmt_example.py}} +while True: + res = consumer.poll(timeout=1.0) + if not res: + continue + err = res.error() + if err is not None: + raise err + for block in message: + for row in block: + print(row) ``` + + - +#### 获取消费进度 -##### 创建 stmt + -Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。 + + +Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 +```python +assignments = consumer.assignment() ``` -import taosws -conn = taosws.connect('taosws://localhost:6041/test') -stmt = conn.statement() +Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。 + +```python +tp = TopicPartition(topic='topic1', partition=0, offset=0) +consumer.seek(tp) ``` -##### 解析 sql + + -调用 stmt 的 `prepare` 方法来解析 insert 语句。 +Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。 +```python +assignments = consumer.assignment() ``` -stmt.prepare("insert into t1 values (?, ?, ?, ?)") + +Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。 + +```python +consumer.seek(topic='topic1', partition=0, offset=0) ``` -##### 参数绑定 + + -调用 stmt 的 `bind_param` 方法绑定参数。 +#### 关闭订阅 -``` -stmt.bind_param([ - taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]), - taosws.ints_to_column([1, 2, 3, 4]), - taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]), - taosws.varchar_to_column(['a', 'b', 'c', 'd']), -]) -``` + -调用 stmt 的 `add_batch` 方法,将参数加入批处理。 + -``` -stmt.add_batch() +消费结束后,应当取消订阅,并关闭 Consumer。 + +```python +consumer.unsubscribe() +consumer.close() ``` -##### 
执行 sql + + -调用 stmt 的 `execute` 方法执行 sql +消费结束后,应当取消订阅,并关闭 Consumer。 -``` -stmt.execute() +```python +consumer.unsubscribe() +consumer.close() ``` -##### 关闭 stmt + + -最后需要关闭 stmt。 +#### 完整示例 + + + + +```python +{{#include docs/examples/python/tmq_example.py}} ``` -stmt.close() + +```python +{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}} ``` -##### 示例代码 + + ```python -{{#include docs/examples/python/stmt_websocket_example.py}} +{{#include docs/examples/python/tmq_websocket_example.py}} +``` + +```python +{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}} ``` + diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b48992b5f22914455c5a41f8bf46eb7c0b27f5cb..34a0bc86576d2d9d27d26c15f8da9730d1e4bd35 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -346,6 +346,7 @@ struct SStreamTask { int32_t refCnt; int64_t checkpointingId; int32_t checkpointAlignCnt; + int32_t transferStateAlignCnt; struct SStreamMeta* pMeta; SSHashObj* pNameMap; }; @@ -630,6 +631,8 @@ int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStre int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); +int32_t streamAlignTransferState(SStreamTask* pTask); + #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 68b697ca67c576eb9611b6618290cf0e07f570d8..6f323b2b427b9187198a227b07500ec9e4f87a18 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -889,11 +889,11 @@ _OVER: } int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) { - SSdb *pSdb = pMnode->pSdb; - SSmaObj *pSma = NULL; - void *pIter = NULL; - SVgObj *pVgroup = NULL; - int32_t code = -1; + SSdb *pSdb = pMnode->pSdb; + SSmaObj *pSma = NULL; + void *pIter = NULL; + SVgObj *pVgroup = NULL; + 
int32_t code = -1; while (1) { pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); @@ -911,12 +911,18 @@ int32_t mndDropSmasByStb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *p if (pStream != NULL && pStream->smaId == pSma->uid) { if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to drop task since %s", pStream->name, terrstr()); + mndReleaseStream(pMnode, pStream); goto _OVER; } + if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + mndReleaseStream(pMnode, pStream); goto _OVER; } + + mndReleaseStream(pMnode, pStream); } + if (mndSetDropSmaVgroupCommitLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER; if (mndSetDropSmaVgroupRedoActions(pMnode, pTrans, pDb, pVgroup) != 0) goto _OVER; if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c8e8dc925d1b60a00e1b247343c77f5be67b58ef..25648d61faa1b72d51a0da5b511d4f58e7d9f40f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -248,7 +248,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); +int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 20061911bc4daa676c24fa26b868e4920b447d12..41e0a97d7986fbe749b5f0c2bec0c027be72f4c0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -934,7 +934,6 @@ int32_t tqProcessStreamTaskCheckReq(STQ* 
pTq, SRpcMsg* pMsg) { }; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -1106,7 +1105,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { // 1. stop the related stream task, get the current scan wal version of stream task, ver. pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - // todo handle error + qError("failed to find s-task:0x%x, it may have been destroyed, drop fill history task:%s", + pTask->streamTaskId.taskId, pTask->id.idStr); + + pTask->status.taskStatus = TASK_STATUS__DROPPING; + tqDebug("s-task:%s scan-history-task set status to be dropping", pId); + + streamMetaSaveTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pTask); + return -1; } ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -1213,11 +1220,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // notify the downstream tasks to transfer executor state after handle all history blocks. 
-int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - SStreamTransferReq req; +int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { + char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + + SStreamTransferReq req = {0}; SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)msg, msgLen); + tDecoderInit(&decoder, (uint8_t*)pReq, len); int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); tDecoderClear(&decoder); @@ -1227,25 +1237,33 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int return -1; } + int32_t remain = streamAlignTransferState(pTask); + if (remain > 0) { + tqDebug("s-task:%s receive transfer state msg, remain:%d", pTask->id.idStr, remain); + return 0; + } + // transfer the ownership of executor state - streamTaskReleaseState(pTask); - tqDebug("s-task:%s receive state transfer req", pTask->id.idStr); + tqDebug("s-task:%s all upstream tasks end transfer msg", pTask->id.idStr); // related stream task load the state from the state storage backend SStreamTask* pStreamTask = streamMetaAcquireTask(pTq->pStreamMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { + streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqError("failed to find related stream task:0x%x, it may have been dropped already", req.taskId); return -1; } + // when all upstream tasks have notified the this task to start transfer state, then we start the transfer procedure. 
+ streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); + streamMetaReleaseTask(pTq->pStreamMeta, pStreamTask); ASSERT(pTask->streamTaskId.taskId != 0); pTask->status.transferState = true; streamSchedExec(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 88a260b3a37f706e8a09820aa6ab56cc56780a8b..d4efa38c3e364030171c767d6c45f60f00de7386 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -659,11 +659,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_STREAM_TRANSFER_STATE: { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len); - } + case TDMT_STREAM_TRANSFER_STATE: + return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH: return tqProcessStreamTaskScanHistoryFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 89bfcb0e0ada72e90aa2132c503ed7b0d4208633..d7f0702cb65c7ad1fe1ab181dcdfbdaf2a571e57 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -286,20 +286,24 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch hashPrefix = pCfg->hashPrefix + dbFNameLen + 1; } - len += sprintf( - buf2 + VARSTR_HEADER_SIZE, - "CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm " - "WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " - "WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d TABLE_PREFIX %d 
TABLE_SUFFIX %d TSDB_PAGESIZE %d " - "WAL_RETENTION_PERIOD %d WAL_RETENTION_SIZE %" PRId64, - dbName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, - pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, - pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups, - 1 == pCfg->numOfStables, hashPrefix, pCfg->hashSuffix, pCfg->tsdbPageSize, pCfg->walRetentionPeriod, pCfg->walRetentionSize); - - if (retentions) { - len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions); - taosMemoryFree(retentions); + if (IS_SYS_DBNAME(dbName)) { + len += sprintf(buf2 + VARSTR_HEADER_SIZE, "CREATE DATABASE `%s`", dbName); + } else { + len += sprintf( + buf2 + VARSTR_HEADER_SIZE, + "CREATE DATABASE `%s` BUFFER %d CACHESIZE %d CACHEMODEL '%s' COMP %d DURATION %dm " + "WAL_FSYNC_PERIOD %d MAXROWS %d MINROWS %d STT_TRIGGER %d KEEP %dm,%dm,%dm PAGES %d PAGESIZE %d PRECISION '%s' REPLICA %d " + "WAL_LEVEL %d VGROUPS %d SINGLE_STABLE %d TABLE_PREFIX %d TABLE_SUFFIX %d TSDB_PAGESIZE %d " + "WAL_RETENTION_PERIOD %d WAL_RETENTION_SIZE %" PRId64, + dbName, pCfg->buffer, pCfg->cacheSize, cacheModelStr(pCfg->cacheLast), pCfg->compression, pCfg->daysPerFile, + pCfg->walFsyncPeriod, pCfg->maxRows, pCfg->minRows, pCfg->sstTrigger, pCfg->daysToKeep0, pCfg->daysToKeep1, pCfg->daysToKeep2, + pCfg->pages, pCfg->pageSize, prec, pCfg->replications, pCfg->walLevel, pCfg->numOfVgroups, + 1 == pCfg->numOfStables, hashPrefix, pCfg->hashSuffix, pCfg->tsdbPageSize, pCfg->walRetentionPeriod, pCfg->walRetentionSize); + + if (retentions) { + len += sprintf(buf2 + VARSTR_HEADER_SIZE + len, " RETENTIONS %s", retentions); + taosMemoryFree(retentions); + } } (varDataLen(buf2)) = len; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 
900505acb32fd5256a9481b96c9d800387d37138..06b90d0a516729f77a20c008c81aa828285e557e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -288,9 +288,8 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return pTaskInfo; } - struct SSubplan* pPlan = NULL; - - int32_t code = qStringToSubplan(msg, &pPlan); + SSubplan* pPlan = NULL; + int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -335,6 +334,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t pTaskInfo = NULL; code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { + nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); terrno = code; return NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 6f7f052158c8b116125b1965c1ed8734b2c75992..ebf50f4784da8bf57162852c75d5a34397cb4d9b 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -509,6 +509,10 @@ static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int } static int32_t getDBCfg(STranslateContext* pCxt, const char* pDbName, SDbCfgInfo* pInfo) { + if (IS_SYS_DBNAME(pDbName)) { + return TSDB_CODE_SUCCESS; + } + SParseContext* pParCxt = pCxt->pParseCxt; SName name; tNameSetDbName(&name, pCxt->pParseCxt->acctId, pDbName, strlen(pDbName)); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ddbc8da3ecc8cc920157f3a8a356850a5c917da0..07d7cb30407b569f45cd21f55c65fbcdc677db71 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -61,7 +61,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { return taosStrdup(buf); } -void streamSchedByTimer(void* param, void* tmrId) { +static void streamSchedByTimer(void* param, void* tmrId) 
{ SStreamTask* pTask = (void*)param; int8_t status = atomic_load_8(&pTask->triggerStatus); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d0d63215e613e945457fd6cafd546a8398aaeafd..9adae2a2f5c95745ea898b2eec581d9f499dbfb0 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -352,11 +352,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed", + qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } else { - qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); + qDebug("s-task:%s fill-history task end, update related stream task:%s info, transfer exec state", pTask->id.idStr, + pStreamTask->id.idStr); } ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); @@ -369,6 +370,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { } else { ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__NORMAL); pStreamTask->status.taskStatus = TASK_STATUS__HALT; + qDebug("s-task:%s status: halt by related fill history task:%s", pStreamTask->id.idStr, pTask->id.idStr); } // wait for the stream task to be idle @@ -477,6 +479,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); + pTask->status.transferState = false; // reset this value, to avoid transfer state again if (code != TSDB_CODE_SUCCESS) { 
// todo handle this return 0; } @@ -611,3 +614,13 @@ int32_t streamTaskReloadState(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } } + +int32_t streamAlignTransferState(SStreamTask* pTask) { + int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList); + int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream); + if (old == 0) { + qDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream); + } + + return atomic_sub_fetch_32(&pTask->transferStateAlignCnt, 1); +} diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e1f625dd52535fcbf4116ea6adcfd06b3625fa09..a2b5d0e396bb6452843db4f290f91bbcdda73aa3 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -264,8 +264,9 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { - atomic_add_fetch_32(&(*ppTask)->refCnt, 1); + int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); taosRUnLockLatch(&pMeta->lock); + qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); return *ppTask; } } @@ -275,12 +276,24 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { } void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { - int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); - if (left < 0) { - qError("task ref is invalid, ref:%d, %s", left, pTask->id.idStr); - } else if (left == 0) { + int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); + if (ref > 0) { + qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref); + } else if (ref == 0) { ASSERT(streamTaskShouldStop(&pTask->status)); tFreeStreamTask(pTask); + } else if (ref < 0) { + qError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); + } +} + +static void 
doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, int32_t taskId) { + for (int32_t i = 0; i < num; ++i) { + int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); + if (*pTaskId == taskId) { + taosArrayRemove(pMeta->pTaskList, i); + break; + } } } @@ -333,17 +346,17 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { int32_t num = taosArrayGetSize(pMeta->pTaskList); qDebug("s-task:%s set the drop task flag, remain running s-task:%d", pTask->id.idStr, num - 1); - for (int32_t i = 0; i < num; ++i) { - int32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i); - if (*pTaskId == taskId) { - taosArrayRemove(pMeta->pTaskList, i); - break; - } + doRemoveIdFromList(pMeta, num, pTask->id.taskId); + + // remove the ref by timer + if (pTask->triggerParam != 0) { + taosTmrStop(pTask->schedTimer); + streamMetaReleaseTask(pMeta, pTask); } streamMetaReleaseTask(pMeta, pTask); } else { - qDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); + qDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); } taosWUnLockLatch(&pMeta->lock); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index a3fc3418aa8f78c14e18ea26184e0f5a14d5bb72..0f2281ea735bfd647bd3eeb0210097f00273dcb2 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -43,6 +43,7 @@ const char* streamGetTaskStatusStr(int32_t status) { case TASK_STATUS__SCAN_HISTORY: return "scan-history"; case TASK_STATUS__HALT: return "halt"; case TASK_STATUS__PAUSE: return "paused"; + case TASK_STATUS__DROPPING: return "dropping"; default:return ""; } } @@ -205,7 +206,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage, status:%s", id, numOfReqs, streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskLaunchScanHistory(pTask); - 
} else { // todo add assert, agg tasks? + } else { ASSERT(pTask->status.taskStatus == TASK_STATUS__NORMAL); qDebug("s-task:%s fixed downstream task is ready, now ready for data from wal, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -258,9 +259,15 @@ int32_t streamRestoreParam(SStreamTask* pTask) { } int32_t streamSetStatusNormal(SStreamTask* pTask) { - qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus)); - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); - return 0; + int32_t status = atomic_load_8(&pTask->status.taskStatus); + if (status == TASK_STATUS__DROPPING) { + qError("s-task:%s cannot be set normal, since in dropping state", pTask->id.idStr); + return -1; + } else { + qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); + return 0; + } } // source @@ -344,7 +351,8 @@ static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferRe msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); - qDebug("s-task:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId); + qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, + pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->taskId, vgId); return 0; } @@ -354,9 +362,6 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr, - pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus)); - req.taskId = pTask->fixedEpDispatcher.taskId; doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if 
(pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -451,6 +456,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { const char* pStatus = streamGetTaskStatusStr((*ppTask)->status.taskStatus); qDebug("s-task:%s status:%s quit timer task", (*ppTask)->id.idStr, pStatus); + taosMemoryFree(pInfo); (*ppTask)->status.timerActive = 0; taosWUnLockLatch(&pMeta->lock); return; @@ -511,6 +517,7 @@ int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask) { pTask->launchTaskTimer = taosTmrStart(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer); if (pTask->launchTaskTimer == NULL) { // todo failed to create timer + taosMemoryFree(pInfo); } else { pTask->status.timerActive = 1; // timer is active qDebug("s-task:%s set timer active flag", pTask->id.idStr); @@ -553,8 +560,10 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { streamSetStatusNormal(pTask); atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - // todo check rsp, commit data + taosWLockLatch(&pMeta->lock); streamMetaSaveTask(pMeta, pTask); + taosWUnLockLatch(&pMeta->lock); + return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 06da72188c4b3252d32bb50d5c2dc61c9f14d2bc..ef83583ea4dd19f70599d9c0b45a00bcb0cf94ae 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -205,13 +205,16 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { void tFreeStreamTask(SStreamTask* pTask) { qDebug("free s-task:%s", pTask->id.idStr); + int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus)); if (pTask->inputQueue) { streamQueueClose(pTask->inputQueue); } + if (pTask->outputQueue) { streamQueueClose(pTask->outputQueue); } + if (pTask->exec.qmsg) { taosMemoryFree(pTask->exec.qmsg); } @@ -230,9 +233,7 @@ void tFreeStreamTask(SStreamTask* pTask) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); 
tSimpleHashCleanup(pTask->tbSink.pTblInfo); - } - - if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; diff --git a/tests/system-test/0-others/show.py b/tests/system-test/0-others/show.py index 4d40d052c06f86e32e7f3d6d8a5a3cdff35c0dd7..50a1662ba013d4e4dd52ffbf4bdfb9f47f7c92d7 100644 --- a/tests/system-test/0-others/show.py +++ b/tests/system-test/0-others/show.py @@ -81,12 +81,20 @@ class TDTestCase: tag_sql += f"{k} {v}, " create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})' return create_stb_sql - + def set_create_database_sql(self,sql_dict): create_sql = 'create' for key,value in sql_dict.items(): create_sql += f' {key} {value}' return create_sql + + def show_create_sysdb_sql(self): + sysdb_list = {'information_schema', 'performance_schema'} + for db in sysdb_list: + tdSql.query(f'show create database {db}') + tdSql.checkEqual(f'{db}',tdSql.queryResult[0][0]) + tdSql.checkEqual(f'CREATE DATABASE `{db}`',tdSql.queryResult[0][1]) + def show_create_sql(self): create_db_sql = self.set_create_database_sql(self.db_param) print(create_db_sql) @@ -106,7 +114,7 @@ class TDTestCase: tdSql.query('show vnodes 1') tdSql.checkRows(self.vgroups) tdSql.execute(f'use {self.dbname}') - + column_dict = { '`ts`': 'timestamp', '`col1`': 'tinyint', @@ -122,7 +130,7 @@ class TDTestCase: '`col11`': 'bool', '`col12`': 'varchar(20)', '`col13`': 'nchar(20)' - + } tag_dict = { '`t1`': 'tinyint', @@ -139,7 +147,7 @@ class TDTestCase: '`t12`': 'varchar(20)', '`t13`': 'nchar(20)', '`t14`': 'timestamp' - + } create_table_sql = self.set_stb_sql(self.stbname,column_dict,tag_dict) tdSql.execute(create_table_sql) @@ -150,7 +158,7 @@ class TDTestCase: tag_sql = '(' for tag_keys in tag_dict.keys(): tag_sql += f'{tag_keys}, ' - tags = f'{tag_sql[:-2]})' + tags = 
f'{tag_sql[:-2]})' sql = f'create table {self.tbname} using {self.stbname} {tags} tags (1, 1, 1, 1, 1, 1, 1, 1, 1.000000e+00, 1.000000e+00, true, "abc", "abc123", 0)' tdSql.query(f'show create table {self.tbname}') query_result = tdSql.queryResult @@ -173,7 +181,7 @@ class TDTestCase: taosd_info = os.popen('taosd -V').read() taosd_gitinfo = re.findall("^gitinfo.*",taosd_info,re.M) tdSql.checkEqual(taosd_gitinfo_sql,taosd_gitinfo[0]) - + def show_base(self): for sql in ['dnodes','mnodes','cluster']: tdSql.query(f'show {sql}') @@ -191,6 +199,7 @@ class TDTestCase: self.ins_check() self.perf_check() self.show_create_sql() + self.show_create_sysdb_sql() def stop(self): tdSql.close()