提交 10122ee8 编写于 作者: P plum-lihui

Merge branch 'main' into test_main/lihui

......@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 3.0
GIT_TAG main
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
......
......@@ -20,6 +20,11 @@ The source code for the Python connector is hosted on [GitHub](https://github.co
- The [supported platforms](/reference/connector/#supported-platforms) for the native connection are the same as the ones supported by the TDengine client.
- REST connections are supported on all platforms that can run Python.
### Supported features
- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
- REST connections support features such as connection management and SQL execution. (SQL execution allows you to: manage databases, tables, and supertables, write data, query data, create continuous queries, etc.).
## Version selection
We recommend using the latest version of `taospy`, regardless of the version of TDengine.
......@@ -64,10 +69,23 @@ All exceptions from the Python Connector are thrown directly. Applications shoul
{{#include docs/examples/python/handle_exception.py}}
```
## Supported features
## TDengine DataType vs. Python DataType
- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
- REST connections support features such as connection management and SQL execution. (SQL execution allows you to: manage databases, tables, and supertables, write data, query data, create continuous queries, etc.).
TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Python is as follows:
|TDengine DataType|Python DataType|
|:---------------:|:-------------:|
|TIMESTAMP|datetime|
|INT|int|
|BIGINT|int|
|FLOAT|float|
|DOUBLE|int|
|SMALLINT|int|
|TINYINT|int|
|BOOL|bool|
|BINARY|str|
|NCHAR|str|
|JSON|str|
## Installation
......@@ -544,7 +562,7 @@ Connector support data subscription. For more information about subscroption, pl
The `consumer` in the connector contains the subscription api.
#### Create Consumer
##### Create Consumer
The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/).
......@@ -554,7 +572,7 @@ from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
#### Subscribe topics
##### Subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
......@@ -562,7 +580,7 @@ The `subscribe` function is used to subscribe to a list of topics.
consumer.subscribe(['topic1', 'topic2'])
```
#### Consume
##### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
......@@ -580,7 +598,7 @@ while True:
print(block.fetchall())
```
#### assignment
##### assignment
The `assignment` function is used to get the assignment of the topic.
......@@ -588,7 +606,7 @@ The `assignment` function is used to get the assignment of the topic.
assignments = consumer.assignment()
```
#### Seek
##### Seek
The `seek` function is used to reset the assignment of the topic.
......@@ -597,7 +615,7 @@ tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
```
#### After consuming data
##### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
......@@ -606,13 +624,13 @@ consumer.unsubscribe()
consumer.close()
```
#### Tmq subscription example
##### Tmq subscription example
```python
{{#include docs/examples/python/tmq_example.py}}
```
#### assignment and seek example
##### assignment and seek example
```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
......@@ -624,7 +642,7 @@ consumer.close()
In addition to native connections, the connector also supports subscriptions via websockets.
#### Create Consumer
##### Create Consumer
The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer).
......@@ -634,7 +652,7 @@ import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```
#### subscribe topics
##### subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
......@@ -642,7 +660,7 @@ The `subscribe` function is used to subscribe to a list of topics.
consumer.subscribe(['topic1', 'topic2'])
```
#### Consume
##### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
......@@ -659,7 +677,7 @@ while True:
print(row)
```
#### assignment
##### assignment
The `assignment` function is used to get the assignment of the topic.
......@@ -667,7 +685,7 @@ The `assignment` function is used to get the assignment of the topic.
assignments = consumer.assignment()
```
#### Seek
##### Seek
The `seek` function is used to reset the assignment of the topic.
......@@ -675,7 +693,7 @@ The `seek` function is used to reset the assignment of the topic.
consumer.seek(topic='topic1', partition=0, offset=0)
```
#### After consuming data
##### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
......@@ -684,13 +702,13 @@ consumer.unsubscribe()
consumer.close()
```
#### Subscription example
##### Subscription example
```python
{{#include docs/examples/python/tmq_websocket_example.py}}
```
#### Assignment and seek example
##### Assignment and seek example
```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
......@@ -706,19 +724,19 @@ Connector support schemaless insert.
<Tabs defaultValue="list">
<TabItem value="list" label="List Insert">
Simple insert
##### Simple insert
```python
{{#include docs/examples/python/schemaless_insert.py}}
```
Insert with ttl argument
##### Insert with ttl argument
```python
{{#include docs/examples/python/schemaless_insert_ttl.py}}
```
Insert with req_id argument
##### Insert with req_id argument
```python
{{#include docs/examples/python/schemaless_insert_req_id.py}}
......@@ -728,19 +746,19 @@ Insert with req_id argument
<TabItem value="raw" label="Raw Insert">
Simple insert
##### Simple insert
```python
{{#include docs/examples/python/schemaless_insert_raw.py}}
```
Insert with ttl argument
##### Insert with ttl argument
```python
{{#include docs/examples/python/schemaless_insert_raw_ttl.py}}
```
Insert with req_id argument
##### Insert with req_id argument
```python
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
......@@ -756,7 +774,7 @@ The Python connector provides a parameter binding api for inserting data. Simila
<Tabs>
<TabItem value="native" label="native connection">
#### Create Stmt
##### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
......@@ -767,7 +785,7 @@ conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### parameter binding
##### parameter binding
Call the `new_multi_binds` function to create the parameter list for parameter bindings.
......@@ -797,7 +815,7 @@ Call the `bind_param` (for a single row) method or the `bind_param_batch` (for m
stmt.bind_param_batch(params)
```
#### execute sql
##### execute sql
Call `execute` method to execute sql.
......@@ -805,13 +823,13 @@ Call `execute` method to execute sql.
stmt.execute()
```
#### Close Stmt
##### Close Stmt
```
stmt.close()
```
#### Example
##### Example
```python
{{#include docs/examples/python/stmt_example.py}}
......@@ -820,7 +838,7 @@ stmt.close()
<TabItem value="websocket" label="WebSocket connection">
#### Create Stmt
##### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
......@@ -831,7 +849,7 @@ conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### Prepare sql
##### Prepare sql
Call `prepare` method in stmt to prepare sql.
......@@ -839,7 +857,7 @@ Call `prepare` method in stmt to prepare sql.
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### parameter binding
##### parameter binding
Call the `bind_param` method to bind parameters.
......@@ -858,7 +876,7 @@ Call the `add_batch` method to add parameters to the batch.
stmt.add_batch()
```
#### execute sql
##### execute sql
Call `execute` method to execute sql.
......@@ -866,13 +884,13 @@ Call `execute` method to execute sql.
stmt.execute()
```
#### Close Stmt
##### Close Stmt
```
stmt.close()
```
#### Example
##### Example
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
......
......@@ -22,7 +22,12 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
- 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。
- REST 连接支持所有能运行 Python 的平台。
## 版本选择
### 支持的功能
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
## 历史版本
无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。
......@@ -66,12 +71,25 @@ Python Connector 的所有数据库操作如果出现异常,都会直接抛出
{{#include docs/examples/python/handle_exception.py}}
```
## 支持的功能
TDengine DataType 和 Python DataType
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:
|TDengine DataType|Python DataType|
|:---------------:|:-------------:|
|TIMESTAMP|datetime|
|INT|int|
|BIGINT|int|
|FLOAT|float|
|DOUBLE|int|
|SMALLINT|int|
|TINYINT|int|
|BOOL|bool|
|BINARY|str|
|NCHAR|str|
|JSON|str|
## 安装
## 安装步骤
### 安装前准备
......@@ -384,7 +402,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
#### Connection 类的使用
##### Connection 类的使用
`Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: execute、 query、schemaless_insert 和 subscribe 方法。
......@@ -548,7 +566,7 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。
#### 创建 Consumer
##### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
......@@ -558,7 +576,7 @@ from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
#### 订阅 topics
##### 订阅 topics
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
......@@ -566,7 +584,7 @@ Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时
consumer.subscribe(['topic1', 'topic2'])
```
#### 消费数据
##### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
......@@ -584,7 +602,7 @@ while True:
print(block.fetchall())
```
#### 获取消费进度
##### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
......@@ -592,7 +610,7 @@ Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic
assignments = consumer.assignment()
```
#### 重置消费进度
##### 指定订阅 Offset
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
......@@ -601,7 +619,7 @@ tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
```
#### 结束消费
##### 关闭订阅
消费结束后,应当取消订阅,并关闭 Consumer。
......@@ -610,13 +628,13 @@ consumer.unsubscribe()
consumer.close()
```
#### tmq 订阅示例代码
##### 完整示例
```python
{{#include docs/examples/python/tmq_example.py}}
```
#### 获取和重置消费进度示例代码
##### 获取和重置消费进度示例代码
```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
......@@ -630,7 +648,7 @@ consumer.close()
taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。
#### 创建 Consumer
##### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
......@@ -640,7 +658,7 @@ import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```
#### 订阅 topics
##### 订阅 topics
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
......@@ -648,7 +666,7 @@ Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时
consumer.subscribe(['topic1', 'topic2'])
```
#### 消费数据
##### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
......@@ -665,7 +683,7 @@ while True:
print(row)
```
#### 获取消费进度
##### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
......@@ -673,7 +691,7 @@ Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic
assignments = consumer.assignment()
```
#### 重置消费进度
##### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。
......@@ -681,7 +699,7 @@ Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位
consumer.seek(topic='topic1', partition=0, offset=0)
```
#### 结束消费
##### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
......@@ -690,7 +708,7 @@ consumer.unsubscribe()
consumer.close()
```
#### tmq 订阅示例代码
##### tmq 订阅示例代码
```python
{{#include docs/examples/python/tmq_websocket_example.py}}
......@@ -698,7 +716,7 @@ consumer.close()
连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。
#### 获取和重置消费进度示例代码
##### 获取和重置消费进度示例代码
```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
......@@ -714,19 +732,19 @@ consumer.close()
<Tabs defaultValue="list">
<TabItem value="list" label="List 写入">
简单写入
##### 简单写入
```python
{{#include docs/examples/python/schemaless_insert.py}}
```
带有 ttl 参数的写入
##### 带有 ttl 参数的写入
```python
{{#include docs/examples/python/schemaless_insert_ttl.py}}
```
带有 req_id 参数的写入
##### 带有 req_id 参数的写入
```python
{{#include docs/examples/python/schemaless_insert_req_id.py}}
......@@ -736,19 +754,19 @@ consumer.close()
<TabItem value="raw" label="Raw 写入">
简单写入
##### 简单写入
```python
{{#include docs/examples/python/schemaless_insert_raw.py}}
```
带有 ttl 参数的写入
##### 带有 ttl 参数的写入
```python
{{#include docs/examples/python/schemaless_insert_raw_ttl.py}}
```
带有 req_id 参数的写入
##### 带有 req_id 参数的写入
```python
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
......@@ -764,7 +782,7 @@ TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写
<Tabs>
<TabItem value="native" label="原生连接">
#### 创建 stmt
##### 创建 stmt
Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
......@@ -775,7 +793,7 @@ conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### 参数绑定
##### 参数绑定
调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。
......@@ -805,7 +823,7 @@ params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
```
#### 执行 sql
##### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
......@@ -813,7 +831,7 @@ stmt.bind_param_batch(params)
stmt.execute()
```
#### 关闭 stmt
##### 关闭 stmt
最后需要关闭 stmt。
......@@ -821,7 +839,7 @@ stmt.execute()
stmt.close()
```
#### 示例代码
##### 示例代码
```python
{{#include docs/examples/python/stmt_example.py}}
......@@ -830,7 +848,7 @@ stmt.close()
<TabItem value="websocket" label="WebSocket 连接">
#### 创建 stmt
##### 创建 stmt
Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
......@@ -841,7 +859,7 @@ conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### 解析 sql
##### 解析 sql
调用 stmt 的 `prepare` 方法来解析 insert 语句。
......@@ -849,7 +867,7 @@ stmt = conn.statement()
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### 参数绑定
##### 参数绑定
调用 stmt 的 `bind_param` 方法绑定参数。
......@@ -868,7 +886,7 @@ stmt.bind_param([
stmt.add_batch()
```
#### 执行 sql
##### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
......@@ -876,7 +894,7 @@ stmt.add_batch()
stmt.execute()
```
#### 关闭 stmt
##### 关闭 stmt
最后需要关闭 stmt。
......@@ -884,7 +902,7 @@ stmt.execute()
stmt.close()
```
#### 示例代码
##### 示例代码
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
......
......@@ -106,7 +106,6 @@ enum {
HEARTBEAT_KEY_DBINFO,
HEARTBEAT_KEY_STBINFO,
HEARTBEAT_KEY_TMQ,
HEARTBEAT_KEY_USER_PASSINFO,
};
typedef enum _mgmt_table {
......@@ -636,6 +635,7 @@ typedef struct {
SEpSet epSet;
int32_t svrTimestamp;
int32_t passVer;
int32_t authVer;
char sVer[TSDB_VERSION_LEN];
char sDetailVer[128];
} SConnectRsp;
......@@ -703,6 +703,7 @@ int32_t tDeserializeSGetUserAuthReq(void* buf, int32_t bufLen, SGetUserAuthReq*
typedef struct {
char user[TSDB_USER_LEN];
int32_t version;
int32_t passVer;
int8_t superAuth;
int8_t sysInfo;
int8_t enable;
......@@ -719,14 +720,6 @@ int32_t tSerializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pR
int32_t tDeserializeSGetUserAuthRsp(void* buf, int32_t bufLen, SGetUserAuthRsp* pRsp);
void tFreeSGetUserAuthRsp(SGetUserAuthRsp* pRsp);
typedef struct SUserPassVersion {
char user[TSDB_USER_LEN];
int32_t version;
} SUserPassVersion;
typedef SGetUserAuthReq SGetUserPassReq;
typedef SUserPassVersion SGetUserPassRsp;
/*
* for client side struct, only column id, type, bytes are necessary
* But for data in vnode side, we need all the following information.
......@@ -1070,14 +1063,6 @@ int32_t tSerializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp
int32_t tDeserializeSUserAuthBatchRsp(void* buf, int32_t bufLen, SUserAuthBatchRsp* pRsp);
void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp);
typedef struct {
SArray* pArray; // Array of SGetUserPassRsp
} SUserPassBatchRsp;
int32_t tSerializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp);
int32_t tDeserializeSUserPassBatchRsp(void* buf, int32_t bufLen, SUserPassBatchRsp* pRsp);
void tFreeSUserPassBatchRsp(SUserPassBatchRsp* pRsp);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
STimeWindow timeRange;
......
......@@ -63,7 +63,7 @@ typedef struct {
// statistics
int32_t reportCnt;
int32_t connKeyCnt;
int32_t passKeyCnt; // with passVer call back
int8_t connHbFlag; // 0 init, 1 send req, 2 get resp
int64_t reportBytes; // not implemented
int64_t startTime;
// ctl
......@@ -83,8 +83,9 @@ typedef struct {
int8_t threadStop;
int8_t quitByKill;
TdThread thread;
TdThreadMutex lock; // used when app init and cleanup
TdThreadMutex lock; // used when app init and cleanup
SHashObj* appSummary;
SHashObj* appHbHash; // key: clusterId
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[CONN_TYPE__MAX];
FHbRspHandle rspHandle[CONN_TYPE__MAX];
......@@ -146,6 +147,7 @@ typedef struct STscObj {
int64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
int32_t authVer;
SAppInstInfo* pAppInfo;
SHashObj* pRequests;
SPassInfo passInfo;
......
......@@ -22,10 +22,10 @@
typedef struct {
union {
struct {
int64_t clusterId;
int32_t passKeyCnt;
int32_t passVer;
int32_t reqCnt;
SAppHbMgr *pAppHbMgr;
int64_t clusterId;
int32_t reqCnt;
int8_t connHbFlag;
};
};
} SHbParam;
......@@ -34,12 +34,14 @@ static SClientHbMgr clientHbMgr = {0};
static int32_t hbCreateThread();
static void hbStopThread();
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp);
static int32_t hbMqHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req) { return 0; }
static int32_t hbMqHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { return 0; }
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog,
SAppHbMgr *pAppHbMgr) {
int32_t code = 0;
SUserAuthBatchRsp batchRsp = {0};
......@@ -56,54 +58,61 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
catalogUpdateUserAuthInfo(pCatalog, rsp);
}
if (numOfBatchs > 0) hbUpdateUserAuthInfo(pAppHbMgr, &batchRsp);
atomic_val_compare_exchange_8(&pAppHbMgr->connHbFlag, 1, 2);
taosArrayDestroy(batchRsp.pArray);
return TSDB_CODE_SUCCESS;
}
static int32_t hbProcessUserPassInfoRsp(void *value, int32_t valueLen, SClientHbKey *connKey, SAppHbMgr *pAppHbMgr) {
int32_t code = 0;
int32_t numOfBatchs = 0;
SUserPassBatchRsp batchRsp = {0};
if (tDeserializeSUserPassBatchRsp(value, valueLen, &batchRsp) != 0) {
code = TSDB_CODE_INVALID_MSG;
return code;
}
numOfBatchs = taosArrayGetSize(batchRsp.pArray);
SClientHbReq *pReq = NULL;
while ((pReq = taosHashIterate(pAppHbMgr->activeInfo, pReq))) {
STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
if (!pTscObj) {
continue;
}
SPassInfo *passInfo = &pTscObj->passInfo;
if (!passInfo->fp) {
releaseTscObj(pReq->connKey.tscRid);
static int32_t hbUpdateUserAuthInfo(SAppHbMgr *pAppHbMgr, SUserAuthBatchRsp *batchRsp) {
uint64_t clusterId = pAppHbMgr->pAppInstInfo->clusterId;
for (int i = 0; i < TARRAY_SIZE(clientHbMgr.appHbMgrs); ++i) {
SAppHbMgr *hbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, i);
if (!hbMgr || hbMgr->pAppInstInfo->clusterId != clusterId) {
continue;
}
for (int32_t i = 0; i < numOfBatchs; ++i) {
SGetUserPassRsp *rsp = taosArrayGet(batchRsp.pArray, i);
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
int32_t oldVer = atomic_load_32(&passInfo->ver);
if (oldVer < rsp->version) {
atomic_store_32(&passInfo->ver, rsp->version);
if (passInfo->fp) {
(*passInfo->fp)(passInfo->param, &passInfo->ver, TAOS_NOTIFY_PASSVER);
SClientHbReq *pReq = NULL;
while ((pReq = taosHashIterate(hbMgr->activeInfo, pReq))) {
STscObj *pTscObj = (STscObj *)acquireTscObj(pReq->connKey.tscRid);
if (!pTscObj) {
continue;
}
for (int32_t j = 0; j < TARRAY_SIZE(batchRsp->pArray); ++j) {
SGetUserAuthRsp *rsp = TARRAY_GET_ELEM(batchRsp->pArray, j);
if (0 == strncmp(rsp->user, pTscObj->user, TSDB_USER_LEN)) {
pTscObj->authVer = rsp->version;
#if 0 // make jenkins happy temporarily. After PR pass, enable these lines again.
if (pTscObj->sysInfo != rsp->sysInfo) {
tscDebug("update sysInfo of user %s from %" PRIi8 " to %" PRIi8 ", tscRid:%" PRIi64, rsp->user,
pTscObj->sysInfo, rsp->sysInfo, pTscObj->id);
pTscObj->sysInfo = rsp->sysInfo;
}
#endif
if (pTscObj->passInfo.fp) {
SPassInfo *passInfo = &pTscObj->passInfo;
int32_t oldVer = atomic_load_32(&passInfo->ver);
if (oldVer < rsp->passVer) {
atomic_store_32(&passInfo->ver, rsp->passVer);
if (passInfo->fp) {
(*passInfo->fp)(passInfo->param, &rsp->passVer, TAOS_NOTIFY_PASSVER);
}
tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer,
atomic_load_32(&passInfo->ver), pTscObj->id);
}
}
tscDebug("update passVer of user %s from %d to %d, tscRid:%" PRIi64, rsp->user, oldVer,
atomic_load_32(&passInfo->ver), pTscObj->id);
break;
}
break;
}
releaseTscObj(pReq->connKey.tscRid);
}
releaseTscObj(pReq->connKey.tscRid);
}
taosArrayDestroy(batchRsp.pArray);
return code;
return 0;
}
static int32_t hbGenerateVgInfoFromRsp(SDBVgInfo **pInfo, SUseDbRsp *rsp) {
......@@ -316,7 +325,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
break;
}
hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog);
hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr);
break;
}
case HEARTBEAT_KEY_DBINFO: {
......@@ -353,15 +362,6 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
case HEARTBEAT_KEY_USER_PASSINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb user pass info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
hbProcessUserPassInfoRsp(kv->value, kv->valueLen, &pRsp->connKey, pAppHbMgr);
break;
}
default:
tscError("invalid hb key type:%d", kv->key);
break;
......@@ -543,7 +543,7 @@ int32_t hbGetQueryBasicInfo(SClientHbKey *connKey, SClientHbReq *req) {
return TSDB_CODE_SUCCESS;
}
static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
static int32_t hbGetUserAuthInfo(SClientHbKey *connKey, SHbParam *param, SClientHbReq *req) {
STscObj *pTscObj = (STscObj *)acquireTscObj(connKey->tscRid);
if (!pTscObj) {
tscWarn("tscObj rid %" PRIx64 " not exist", connKey->tscRid);
......@@ -552,46 +552,61 @@ static int32_t hbGetUserBasicInfo(SClientHbKey *connKey, SHbParam *param, SClien
int32_t code = 0;
if (param && (param->passVer != INT32_MIN) && (param->passVer <= pTscObj->passInfo.ver)) {
tscDebug("hb got user basic info, no need since passVer %d <= %d", param->passVer, pTscObj->passInfo.ver);
SKv kv = {.key = HEARTBEAT_KEY_USER_AUTHINFO};
SKv *pKv = NULL;
if ((pKv = taosHashGet(req->info, &kv.key, sizeof(kv.key)))) {
int32_t userNum = pKv->valueLen / sizeof(SUserAuthVersion);
SUserAuthVersion *userAuths = (SUserAuthVersion *)pKv->value;
for (int32_t i = 0; i < userNum; ++i) {
SUserAuthVersion *pUserAuth = userAuths + i;
// both key and user exist, update version
if (strncmp(pUserAuth->user, pTscObj->user, TSDB_USER_LEN) == 0) {
pUserAuth->version = htonl(-1); // force get userAuthInfo
goto _return;
}
}
// key exists, user not exist, append user
SUserAuthVersion *qUserAuth =
(SUserAuthVersion *)taosMemoryRealloc(pKv->value, (userNum + 1) * sizeof(SUserAuthVersion));
if (qUserAuth) {
strncpy((qUserAuth + userNum)->user, pTscObj->user, TSDB_USER_LEN);
(qUserAuth + userNum)->version = htonl(-1); // force get userAuthInfo
pKv->value = qUserAuth;
pKv->valueLen += sizeof(SUserAuthVersion);
} else {
code = TSDB_CODE_OUT_OF_MEMORY;
}
goto _return;
}
SUserPassVersion *user = taosMemoryMalloc(sizeof(SUserPassVersion));
// key/user not exist, add user
SUserAuthVersion *user = taosMemoryMalloc(sizeof(SUserAuthVersion));
if (!user) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
strncpy(user->user, pTscObj->user, TSDB_USER_LEN);
user->version = htonl(pTscObj->passInfo.ver);
user->version = htonl(-1); // force get userAuthInfo
kv.valueLen = sizeof(SUserAuthVersion);
kv.value = user;
SKv kv = {
.key = HEARTBEAT_KEY_USER_PASSINFO,
.valueLen = sizeof(SUserPassVersion),
.value = user,
};
tscDebug("hb got user basic info, valueLen:%d, user:%s, passVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
pTscObj->passInfo.ver, connKey->tscRid);
tscDebug("hb got user auth info, valueLen:%d, user:%s, authVer:%d, tscRid:%" PRIi64, kv.valueLen, user->user,
pTscObj->authVer, connKey->tscRid);
if (!req->info) {
req->info = taosHashInit(64, hbKeyHashFunc, 1, HASH_ENTRY_LOCK);
}
if (taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)) < 0) {
taosMemoryFree(user);
code = terrno ? terrno : TSDB_CODE_APP_ERROR;
goto _return;
}
// assign the passVer
if (param) {
param->passVer = pTscObj->passInfo.ver;
}
_return:
releaseTscObj(connKey->tscRid);
if (code) {
tscError("hb got user basic info failed since %s", terrstr(code));
tscError("hb got user auth info failed since %s", terrstr(code));
}
return code;
......@@ -749,14 +764,21 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
hbGetQueryBasicInfo(connKey, req);
if (hbParam->passKeyCnt > 0) {
hbGetUserBasicInfo(connKey, hbParam, req);
}
if (hbParam->reqCnt == 0) {
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
if (!taosHashGet(clientHbMgr.appHbHash, &hbParam->clusterId, sizeof(hbParam->clusterId))) {
code = hbGetExpiredUserInfo(connKey, pCatalog, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
}
// invoke after hbGetExpiredUserInfo
if (2 != atomic_load_8(&hbParam->pAppHbMgr->connHbFlag)) {
code = hbGetUserAuthInfo(connKey, hbParam, req);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
atomic_store_8(&hbParam->pAppHbMgr->connHbFlag, 1);
}
code = hbGetExpiredDBInfo(connKey, pCatalog, req);
......@@ -770,7 +792,7 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void *param, SClientHbReq *req
}
}
++hbParam->reqCnt; // success to get catalog info
++hbParam->reqCnt; // success to get catalog info
return TSDB_CODE_SUCCESS;
}
......@@ -815,9 +837,9 @@ SClientHbBatchReq *hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
if (param.clusterId == 0) {
// init
param.clusterId = pOneReq->clusterId;
param.passVer = INT32_MIN;
param.pAppHbMgr = pAppHbMgr;
param.connHbFlag = atomic_load_8(&pAppHbMgr->connHbFlag);
}
param.passKeyCnt = atomic_load_32(&pAppHbMgr->passKeyCnt);
break;
}
default:
......@@ -901,6 +923,10 @@ static void *hbThreadFunc(void *param) {
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
if (sz > 0) {
hbGatherAppInfo();
if (sz > 1 && !clientHbMgr.appHbHash) {
clientHbMgr.appHbHash = taosHashInit(0, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
}
taosHashClear(clientHbMgr.appHbHash);
}
for (int i = 0; i < sz; i++) {
......@@ -953,7 +979,7 @@ static void *hbThreadFunc(void *param) {
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
tFreeClientHbBatchReq(pReq);
// hbClearReqInfo(pAppHbMgr);
taosHashPut(clientHbMgr.appHbHash, &pAppHbMgr->pAppInstInfo->clusterId, sizeof(uint64_t), NULL, 0);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
......@@ -961,6 +987,7 @@ static void *hbThreadFunc(void *param) {
taosMsleep(HEARTBEAT_INTERVAL);
}
taosHashCleanup(clientHbMgr.appHbHash);
return NULL;
}
......@@ -1009,7 +1036,7 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
// init stat
pAppHbMgr->startTime = taosGetTimestampMs();
pAppHbMgr->connKeyCnt = 0;
pAppHbMgr->passKeyCnt = 0;
pAppHbMgr->connHbFlag = 0;
pAppHbMgr->reportCnt = 0;
pAppHbMgr->reportBytes = 0;
pAppHbMgr->key = taosStrdup(key);
......@@ -1127,7 +1154,6 @@ void hbMgrCleanUp() {
appHbMgrCleanup();
taosArrayDestroy(clientHbMgr.appHbMgrs);
taosThreadMutexUnlock(&clientHbMgr.lock);
clientHbMgr.appHbMgrs = NULL;
}
......@@ -1180,12 +1206,6 @@ void hbDeregisterConn(STscObj *pTscObj, SClientHbKey connKey) {
}
atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
taosThreadMutexLock(&pTscObj->mutex);
if (pTscObj->passInfo.fp) {
atomic_sub_fetch_32(&pAppHbMgr->passKeyCnt, 1);
}
taosThreadMutexUnlock(&pTscObj->mutex);
}
// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
......
......@@ -135,11 +135,6 @@ int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)
switch (type) {
case TAOS_NOTIFY_PASSVER: {
taosThreadMutexLock(&pObj->mutex);
if (fp && !pObj->passInfo.fp) {
atomic_add_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1);
} else if (!fp && pObj->passInfo.fp) {
atomic_sub_fetch_32(&pObj->pAppInfo->pAppHbMgr->passKeyCnt, 1);
}
pObj->passInfo.fp = fp;
pObj->passInfo.param = param;
taosThreadMutexUnlock(&pObj->mutex);
......
......@@ -131,6 +131,7 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
pTscObj->connType = connectRsp.connType;
pTscObj->passInfo.ver = connectRsp.passVer;
pTscObj->authVer = connectRsp.authVer;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType);
......
......@@ -1523,6 +1523,9 @@ int32_t tSerializeSGetUserAuthRspImpl(SEncoder *pEncoder, SGetUserAuthRsp *pRsp)
useDb = taosHashIterate(pRsp->useDbs, useDb);
}
// since 3.0.7.0
if (tEncodeI32(pEncoder, pRsp->passVer) < 0) return -1;
return 0;
}
......@@ -1644,6 +1647,12 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
taosMemoryFree(key);
}
// since 3.0.7.0
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) return -1;
} else {
pRsp->passVer = 0;
}
}
return 0;
......@@ -3029,59 +3038,6 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp *pRsp) {
taosArrayDestroy(pRsp->pArray);
}
int32_t tSerializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
for (int32_t i = 0; i < numOfBatch; ++i) {
SGetUserPassRsp *pUserPassRsp = taosArrayGet(pRsp->pArray, i);
if (tEncodeCStr(&encoder, pUserPassRsp->user) < 0) return -1;
if (tEncodeI32(&encoder, pUserPassRsp->version) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSUserPassBatchRsp(void *buf, int32_t bufLen, SUserPassBatchRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(SGetUserPassRsp));
if (pRsp->pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfBatch; ++i) {
SGetUserPassRsp rsp = {0};
if (tDecodeCStrTo(&decoder, rsp.user) < 0) return -1;
if (tDecodeI32(&decoder, &rsp.version) < 0) return -1;
taosArrayPush(pRsp->pArray, &rsp);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSUserPassBatchRsp(SUserPassBatchRsp *pRsp) {
if(pRsp) {
taosArrayDestroy(pRsp->pArray);
}
}
int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
......@@ -4159,6 +4115,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
if (tEncodeCStr(&encoder, pRsp->sVer) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sDetailVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->passVer) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->authVer) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
......@@ -4188,6 +4145,12 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
} else {
pRsp->passVer = 0;
}
// since 3.0.7.0
if (!tDecodeIsEnd(&decoder)) {
if (tDecodeI32(&decoder, &pRsp->authVer) < 0) return -1;
} else {
pRsp->authVer = 0;
}
tEndDecode(&decoder);
......
......@@ -35,8 +35,6 @@ SHashObj *mndDupTableHash(SHashObj *pOld);
SHashObj *mndDupTopicHash(SHashObj *pOld);
int32_t mndValidateUserAuthInfo(SMnode *pMnode, SUserAuthVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen);
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db);
int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic);
......
......@@ -36,7 +36,9 @@ int32_t mndSetUserAuthRsp(SMnode *pMnode, SUserObj *pUser, SGetUserAuthRsp *pRsp
memcpy(pRsp->user, pUser->user, TSDB_USER_LEN);
pRsp->superAuth = 1;
pRsp->enable = pUser->enable;
pRsp->sysInfo = pUser->sysInfo;
pRsp->version = pUser->authVersion;
pRsp->passVer = pUser->passVersion;
return 0;
}
#endif
\ No newline at end of file
......@@ -288,6 +288,7 @@ _CONNECT:
connectRsp.dnodeNum = mndGetDnodeSize(pMnode);
connectRsp.svrTimestamp = taosGetTimestampSec();
connectRsp.passVer = pUser->passVersion;
connectRsp.authVer = pUser->authVersion;
strcpy(connectRsp.sVer, version);
snprintf(connectRsp.sDetailVer, sizeof(connectRsp.sDetailVer), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
......@@ -552,16 +553,6 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
}
break;
}
case HEARTBEAT_KEY_USER_PASSINFO: {
void *rspMsg = NULL;
int32_t rspLen = 0;
mndValidateUserPassInfo(pMnode, kv->value, kv->valueLen / sizeof(SUserPassVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_USER_PASSINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1);
}
break;
}
default:
mError("invalid kv key:%d", kv->key);
hbRsp.status = TSDB_CODE_APP_ERROR;
......
......@@ -825,7 +825,6 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
if (mndUserDupObj(pUser, &newUser) != 0) goto _OVER;
newUser.passVersion = pUser->passVersion;
if (alterReq.alterType == TSDB_ALTER_USER_PASSWD) {
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)alterReq.pass, strlen(alterReq.pass), pass);
......@@ -1432,69 +1431,6 @@ _OVER:
return code;
}
int32_t mndValidateUserPassInfo(SMnode *pMnode, SUserPassVersion *pUsers, int32_t numOfUses, void **ppRsp,
int32_t *pRspLen) {
int32_t code = 0;
SUserPassBatchRsp batchRsp = {0};
for (int32_t i = 0; i < numOfUses; ++i) {
SUserObj *pUser = mndAcquireUser(pMnode, pUsers[i].user);
if (pUser == NULL) {
mError("user:%s, failed to validate user pass since %s", pUsers[i].user, terrstr());
continue;
}
pUsers[i].version = ntohl(pUsers[i].version);
if (pUser->passVersion <= pUsers[i].version) {
mTrace("user:%s, not update since mnd passVer %d <= client passVer %d", pUsers[i].user, pUser->passVersion,
pUsers[i].version);
mndReleaseUser(pMnode, pUser);
continue;
}
SGetUserPassRsp rsp = {0};
memcpy(rsp.user, pUser->user, TSDB_USER_LEN);
rsp.version = pUser->passVersion;
if (!batchRsp.pArray && !(batchRsp.pArray = taosArrayInit(numOfUses, sizeof(SGetUserPassRsp)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
mndReleaseUser(pMnode, pUser);
goto _OVER;
}
taosArrayPush(batchRsp.pArray, &rsp);
mndReleaseUser(pMnode, pUser);
}
if (taosArrayGetSize(batchRsp.pArray) <= 0) {
goto _OVER;
}
int32_t rspLen = tSerializeSUserPassBatchRsp(NULL, 0, &batchRsp);
if (rspLen < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
void *pRsp = taosMemoryMalloc(rspLen);
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
tSerializeSUserPassBatchRsp(pRsp, rspLen, &batchRsp);
*ppRsp = pRsp;
*pRspLen = rspLen;
_OVER:
if (code) {
*ppRsp = NULL;
*pRspLen = 0;
}
tFreeSUserPassBatchRsp(&batchRsp);
return code;
}
int32_t mndUserRemoveDb(SMnode *pMnode, STrans *pTrans, char *db) {
int32_t code = 0;
SSdb *pSdb = pMnode->pSdb;
......
......@@ -1157,7 +1157,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
SDecoder* pCoder = &(SDecoder){0};
SDeleteRes* pRes = &(SDeleteRes){0};
*pRefBlock = NULL;
(*pRefBlock) = NULL;
pRes->uidList = taosArrayInit(0, sizeof(tb_uid_t));
if (pRes->uidList == NULL) {
......@@ -1197,7 +1197,7 @@ int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStream
taosArrayDestroy(pRes->uidList);
*pRefBlock = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
if (pRefBlock == NULL) {
if ((*pRefBlock) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
......@@ -3064,6 +3064,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
// only check here, since the iterate data in memory is very fast.
if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
taosArrayDestroy(pIndexList);
return pReader->code;
}
......@@ -5583,58 +5584,3 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
}
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; }
/*-------------todo:refactor the implementation of those APIs in this file to seperate the API into two files------*/
// opt perf, do NOT create so many readers
int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) {
SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC,
.startVersion = -1, .endVersion = -1};
cond.twindows.skey = INT64_MIN;
cond.twindows.ekey = INT64_MAX;
cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo));
cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols);
if (cond.colList == NULL || cond.pSlotList == NULL) {
// todo
}
cond.colList[0].colId = 1;
cond.colList[0].slotId = 0;
cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP;
cond.pSlotList[0] = 0;
STableKeyInfo* pTableKeyInfo = pTableList;
STsdbReader* pReader = NULL;
SSDataBlock* pBlock = createDataBlock();
SColumnInfoData data = {0};
data.info = (SColumnInfo) {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE};
blockDataAppendColInfo(pBlock, &data);
int64_t key = INT64_MIN;
for(int32_t i = 0; i < numOfTables; ++i) {
int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, (void**)&pReader, pIdStr, false, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
bool hasData = false;
code = tsdbNextDataBlock(pReader, &hasData);
if (!hasData || code != TSDB_CODE_SUCCESS) {
continue;
}
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0);
int64_t k = *(int64_t*)pCol->pData;
if (key < k) {
key = k;
}
tsdbReaderClose(pReader);
}
return 0;
}
......@@ -1109,7 +1109,6 @@ int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, S
code = doFilterTag(pTagIndexCond, &metaArg, pUidList, &status, &pStorageAPI->metaFilter);
if (code != 0 || status == SFLT_NOT_INDEX) { // temporarily disable it for performance sake
qDebug("failed to get tableIds from index, suid:%" PRIu64, pScanNode->uid);
code = TSDB_CODE_SUCCESS;
} else {
qInfo("succ to get filter result, table num: %d", (int)taosArrayGetSize(pUidList));
}
......
......@@ -150,9 +150,12 @@ static int32_t initTagColskeyBuf(int32_t* keyLen, char** keyBuf, const SArray* p
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
(*keyLen) += nullFlagSize;
(*keyBuf) = taosMemoryCalloc(1, (*keyLen));
if ((*keyBuf) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
if (*keyLen >= 0) {
(*keyBuf) = taosMemoryCalloc(1, (*keyLen));
if ((*keyBuf) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return TSDB_CODE_SUCCESS;
......
......@@ -27,6 +27,7 @@ SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq
int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
if (pArray == NULL) {
taosFreeQitem(pData);
return NULL;
}
......
......@@ -212,8 +212,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
}
if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes);
if (finished) {
taosArrayDestroy(pRes);
qDebug("s-task:%s finish recover exec task ", pTask->id.idStr);
break;
} else {
......@@ -317,6 +318,61 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
const char* id) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
continue;
}
qDebug("===stream===break batchSize:%d", *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
// do not merge blocks for sink node
if (pTask->taskLevel == TASK_LEVEL__SINK) {
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;
}
if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = streamMergeQueueItem(*pInput, qItem);
if (newRet == NULL) {
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
streamQueueProcessFail(pTask->inputQueue);
return TSDB_CODE_SUCCESS;
}
*pInput = newRet;
}
*numOfBlocks += 1;
streamQueueProcessSuccess(pTask->inputQueue);
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
return TSDB_CODE_SUCCESS;
}
}
}
/**
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
* appropriate batch of blocks should be handled in 5 to 10 sec.
......@@ -325,70 +381,15 @@ int32_t streamExecForAll(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
while (1) {
int32_t batchSize = 1;
int16_t times = 0;
int32_t batchSize = 0;
SStreamQueueItem* pInput = NULL;
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
if (batchSize > 1) {
break;
} else {
return 0;
}
}
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
times++;
taosMsleep(10);
qDebug("===stream===try again batchSize:%d", batchSize);
continue;
}
qDebug("===stream===break batchSize:%d", batchSize);
break;
}
if (pInput == NULL) {
pInput = qItem;
streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->taskLevel == TASK_LEVEL__SINK) {
break;
}
} else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = NULL;
if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue);
break;
} else {
batchSize++;
pInput = newRet;
streamQueueProcessSuccess(pTask->inputQueue);
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
MAX_STREAM_EXEC_BATCH_NUM);
break;
}
}
}
}
if (streamTaskShouldStop(&pTask->status)) {
if (pInput) {
streamFreeQitem(pInput);
}
return 0;
}
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
if (pInput == NULL) {
ASSERT(batchSize == 0);
break;
}
......@@ -403,8 +404,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
int8_t status = atomic_load_8(&pTask->status.taskStatus);
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id,
atomic_load_8(&pTask->status.taskStatus));
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status);
taosMsleep(100);
} else {
break;
......@@ -457,6 +457,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
double el = (taosGetTimestampMs() - st) / 1000.0;
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
id, el, resSize / 1048576.0, totalBlocks);
streamFreeQitem(pInput);
}
......
......@@ -32,9 +32,21 @@
#define nRoot 10
#define nUser 10
#define USER_LEN 24
#define BUF_LEN 256
typedef uint16_t VarDataLenT;
#define TSDB_NCHAR_SIZE sizeof(int32_t)
#define VARSTR_HEADER_SIZE sizeof(VarDataLenT)
#define GET_FLOAT_VAL(x) (*(float *)(x))
#define GET_DOUBLE_VAL(x) (*(double *)(x))
#define varDataLen(v) ((VarDataLenT *)(v))[0]
void createUsers(TAOS *taos, const char *host, char *qstr);
void passVerTestMulti(const char *host, char *qstr);
void sysInfoTest(const char *host, char *qstr);
int nPassVerNotified = 0;
TAOS *taosu[nRoot] = {0};
......@@ -83,6 +95,95 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql);
}
int printRow(char *str, TAOS_ROW row, TAOS_FIELD *fields, int numFields) {
int len = 0;
char split = ' ';
for (int i = 0; i < numFields; ++i) {
if (i > 0) {
str[len++] = split;
}
if (row[i] == NULL) {
len += sprintf(str + len, "%s", "NULL");
continue;
}
switch (fields[i].type) {
case TSDB_DATA_TYPE_TINYINT:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
break;
case TSDB_DATA_TYPE_UTINYINT:
len += sprintf(str + len, "%u", *((uint8_t *)row[i]));
break;
case TSDB_DATA_TYPE_SMALLINT:
len += sprintf(str + len, "%d", *((int16_t *)row[i]));
break;
case TSDB_DATA_TYPE_USMALLINT:
len += sprintf(str + len, "%u", *((uint16_t *)row[i]));
break;
case TSDB_DATA_TYPE_INT:
len += sprintf(str + len, "%d", *((int32_t *)row[i]));
break;
case TSDB_DATA_TYPE_UINT:
len += sprintf(str + len, "%u", *((uint32_t *)row[i]));
break;
case TSDB_DATA_TYPE_BIGINT:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_UBIGINT:
len += sprintf(str + len, "%" PRIu64, *((uint64_t *)row[i]));
break;
case TSDB_DATA_TYPE_FLOAT: {
float fv = 0;
fv = GET_FLOAT_VAL(row[i]);
len += sprintf(str + len, "%f", fv);
} break;
case TSDB_DATA_TYPE_DOUBLE: {
double dv = 0;
dv = GET_DOUBLE_VAL(row[i]);
len += sprintf(str + len, "%lf", dv);
} break;
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_GEOMETRY: {
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
memcpy(str + len, row[i], charLen);
len += charLen;
} break;
case TSDB_DATA_TYPE_TIMESTAMP:
len += sprintf(str + len, "%" PRId64, *((int64_t *)row[i]));
break;
case TSDB_DATA_TYPE_BOOL:
len += sprintf(str + len, "%d", *((int8_t *)row[i]));
default:
break;
}
}
return len;
}
static int printResult(TAOS_RES *res, char *output) {
int numFields = taos_num_fields(res);
TAOS_FIELD *fields = taos_fetch_fields(res);
char header[BUF_LEN] = {0};
int len = 0;
for (int i = 0; i < numFields; ++i) {
len += sprintf(header + len, "%s ", fields[i].name);
}
puts(header);
if (output) {
strncpy(output, header, BUF_LEN);
}
TAOS_ROW row = NULL;
while ((row = taos_fetch_row(res))) {
char temp[BUF_LEN] = {0};
printRow(temp, row, fields, numFields);
puts(temp);
}
}
int main(int argc, char *argv[]) {
char qstr[1024];
......@@ -99,6 +200,7 @@ int main(int argc, char *argv[]) {
}
createUsers(taos, argv[1], qstr);
passVerTestMulti(argv[1], qstr);
sysInfoTest(argv[1], qstr);
taos_close(taos);
taos_cleanup();
......@@ -167,6 +269,8 @@ void passVerTestMulti(const char *host, char *qstr) {
int nConn = nRoot + nUser;
for (int i = 0; i < 15; ++i) {
printf("%s:%d [%d] second(s) elasped, passVer notification received:%d, total:%d\n", __func__, __LINE__, i,
nPassVerNotified, nConn);
if (nPassVerNotified >= nConn) break;
sleep(1);
}
......@@ -175,19 +279,101 @@ void passVerTestMulti(const char *host, char *qstr) {
for (int i = 0; i < nRoot; ++i) {
taos_close(taos[i]);
printf("%s:%d close taos[%d]\n", __func__, __LINE__, i);
sleep(1);
// sleep(1);
}
for (int i = 0; i < nUser; ++i) {
taos_close(taosu[i]);
printf("%s:%d close taosu[%d]\n", __func__, __LINE__, i);
sleep(1);
// sleep(1);
}
fprintf(stderr, "######## %s #########\n", __func__);
if (nPassVerNotified >= nConn) {
fprintf(stderr, "succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified, nConn);
fprintf(stderr, ">>> succeed to get passVer notification since nNotify %d >= nConn %d\n", nPassVerNotified,
nConn);
} else {
fprintf(stderr, "failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn);
fprintf(stderr, ">>> failed to get passVer notification since nNotify %d < nConn %d\n", nPassVerNotified, nConn);
}
fprintf(stderr, "######## %s #########\n", __func__);
// sleep(300);
}
void sysInfoTest(const char *host, char *qstr) {
// root
TAOS *taos[nRoot] = {0};
char userName[USER_LEN] = "root";
for (int i = 0; i < nRoot; ++i) {
taos[i] = taos_connect(host, "root", "taos", NULL, 0);
if (taos[i] == NULL) {
fprintf(stderr, "failed to connect to server, reason:%s\n", "null taos" /*taos_errstr(taos)*/);
exit(1);
}
}
queryDB(taos[0], "create database if not exists demo11 vgroups 1 minrows 10");
queryDB(taos[0], "create database if not exists demo12 vgroups 1 minrows 10");
queryDB(taos[0], "create database if not exists demo13 vgroups 1 minrows 10");
queryDB(taos[0], "create table demo11.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table demo12.stb (ts timestamp, c1 int) tags(t1 int)");
queryDB(taos[0], "create table demo13.stb (ts timestamp, c1 int) tags(t1 int)");
sprintf(qstr, "show grants");
char output[BUF_LEN];
TAOS_RES *res = NULL;
int32_t nRep = 0;
_REP:
fprintf(stderr, "######## %s loop:%d #########\n", __func__, nRep);
res = taos_query(taos[0], qstr);
if (taos_errno(res) != 0) {
fprintf(stderr, "%s:%d failed to execute: %s since %s\n", __func__, __LINE__, qstr, taos_errstr(res));
taos_free_result(res);
exit(EXIT_FAILURE);
}
printResult(res, output);
taos_free_result(res);
if (!strstr(output, "timeseries")) {
fprintf(stderr, "%s:%d expected output: 'timeseries' not occur\n", __func__, __LINE__);
exit(EXIT_FAILURE);
}
queryDB(taos[0], "alter user root sysinfo 0");
fprintf(stderr, "%s:%d sleep 2 seconds to wait HB take effect\n", __func__, __LINE__);
for (int i = 1; i <= 2; ++i) {
sleep(1);
}
res = taos_query(taos[0], qstr);
if (taos_errno(res) != 0) {
if (!strstr(taos_errstr(res), "Permission denied")) {
fprintf(stderr, "%s:%d expected error: 'Permission denied' not occur\n", __func__, __LINE__);
taos_free_result(res);
exit(EXIT_FAILURE);
}
}
taos_free_result(res);
queryDB(taos[0], "alter user root sysinfo 1");
fprintf(stderr, "%s:%d sleep 2 seconds to wait HB take effect\n", __func__, __LINE__);
for (int i = 1; i <= 2; ++i) {
sleep(1);
}
if(++nRep < 5) {
goto _REP;
}
// close the taos_conn
for (int i = 0; i < nRoot; ++i) {
taos_close(taos[i]);
fprintf(stderr, "%s:%d close taos[%d]\n", __func__, __LINE__, i);
}
fprintf(stderr, "######## %s #########\n", __func__);
fprintf(stderr, ">>> succeed to run sysInfoTest\n");
fprintf(stderr, "######## %s #########\n", __func__);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册