提交 561c61ba 编写于 作者: S sunpeng

docs: add docs for assignment and seek

docs: upgrade docs from python subscription
docs: support stmt via ws
docs: optimizate python document base on java document
上级 9455087d
......@@ -105,6 +105,12 @@ class Consumer:
def poll(self, timeout: float = 1.0):
pass
def assignment(self):
pass
def poll(self, timeout: float = 1.0):
pass
def close(self):
pass
......
......@@ -24,6 +24,36 @@ The source code for the Python connector is hosted on [GitHub](https://github.co
We recommend using the latest version of `taospy`, regardless of the version of TDengine.
## Handling Exceptions
There are 4 types of exception in python connector.
- The exception of Python Connector itself.
- The exception of native library.
- The exception of websocket
- The exception of subscription.
- The exception of other TDengine function modules.
|Error Type|Description|Suggested Actions|
|:--------:|:---------:|:---------------:|
|InterfaceError|the native library is too old that it cannot support the function|please check the TDengine client version|
|ConnectionError|connection error|please check TDengine's status and the connection params|
|DatabaseError|database error|please upgrade Python connector to latest|
|OperationalError|operation error||
|ProgrammingError|||
|StatementError|the exception of stmt||
|ResultError|||
|SchemalessError|the exception of stmt schemaless||
|TmqError|the exception of stmt tmq||
It usually uses try-expect to handle exceptions in python. For exception handling, please refer to [Python Errors and Exceptions Documentation](https://docs.python.org/3/tutorial/errors.html).
All exceptions from the Python Connector are thrown directly. Applications should handle these exceptions. For example:
```python
{{#include docs/examples/python/handle_exception.py}}
```
## Supported features
- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
......@@ -343,6 +373,8 @@ For a more detailed description of the `sql()` method, please refer to [RestClie
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
The `Connection` class contains both an implementation of the PEP249 Connection interface (e.g., the `cursor()` method and the `close()` method) and many extensions (e.g., the `execute()`, `query()`, `schemaless_insert()`, and `subscribe()` methods).
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
......@@ -353,6 +385,46 @@ For a more detailed description of the `sql()` method, please refer to [RestClie
</TabItem>
</Tabs>
### Querying Data
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
The `query` method of the `TaosConnection` class can be used to query data and return the result data of type `TaosResult`.
```python
{{#include docs/examples/python/connection_usage_native_reference.py:query}}
```
:::tip
The queried results can only be fetched once. For example, only one of `fetch_all()` and `fetch_all_into_dict()` can be used in the example above. Repeated fetches will result in an empty list.
:::
</TabItem>
<TabItem value="rest" label="REST connection">
The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-api). It contains only a `sql()` method for executing arbitrary SQL statements and returning the result.
```python
{{#include docs/examples/python/rest_client_example.py}}
```
For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
The `query` method of the `TaosConnection` class can be used to query data and return the result data of type `TaosResult`.
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
</TabItem>
</Tabs>
### Usage with req_id
By using the optional req_id parameter, you can specify a request ID that can be used for tracing.
......@@ -453,6 +525,170 @@ As the way to connect introduced above but add `req_id` argument.
</TabItem>
</Tabs>
### Subscription
Connector support data subscription. For more information about subscroption, please refer to [Data Subscription](../../../develop/tmq/).
<Tabs defaultValue="native">
<TabItem value="native" label="native connection">
The `consumer` in the connector contains the subscription api.
#### Create Consumer
The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/).
```python
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
#### Subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
```python
consumer.subscribe(['topic1', 'topic2'])
```
#### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
```python
while True:
res = consumer.poll(1)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
```
#### assignment
The `assignment` function is used to get the assignment of the topic.
```python
assignments = consumer.assignment()
```
#### Seek
The `seek` function is used to reset the assignment of the topic.
```python
tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
```
#### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
```python
consumer.unsubscribe()
consumer.close()
```
#### Tmq subscription example
```python
{{#include docs/examples/python/tmq_example.py}}
```
#### assignment and seek example
```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
In addition to native connections, the connector also supports subscriptions via websockets.
#### Create Consumer
The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer).
```python
import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```
#### subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
```python
consumer.subscribe(['topic1', 'topic2'])
```
#### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
```python
while True:
res = consumer.poll(timeout=1.0)
if not res:
continue
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
print(row)
```
#### assignment
The `assignment` function is used to get the assignment of the topic.
```python
assignments = consumer.assignment()
```
#### Seek
The `seek` function is used to reset the assignment of the topic.
```python
consumer.seek(topic='topic1', partition=0, offset=0)
```
#### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
```python
consumer.unsubscribe()
consumer.close()
```
#### Subscription example
```python
{{#include docs/examples/python/tmq_websocket_example.py}}
```
#### Assignment and seek example
```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
```
</TabItem>
</Tabs>
### Schemaless Insert
Connector support schemaless insert.
......@@ -503,11 +739,143 @@ Insert with req_id argument
</TabItem>
</Tabs>
### Parameter Binding
The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound.
<Tabs>
<TabItem value="native" label="native connection">
#### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
```
import taos
conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### parameter binding
Call the `new_multi_binds` function to create the parameter list for parameter bindings.
```
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
```
Call the `bind_param` (for a single row) method or the `bind_param_batch` (for multiple rows) method to set the values.
```
stmt.bind_param_batch(params)
```
#### execute sql
Call `execute` method to execute sql.
```
stmt.execute()
```
#### Close Stmt
```
stmt.close()
```
#### Example
```python
{{#include docs/examples/python/stmt_example.py}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
#### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
```
import taosws
conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### Prepare sql
Call `prepare` method in stmt to prepare sql.
```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### parameter binding
Call the `bind_param` method to bind parameters.
```
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
```
Call the `add_batch` method to add parameters to the batch.
```
stmt.add_batch()
```
#### execute sql
Call `execute` method to execute sql.
```
stmt.execute()
```
#### Close Stmt
```
stmt.close()
```
#### Example
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
```
</TabItem>
</Tabs>
### Other sample programs
| Example program links | Example program content |
| ------------------------------------------------------------------------------------------------------------- | ------------------- ---- |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding, bind multiple rows at once |
| [bind_multi.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-multi.py) | parameter binding,
bind multiple rows at once |
| [bind_row.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/bind-row.py) | bind_row.py
| [insert_lines.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/insert-lines.py) | InfluxDB line protocol writing |
| [json_tag.py](https://github.com/taosdata/taos-connector-python/blob/main/examples/json-tag.py) | Use JSON type tags |
......@@ -515,14 +883,6 @@ Insert with req_id argument
## Other notes
### Exception handling
All errors from database operations are thrown directly as exceptions and the error message from the database is passed up the exception stack. The application is responsible for exception handling. For example:
```python
{{#include docs/examples/python/handle_exception.py}}
```
### About nanoseconds
Due to the current imperfection of Python's nanosecond support (see link below), the current implementation returns integers at nanosecond precision instead of the `datetime` type produced by `ms` and `us`, which application developers will need to handle on their own. And it is recommended to use pandas' to_datetime(). The Python Connector may modify the interface in the future if Python officially supports nanoseconds in full.
......
#!
import taosws
import taos
db_name = 'test_ws_stmt'
def before():
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.execute("create database %s" % db_name)
taos_conn.select_db(db_name)
taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))")
taos_conn.execute(
"create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))")
taos_conn.close()
def stmt_insert():
before()
conn = taosws.connect('taosws://root:taosdata@localhost:6041/%s' % db_name)
while True:
try:
stmt = conn.statement()
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
stmt.add_batch()
rows = stmt.execute()
print(rows)
stmt.close()
except Exception as e:
if 'Retry needed' in e.args[0]: # deal with [0x0125] Retry needed
continue
else:
raise e
break
def stmt_insert_into_stable():
before()
conn = taosws.connect("taosws://root:taosdata@localhost:6041/%s" % db_name)
while True:
try:
stmt = conn.statement()
stmt.prepare("insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)")
stmt.set_tbname('stb1_1')
stmt.set_tags([
taosws.int_to_tag(1),
taosws.varchar_to_tag('aaa'),
])
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
stmt.add_batch()
rows = stmt.execute()
print(rows)
stmt.close()
except Exception as e:
if 'Retry needed' in e.args[0]: # deal with [0x0125] Retry needed
continue
else:
raise e
break
#!
import time
import taosws
import taos
def before_test(db_name):
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.execute("create database %s" % db_name)
taos_conn.select_db(db_name)
taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))")
taos_conn.execute(
"create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))")
taos_conn.close()
def after_test(db_name):
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.close()
def stmt_insert():
db_name = 'test_ws_stmt_{}'.format(int(time.time()))
before_test(db_name)
conn = taosws.connect('taosws://root:taosdata@localhost:6041/%s' % db_name)
stmt = conn.statement()
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
stmt.add_batch()
rows = stmt.execute()
assert rows == 4
stmt.close()
after_test(db_name)
def stmt_insert_into_stable():
db_name = 'test_ws_stmt_{}'.format(int(time.time()))
before_test(db_name)
conn = taosws.connect("taosws://root:taosdata@localhost:6041/%s" % db_name)
stmt = conn.statement()
stmt.prepare("insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)")
stmt.set_tbname('stb1_1')
stmt.set_tags([
taosws.int_to_tag(1),
taosws.varchar_to_tag('aaa'),
])
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
stmt.add_batch()
rows = stmt.execute()
assert rows == 4
stmt.close()
after_test(db_name)
if __name__ == '__main__':
stmt_insert()
stmt_insert_into_stable()
import taos
from taos.tmq import Consumer
import taosws
def prepare():
conn = taos.connect()
conn.execute("drop topic if exists tmq_assignment_demo_topic")
conn.execute("drop database if exists tmq_assignment_demo_db")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600")
conn.select_db("tmq_assignment_demo_db")
conn.execute(
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute(
"create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')")
def taos_get_assignment_and_seek_demo():
prepare()
consumer = Consumer(
{
"group.id": "0",
# should disable snapshot,
# otherwise it will cause invalid params error
"experimental.snapshot.enable": "false",
}
)
consumer.subscribe(["tmq_assignment_demo_topic"])
# get topic assignment
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)
# poll
consumer.poll(1)
consumer.poll(1)
# get topic assignment again
after_pool_assignments = consumer.assignment()
for assignment in after_pool_assignments:
print(assignment)
# seek to the beginning
for assignment in assignments:
consumer.seek(assignment)
# now the assignment should be the same as before poll
assignments = consumer.assignment()
for assignment in assignments:
print(assignment)
if __name__ == '__main__':
taosws_get_assignment_and_seek_demo()
import taos
import taosws
def prepare():
conn = taos.connect()
conn.execute("drop topic if exists tmq_assignment_demo_topic")
conn.execute("drop database if exists tmq_assignment_demo_db")
conn.execute("create database if not exists tmq_assignment_demo_db wal_retention_period 3600")
conn.select_db("tmq_assignment_demo_db")
conn.execute(
"create table if not exists tmq_assignment_demo_table (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)")
conn.execute(
"create topic if not exists tmq_assignment_demo_topic as select ts, c1, c2, c3 from tmq_assignment_demo_table")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-2s, 1, 1.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now-1s, 2, 2.0, 'tmq test')")
conn.execute("insert into d0 using tmq_assignment_demo_table tags (0) values (now, 3, 3.0, 'tmq test')")
def taosws_get_assignment_and_seek_demo():
prepare()
consumer = taosws.Consumer(conf={
"td.connect.websocket.scheme": "ws",
# should disable snapshot,
# otherwise it will cause invalid params error
"experimental.snapshot.enable": "false",
"group.id": "0",
})
consumer.subscribe(["tmq_assignment_demo_topic"])
# get topic assignment
assignments = consumer.assignment()
for assignment in assignments:
print(assignment.to_string())
# poll
consumer.poll(1)
consumer.poll(1)
# get topic assignment again
after_poll_assignments = consumer.assignment()
for assignment in after_poll_assignments:
print(assignment.to_string())
# seek to the beginning
for assignment in assignments:
for a in assignment.assignments():
consumer.seek(assignment.topic(), a.vg_id(), a.offset())
# now the assignment should be the same as before poll
assignments = consumer.assignment()
for assignment in assignments:
print(assignment.to_string())
if __name__ == '__main__':
taosws_get_assignment_and_seek_demo()
......@@ -105,6 +105,12 @@ class Consumer:
def poll(self, timeout: float = 1.0):
pass
def assignment(self):
pass
def seek(self, partition):
pass
def close(self):
pass
......
......@@ -25,6 +25,36 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。
## 处理异常
Python 连接器可能会产生 4 种异常:
- Python 连接器本身的异常
- 原生连接方式的异常
- websocket 连接方式异常
- 数据订阅异常
- TDengine 其他功能模块的异常
|Error Type|Description|Suggested Actions|
|:--------:|:---------:|:---------------:|
|InterfaceError|taosc 版本太低,不支持所使用的接口|请检查 TDengine 客户端版本|
|ConnectionError|数据库链接错误|请检查 TDengine 服务端状态和连接参数|
|DatabaseError|数据库错误|请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版|
|OperationalError|操作错误|API 使用错误,请检查代码|
|ProgrammingError|||
|StatementError|stmt 相关异常||
|ResultError|||
|SchemalessError|schemaless 相关异常||
|TmqError|tmq 相关异常||
Python 中通常通过 try-expect 处理异常,异常处理相关请参考 [Python 错误和异常文档](https://docs.python.org/3/tutorial/errors.html)。
Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
```python
{{#include docs/examples/python/handle_exception.py}}
```
## 支持的功能
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
......@@ -32,7 +62,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
## 安装
### 准备
### 安装前准备
1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
2. 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。
......@@ -274,7 +304,7 @@ Transfer-Encoding: chunked
</TabItem>
</Tabs>
## 示例程序
## 使用示例
### 基本使用
......@@ -343,6 +373,10 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
#### Connection 类的使用
`Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: execute、 query、schemaless_insert 和 subscribe 方法。
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
......@@ -353,6 +387,46 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
</TabItem>
</Tabs>
### 查询数据
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。
```python
{{#include docs/examples/python/connection_usage_native_reference.py:query}}
```
:::tip
查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
:::
</TabItem>
<TabItem value="rest" label="REST 连接">
RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。
```python
{{#include docs/examples/python/rest_client_example.py}}
```
对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
</TabItem>
</Tabs>
### 与 req_id 一起使用
使用可选的 req_id 参数,指定请求 id,可以用于 tracing
......@@ -456,27 +530,169 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
### 数据订阅
连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅](../../develop/tmq/)。
连接器支持数据订阅功能,数据订阅功能请参考 [数据订阅文档](../../develop/tmq/)。
<Tabs defaultValue="native">
<TabItem value="native" label="原生连接">
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API,相关 API 定义请参考 [数据订阅文档](../../develop/tmq/#%E4%B8%BB%E8%A6%81%E6%95%B0%E6%8D%AE%E7%BB%93%E6%9E%84%E5%92%8C-api)。
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。
#### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
```python
from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
#### 订阅 topics
Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
```python
consumer.subscribe(['topic1', 'topic2'])
```
#### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
```python
while True:
res = consumer.poll(1)
if not res:
continue
err = res.error()
if err is not None:
raise err
val = res.value()
for block in val:
print(block.fetchall())
```
#### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
```python
assignments = consumer.assignment()
```
#### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
```python
tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
```
#### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
```python
consumer.unsubscribe()
consumer.close()
```
#### tmq 订阅示例代码
```python
{{#include docs/examples/python/tmq_example.py}}
```
#### 获取和重置消费进度示例代码
```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据。
除了原生的连接方式,Python 连接器还支持通过 websocket 订阅 TMQ 数据,使用 websocket 方式订阅 TMQ 数据需要安装 `taos-ws-py`。
taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。
#### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
```python
import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```
#### 订阅 topics
Comsumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
```python
consumer.subscribe(['topic1', 'topic2'])
```
#### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
```python
while True:
res = consumer.poll(timeout=1.0)
if not res:
continue
err = res.error()
if err is not None:
raise err
for block in message:
for row in block:
print(row)
```
#### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
```python
assignments = consumer.assignment()
```
#### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。
```python
consumer.seek(topic='topic1', partition=0, offset=0)
```
#### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
```python
consumer.unsubscribe()
consumer.close()
```
#### tmq 订阅示例代码
```python
{{#include docs/examples/python/tmq_websocket_example.py}}
```
连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。
#### 获取和重置消费进度示例代码
```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
```
</TabItem>
</Tabs>
......@@ -530,7 +746,142 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
</TabItem>
</Tabs>
### 其它示例程序
### 通过参数绑定写入数据
TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。
<Tabs>
<TabItem value="native" label="原生连接">
#### 创建 stmt
Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
```
import taos
conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### 参数绑定
调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。
```
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
```
调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。
```
stmt.bind_param_batch(params)
```
#### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
```
stmt.execute()
```
#### 关闭 stmt
最后需要关闭 stmt。
```
stmt.close()
```
#### 示例代码
```python
{{#include docs/examples/python/stmt_example.py}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
#### 创建 stmt
Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
```
import taosws
conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### 解析 sql
调用 stmt 的 `prepare` 方法来解析 insert 语句。
```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### 参数绑定
调用 stmt 的 `bind_param` 方法绑定参数。
```
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
```
调用 stmt 的 `add_batch` 方法,将参数加入批处理。
```
stmt.add_batch()
```
#### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
```
stmt.execute()
```
#### 关闭 stmt
最后需要关闭 stmt。
```
stmt.close()
```
#### 示例代码
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
```
</TabItem>
</Tabs>
### 更多示例程序
| 示例程序链接 | 示例程序内容 |
| ------------------------------------------------------------------------------------------------------------- | ----------------------- |
......@@ -542,14 +893,6 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
## 其它说明
### 异常处理
所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
```python
{{#include docs/examples/python/handle_exception.py}}
```
``
### 关于纳秒 (nanosecond)
由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册