提交 1304c08c 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

......@@ -314,7 +314,7 @@ def pre_test_build_win() {
cd %WIN_CONNECTOR_ROOT%
python.exe -m pip install --upgrade pip
python -m pip uninstall taospy -y
python -m pip install taospy==2.7.6
python -m pip install taospy==2.7.10
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
return 1
......
......@@ -2,11 +2,11 @@
# geos
ExternalProject_Add(geos
GIT_REPOSITORY https://github.com/libgeos/geos.git
GIT_TAG 3.11.2
GIT_TAG 3.12.0
SOURCE_DIR "${TD_CONTRIB_DIR}/geos"
BINARY_DIR ""
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
)
\ No newline at end of file
)
......@@ -5,8 +5,8 @@ if (${BUILD_CONTRIB})
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
......@@ -18,8 +18,8 @@ else()
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
......
......@@ -20,6 +20,11 @@ The source code for the Python connector is hosted on [GitHub](https://github.co
- The [supported platforms](/reference/connector/#supported-platforms) for the native connection are the same as the ones supported by the TDengine client.
- REST connections are supported on all platforms that can run Python.
### Supported features
- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
- REST connections support features such as connection management and SQL execution. (SQL execution allows you to: manage databases, tables, and supertables, write data, query data, create continuous queries, etc.).
## Version selection
We recommend using the latest version of `taospy`, regardless of the version of TDengine.
......@@ -64,10 +69,23 @@ All exceptions from the Python Connector are thrown directly. Applications shoul
{{#include docs/examples/python/handle_exception.py}}
```
## Supported features
## TDengine DataType vs. Python DataType
- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
- REST connections support features such as connection management and SQL execution. (SQL execution allows you to: manage databases, tables, and supertables, write data, query data, create continuous queries, etc.).
TDengine currently supports timestamp, number, character, Boolean type, and the corresponding type conversion with Python is as follows:
|TDengine DataType|Python DataType|
|:---------------:|:-------------:|
|TIMESTAMP|datetime|
|INT|int|
|BIGINT|int|
|FLOAT|float|
|DOUBLE|int|
|SMALLINT|int|
|TINYINT|int|
|BOOL|bool|
|BINARY|str|
|NCHAR|str|
|JSON|str|
## Installation
......@@ -544,7 +562,7 @@ Connector support data subscription. For more information about subscroption, pl
The `consumer` in the connector contains the subscription api.
#### Create Consumer
##### Create Consumer
The syntax for creating a consumer is `consumer = Consumer(configs)`. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/).
......@@ -554,7 +572,7 @@ from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
#### Subscribe topics
##### Subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
......@@ -562,7 +580,7 @@ The `subscribe` function is used to subscribe to a list of topics.
consumer.subscribe(['topic1', 'topic2'])
```
#### Consume
##### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
......@@ -580,7 +598,7 @@ while True:
print(block.fetchall())
```
#### assignment
##### assignment
The `assignment` function is used to get the assignment of the topic.
......@@ -588,7 +606,7 @@ The `assignment` function is used to get the assignment of the topic.
assignments = consumer.assignment()
```
#### Seek
##### Seek
The `seek` function is used to reset the assignment of the topic.
......@@ -597,7 +615,7 @@ tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
```
#### After consuming data
##### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
......@@ -606,13 +624,13 @@ consumer.unsubscribe()
consumer.close()
```
#### Tmq subscription example
##### Tmq subscription example
```python
{{#include docs/examples/python/tmq_example.py}}
```
#### assignment and seek example
##### assignment and seek example
```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
......@@ -624,7 +642,7 @@ consumer.close()
In addition to native connections, the connector also supports subscriptions via websockets.
#### Create Consumer
##### Create Consumer
The syntax for creating a consumer is "consumer = consumer = Consumer(conf=configs)". You need to specify that the `td.connect.websocket.scheme` parameter is set to "ws" in the configuration. For more subscription api parameters, please refer to [Data Subscription](../../../develop/tmq/#create-a-consumer).
......@@ -634,7 +652,7 @@ import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```
#### subscribe topics
##### subscribe topics
The `subscribe` function is used to subscribe to a list of topics.
......@@ -642,7 +660,7 @@ The `subscribe` function is used to subscribe to a list of topics.
consumer.subscribe(['topic1', 'topic2'])
```
#### Consume
##### Consume
The `poll` function is used to consume data in tmq. The parameter of the `poll` function is a value of type float representing the timeout in seconds. It returns a `Message` before timing out, or `None` on timing out. You have to handle error messages in response data.
......@@ -659,7 +677,7 @@ while True:
print(row)
```
#### assignment
##### assignment
The `assignment` function is used to get the assignment of the topic.
......@@ -667,7 +685,7 @@ The `assignment` function is used to get the assignment of the topic.
assignments = consumer.assignment()
```
#### Seek
##### Seek
The `seek` function is used to reset the assignment of the topic.
......@@ -675,7 +693,7 @@ The `seek` function is used to reset the assignment of the topic.
consumer.seek(topic='topic1', partition=0, offset=0)
```
#### After consuming data
##### After consuming data
You should unsubscribe to the topics and close the consumer after consuming.
......@@ -684,13 +702,13 @@ consumer.unsubscribe()
consumer.close()
```
#### Subscription example
##### Subscription example
```python
{{#include docs/examples/python/tmq_websocket_example.py}}
```
#### Assignment and seek example
##### Assignment and seek example
```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
......@@ -706,19 +724,19 @@ Connector support schemaless insert.
<Tabs defaultValue="list">
<TabItem value="list" label="List Insert">
Simple insert
##### Simple insert
```python
{{#include docs/examples/python/schemaless_insert.py}}
```
Insert with ttl argument
##### Insert with ttl argument
```python
{{#include docs/examples/python/schemaless_insert_ttl.py}}
```
Insert with req_id argument
##### Insert with req_id argument
```python
{{#include docs/examples/python/schemaless_insert_req_id.py}}
......@@ -728,19 +746,19 @@ Insert with req_id argument
<TabItem value="raw" label="Raw Insert">
Simple insert
##### Simple insert
```python
{{#include docs/examples/python/schemaless_insert_raw.py}}
```
Insert with ttl argument
##### Insert with ttl argument
```python
{{#include docs/examples/python/schemaless_insert_raw_ttl.py}}
```
Insert with req_id argument
##### Insert with req_id argument
```python
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
......@@ -756,7 +774,7 @@ The Python connector provides a parameter binding api for inserting data. Simila
<Tabs>
<TabItem value="native" label="native connection">
#### Create Stmt
##### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
......@@ -767,7 +785,7 @@ conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### parameter binding
##### parameter binding
Call the `new_multi_binds` function to create the parameter list for parameter bindings.
......@@ -797,7 +815,7 @@ Call the `bind_param` (for a single row) method or the `bind_param_batch` (for m
stmt.bind_param_batch(params)
```
#### execute sql
##### execute sql
Call `execute` method to execute sql.
......@@ -805,13 +823,13 @@ Call `execute` method to execute sql.
stmt.execute()
```
#### Close Stmt
##### Close Stmt
```
stmt.close()
```
#### Example
##### Example
```python
{{#include docs/examples/python/stmt_example.py}}
......@@ -820,7 +838,7 @@ stmt.close()
<TabItem value="websocket" label="WebSocket connection">
#### Create Stmt
##### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
......@@ -831,7 +849,7 @@ conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### Prepare sql
##### Prepare sql
Call `prepare` method in stmt to prepare sql.
......@@ -839,7 +857,7 @@ Call `prepare` method in stmt to prepare sql.
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### parameter binding
##### parameter binding
Call the `bind_param` method to bind parameters.
......@@ -858,7 +876,7 @@ Call the `add_batch` method to add parameters to the batch.
stmt.add_batch()
```
#### execute sql
##### execute sql
Call `execute` method to execute sql.
......@@ -866,13 +884,13 @@ Call `execute` method to execute sql.
stmt.execute()
```
#### Close Stmt
##### Close Stmt
```
stmt.close()
```
#### Example
##### Example
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
......
......@@ -21,7 +21,12 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
- 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。
- REST 连接支持所有能运行 Python 的平台。
## 版本选择
### 支持的功能
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
## 历史版本
无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。
......@@ -65,12 +70,25 @@ Python Connector 的所有数据库操作如果出现异常,都会直接抛出
{{#include docs/examples/python/handle_exception.py}}
```
## 支持的功能
TDengine DataType 和 Python DataType
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
- REST 连接支持的功能包括:连接管理、执行 SQL。 (通过执行 SQL 可以: 管理数据库、管理表和超级表、写入数据、查询数据、创建连续查询等)。
TDengine 目前支持时间戳、数字、字符、布尔类型,与 Python 对应类型转换如下:
|TDengine DataType|Python DataType|
|:---------------:|:-------------:|
|TIMESTAMP|datetime|
|INT|int|
|BIGINT|int|
|FLOAT|float|
|DOUBLE|int|
|SMALLINT|int|
|TINYINT|int|
|BOOL|bool|
|BINARY|str|
|NCHAR|str|
|JSON|str|
## 安装
## 安装步骤
### 安装前准备
......@@ -383,7 +401,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
#### Connection 类的使用
##### Connection 类的使用
`Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: execute、 query、schemaless_insert 和 subscribe 方法。
......@@ -547,7 +565,7 @@ RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方
`Consumer` 提供了 Python 连接器订阅 TMQ 数据的 API。
#### 创建 Consumer
##### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(configs)`,参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
......@@ -557,7 +575,7 @@ from taos.tmq import Consumer
consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
```
#### 订阅 topics
##### 订阅 topics
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
......@@ -565,7 +583,7 @@ Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时
consumer.subscribe(['topic1', 'topic2'])
```
#### 消费数据
##### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
......@@ -583,7 +601,7 @@ while True:
print(block.fetchall())
```
#### 获取消费进度
##### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
......@@ -591,7 +609,7 @@ Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic
assignments = consumer.assignment()
```
#### 重置消费进度
##### 指定订阅 Offset
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置,方法参数类型为 TopicPartition。
......@@ -600,7 +618,7 @@ tp = TopicPartition(topic='topic1', partition=0, offset=0)
consumer.seek(tp)
```
#### 结束消费
##### 关闭订阅
消费结束后,应当取消订阅,并关闭 Consumer。
......@@ -609,13 +627,13 @@ consumer.unsubscribe()
consumer.close()
```
#### tmq 订阅示例代码
##### 完整示例
```python
{{#include docs/examples/python/tmq_example.py}}
```
#### 获取和重置消费进度示例代码
##### 获取和重置消费进度示例代码
```python
{{#include docs/examples/python/tmq_assignment_example.py:taos_get_assignment_and_seek_demo}}
......@@ -629,7 +647,7 @@ consumer.close()
taosws `Consumer` API 提供了基于 Websocket 订阅 TMQ 数据的 API。
#### 创建 Consumer
##### 创建 Consumer
创建 Consumer 语法为 `consumer = Consumer(conf=configs)`,使用时需要指定 `td.connect.websocket.scheme` 参数值为 "ws",参数定义请参考 [数据订阅文档](../../develop/tmq/#%E5%88%9B%E5%BB%BA%E6%B6%88%E8%B4%B9%E8%80%85-consumer)。
......@@ -639,7 +657,7 @@ import taosws
consumer = taosws.(conf={"group.id": "local", "td.connect.websocket.scheme": "ws"})
```
#### 订阅 topics
##### 订阅 topics
Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时订阅多个 topic。
......@@ -647,7 +665,7 @@ Consumer API 的 `subscribe` 方法用于订阅 topics,consumer 支持同时
consumer.subscribe(['topic1', 'topic2'])
```
#### 消费数据
##### 消费数据
Consumer API 的 `poll` 方法用于消费数据,`poll` 方法接收一个 float 类型的超时时间,超时时间单位为秒(s),`poll` 方法在超时之前返回一条 Message 类型的数据或超时返回 `None`。消费者必须通过 Message 的 `error()` 方法校验返回数据的 error 信息。
......@@ -664,7 +682,7 @@ while True:
print(row)
```
#### 获取消费进度
##### 获取消费进度
Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic 的消费进度,返回结果类型为 TopicPartition 列表。
......@@ -672,7 +690,7 @@ Consumer API 的 `assignment` 方法用于获取 Consumer 订阅的所有 topic
assignments = consumer.assignment()
```
#### 重置消费进度
##### 重置消费进度
Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位置。
......@@ -680,7 +698,7 @@ Consumer API 的 `seek` 方法用于重置 Consumer 的消费进度到指定位
consumer.seek(topic='topic1', partition=0, offset=0)
```
#### 结束消费
##### 结束消费
消费结束后,应当取消订阅,并关闭 Consumer。
......@@ -689,7 +707,7 @@ consumer.unsubscribe()
consumer.close()
```
#### tmq 订阅示例代码
##### tmq 订阅示例代码
```python
{{#include docs/examples/python/tmq_websocket_example.py}}
......@@ -697,7 +715,7 @@ consumer.close()
连接器提供了 `assignment` 接口,用于获取 topic assignment 的功能,可以查询订阅的 topic 的消费进度,并提供 `seek` 接口,用于重置 topic 的消费进度。
#### 获取和重置消费进度示例代码
##### 获取和重置消费进度示例代码
```python
{{#include docs/examples/python/tmq_websocket_assgnment_example.py:taosws_get_assignment_and_seek_demo}}
......@@ -713,19 +731,19 @@ consumer.close()
<Tabs defaultValue="list">
<TabItem value="list" label="List 写入">
简单写入
##### 简单写入
```python
{{#include docs/examples/python/schemaless_insert.py}}
```
带有 ttl 参数的写入
##### 带有 ttl 参数的写入
```python
{{#include docs/examples/python/schemaless_insert_ttl.py}}
```
带有 req_id 参数的写入
##### 带有 req_id 参数的写入
```python
{{#include docs/examples/python/schemaless_insert_req_id.py}}
......@@ -735,19 +753,19 @@ consumer.close()
<TabItem value="raw" label="Raw 写入">
简单写入
##### 简单写入
```python
{{#include docs/examples/python/schemaless_insert_raw.py}}
```
带有 ttl 参数的写入
##### 带有 ttl 参数的写入
```python
{{#include docs/examples/python/schemaless_insert_raw_ttl.py}}
```
带有 req_id 参数的写入
##### 带有 req_id 参数的写入
```python
{{#include docs/examples/python/schemaless_insert_raw_req_id.py}}
......@@ -763,7 +781,7 @@ TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写
<Tabs>
<TabItem value="native" label="原生连接">
#### 创建 stmt
##### 创建 stmt
Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
......@@ -774,7 +792,7 @@ conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### 参数绑定
##### 参数绑定
调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。
......@@ -804,7 +822,7 @@ params[15].timestamp([None, None, 1626861392591])
stmt.bind_param_batch(params)
```
#### 执行 sql
##### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
......@@ -812,7 +830,7 @@ stmt.bind_param_batch(params)
stmt.execute()
```
#### 关闭 stmt
##### 关闭 stmt
最后需要关闭 stmt。
......@@ -820,7 +838,7 @@ stmt.execute()
stmt.close()
```
#### 示例代码
##### 示例代码
```python
{{#include docs/examples/python/stmt_example.py}}
......@@ -829,7 +847,7 @@ stmt.close()
<TabItem value="websocket" label="WebSocket 连接">
#### 创建 stmt
##### 创建 stmt
Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
......@@ -840,7 +858,7 @@ conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### 解析 sql
##### 解析 sql
调用 stmt 的 `prepare` 方法来解析 insert 语句。
......@@ -848,7 +866,7 @@ stmt = conn.statement()
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### 参数绑定
##### 参数绑定
调用 stmt 的 `bind_param` 方法绑定参数。
......@@ -867,7 +885,7 @@ stmt.bind_param([
stmt.add_batch()
```
#### 执行 sql
##### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
......@@ -875,7 +893,7 @@ stmt.add_batch()
stmt.execute()
```
#### 关闭 stmt
##### 关闭 stmt
最后需要关闭 stmt。
......@@ -883,7 +901,7 @@ stmt.execute()
stmt.close()
```
#### 示例代码
##### 示例代码
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
......
......@@ -54,6 +54,11 @@ typedef struct SSessionKey {
uint64_t groupId;
} SSessionKey;
typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
} SVersionRange;
static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) {
SWinKey* pWin1 = (SWinKey*)pKey1;
SWinKey* pWin2 = (SWinKey*)pKey2;
......@@ -131,10 +136,10 @@ static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2,
enum {
TMQ_MSG_TYPE__DUMMY = 0,
TMQ_MSG_TYPE__POLL_RSP,
TMQ_MSG_TYPE__POLL_DATA_RSP,
TMQ_MSG_TYPE__POLL_META_RSP,
TMQ_MSG_TYPE__EP_RSP,
TMQ_MSG_TYPE__TAOSX_RSP,
TMQ_MSG_TYPE__POLL_DATA_META_RSP,
TMQ_MSG_TYPE__WALINFO_RSP,
TMQ_MSG_TYPE__END_RSP,
};
......
......@@ -177,7 +177,6 @@ static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint
int32_t getJsonValueLen(const char* data);
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
int32_t colDataReassignVal(SColumnInfoData* pColumnInfoData, uint32_t dstRowIdx, uint32_t srcRowIdx, const char* pData);
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
......@@ -187,6 +186,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx);
void colDataTrim(SColumnInfoData* pColumnInfoData);
......@@ -208,7 +208,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
......@@ -237,11 +236,10 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColIn
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag);
void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
......@@ -251,9 +249,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pData
char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
}
void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);
#ifdef __cplusplus
}
......
......@@ -2909,6 +2909,12 @@ enum {
TMQ_OFFSET__SNAPSHOT_META = 3,
};
enum {
WITH_DATA = 0,
WITH_META = 1,
ONLY_META = 2,
};
typedef struct {
int8_t type;
union {
......
......@@ -252,7 +252,9 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
......@@ -297,8 +299,7 @@ enum {
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
......
此差异已折叠。
......@@ -55,6 +55,9 @@ typedef struct {
void* pStateBackend;
struct SStorageAPI api;
int8_t fillHistory;
STimeWindow winRange;
} SReadHandle;
// in queue mode, data streams are seperated by msg
......@@ -193,14 +196,6 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
STimeWindow getAlignQueryTimeWindow(const SInterval* pInterval, int64_t key);
/**
* return the scan info, in the form of tuple of two items, including table uid and current timestamp
* @param tinfo
* @param uid
* @param ts
* @return
*/
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
......@@ -220,15 +215,22 @@ void* qExtractReaderFromStreamScanner(void* scanner);
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner);
int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo);
int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo);
int32_t qStreamSourceScanParamForHistoryScanStep1(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRange *pVerRange, STimeWindow* pWindow);
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
int32_t qRestoreStreamOperatorOption(qTaskInfo_t tinfo);
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
void qStreamCloseTsdbReader(void* task);
bool qStreamRecoverScanStep1Finished(qTaskInfo_t tinfo);
bool qStreamRecoverScanStep2Finished(qTaskInfo_t tinfo);
int32_t qStreamRecoverSetAllStepFinished(qTaskInfo_t tinfo);
void resetTaskInfo(qTaskInfo_t tinfo);
void qResetStreamInfoTimeWindow(qTaskInfo_t tinfo);
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo);
int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo);
#ifdef __cplusplus
}
#endif
......
......@@ -123,8 +123,8 @@ typedef struct SSnapContext {
SHashObj* suidInfo;
SArray* idList;
int32_t index;
bool withMeta;
bool queryMeta; // true-get meta, false-get data
int8_t withMeta;
int8_t queryMeta; // true-get meta, false-get data
} SSnapContext;
typedef struct {
......@@ -234,29 +234,6 @@ typedef struct SStoreSnapshotFn {
int32_t (*getTableInfoFromSnapshot)(SSnapContext* ctx, void** pBuf, int32_t* contLen, int16_t* type, int64_t* uid);
} SStoreSnapshotFn;
/**
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderReleaseLock(SMetaReader *pReader);
void metaReaderClear(SMetaReader *pReader);
int32_t metaReaderGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReaderGetTableEntryByUidCache(SMetaReader *pReader, tb_uid_t uid);
int32_t metaGetTableTags(SMeta *pMeta, uint64_t suid, SArray *uidList);
const void *metaGetTableTagVal(void *tag, int16_t type, STagVal *tagVal);
int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *meta, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
bool metaIsTableExist(SMeta *pMeta, tb_uid_t uid);
int32_t metaGetCachedTableUidList(SMeta *pMeta, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired);
int32_t metaUidFilterCachePut(SMeta *pMeta, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen, double selectivityRatio);
tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
int32_t metaGetCachedTbGroup(SMeta* pMeta, tb_uid_t suid, const uint8_t* pKey, int32_t keyLen, SArray** pList);
int32_t metaPutTbGroupToCache(SMeta* pMeta, uint64_t suid, const void* pKey, int32_t keyLen, void* pPayload, int32_t
payloadLen);
*/
typedef struct SStoreMeta {
SMTbCursor* (*openTableMetaCursor)(void* pVnode); // metaOpenTbCursor
void (*closeTableMetaCursor)(SMTbCursor* pTbCur); // metaCloseTbCursor
......@@ -403,7 +380,7 @@ typedef struct SStateStore {
SStreamStateCur* (*streamStateSessionSeekKeyCurrentNext)(SStreamState* pState, const SSessionKey* key);
struct SStreamFileState* (*streamFileStateInit)(int64_t memSize, uint32_t keySize, uint32_t rowSize,
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark);
uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark, const char*id);
void (*streamFileStateDestroy)(struct SStreamFileState* pFileState);
void (*streamFileStateClear)(struct SStreamFileState* pFileState);
......@@ -415,6 +392,7 @@ typedef struct SStateStore {
int32_t (*streamStateCommit)(SStreamState* pState);
void (*streamStateDestroy)(SStreamState* pState, bool remove);
int32_t (*streamStateDeleteCheckPoint)(SStreamState* pState, TSKEY mark);
void (*streamStateReloadInfo)(SStreamState* pState, TSKEY ts);
} SStateStore;
typedef struct SStorageAPI {
......
......@@ -129,30 +129,38 @@ typedef struct SSerializeDataHandle {
} SSerializeDataHandle;
// incremental state storage
typedef struct SBackendCfWrapper {
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
void *compactFactory;
TdThreadRwlock rwLock;
bool remove;
int64_t backendId;
char idstr[64];
} SBackendCfWrapper;
typedef struct STdbState {
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
SBackendCfWrapper *pBackendCfWrapper;
int64_t backendCfWrapperId;
char idstr[64];
struct SStreamTask *pOwner;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
char idstr[64];
void *compactFactory;
TdThreadRwlock rwLock;
void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
} STdbState;
typedef struct {
......
......@@ -362,7 +362,7 @@ typedef struct SCreateTopicStmt {
char subDbName[TSDB_DB_NAME_LEN];
char subSTbName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
bool withMeta;
int8_t withMeta;
SNode* pQuery;
SNode* pWhere;
} SCreateTopicStmt;
......
......@@ -138,6 +138,8 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
void streamStateReloadInfo(SStreamState* pState, TSKEY ts);
/***compare func **/
typedef struct SStateChekpoint {
......
......@@ -44,10 +44,8 @@ enum {
TASK_STATUS__DROPPING,
TASK_STATUS__FAIL,
TASK_STATUS__STOP,
TASK_STATUS__WAIT_DOWNSTREAM,
TASK_STATUS__RECOVER_PREPARE,
TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2,
TASK_STATUS__SCAN_HISTORY, // stream task scan history data by using tsdbread in the stream scanner
TASK_STATUS__HALT, // stream task will handle all data in the input queue, and then paused
TASK_STATUS__PAUSE,
};
......@@ -133,7 +131,6 @@ typedef struct {
// ref data block, for delete
typedef struct {
int8_t type;
int64_t ver;
SSDataBlock* pBlock;
} SStreamRefDataBlock;
......@@ -203,13 +200,11 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
void* streamQueueNextItem(SStreamQueue* queue);
void* streamQueueNextItem(SStreamQueue* pQueue);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit);
typedef struct {
char* qmsg;
void* pExecutor; // not applicable to encoder and decoder
......@@ -251,7 +246,7 @@ typedef struct {
int8_t reserved;
} STaskSinkFetch;
typedef struct {
typedef struct SStreamChildEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
......@@ -271,31 +266,55 @@ typedef struct SCheckpointInfo {
} SCheckpointInfo;
typedef struct SStreamStatus {
int8_t taskStatus;
int8_t schedStatus;
int8_t keepTaskStatus;
int8_t taskStatus;
int8_t downstreamReady; // downstream tasks are all ready now, if this flag is set
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
int8_t timerActive; // timer is active
} SStreamStatus;
typedef struct SHistDataRange {
SVersionRange range;
STimeWindow window;
} SHistDataRange;
typedef struct SSTaskBasicInfo {
int32_t nodeId; // vgroup id or snode id
SEpSet epSet;
int32_t selfChildId;
int32_t totalLevel;
int8_t taskLevel;
int8_t fillHistory; // is fill history task or not
} SSTaskBasicInfo;
typedef struct SDispatchMsgInfo {
void* pData; // current dispatch data
int16_t msgType; // dispatch msg type
int32_t retryCount; // retry send data count
int64_t blockingTs; // output blocking timestamp
} SDispatchMsgInfo;
typedef struct {
int8_t outputType;
int8_t outputStatus;
SStreamQueue* outputQueue;
} SSTaskOutputInfo;
struct SStreamTask {
SStreamId id;
int32_t totalLevel;
int8_t taskLevel;
int8_t outputType;
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
int32_t nodeId; // vgroup id
SEpSet epSet;
SCheckpointInfo chkInfo;
STaskExec exec;
int8_t fillHistory; // fill history
int64_t ekey; // end ts key
int64_t endVer; // end version
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
SStreamId id;
SSTaskBasicInfo info;
int8_t outputType;
SDispatchMsgInfo msgInfo;
SStreamStatus status;
SCheckpointInfo chkInfo;
STaskExec exec;
SHistDataRange dataRange;
SStreamId historyTaskId;
SStreamId streamTaskId;
SArray* pUpstreamEpInfoList; // SArray<SStreamChildEpInfo*>, // children info
int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
// output
union {
......@@ -314,13 +333,14 @@ struct SStreamTask {
// trigger
int8_t triggerStatus;
int64_t triggerParam;
void* timer;
void* schedTimer;
void* launchTaskTimer;
SMsgCb* pMsgCb; // msg handle
SStreamState* pState; // state backend
// the followings attributes don't be serialized
int32_t recoverTryingDownstream;
int32_t recoverWaitingUpstream;
int32_t notReadyTasks;
int32_t numOfWaitingUpstream;
int64_t checkReqId;
SArray* checkReqIds; // shuffle
int32_t refCnt;
......@@ -332,21 +352,22 @@ struct SStreamTask {
// meta
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SArray* pTaskList; // SArray<task_id*>
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pCheckpointDb;
SHashObj* pTasks;
SArray* pTaskList; // SArray<task_id*>
void* ahandle;
TXN* txn;
FTaskExpand* expandFunc;
int32_t vgId;
SRWLatch lock;
int32_t walScanCounter;
void* streamBackend;
int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex;
} SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
......@@ -431,16 +452,17 @@ typedef struct {
SMsgHead msgHead;
int64_t streamId;
int32_t taskId;
} SStreamRecoverStep1Req, SStreamRecoverStep2Req;
int8_t igUntreated;
} SStreamScanHistoryReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t childId;
} SStreamRecoverFinishReq;
} SStreamRecoverFinishReq, SStreamTransferReq;
int32_t tEncodeSStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeSStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
int32_t tEncodeStreamRecoverFinishReq(SEncoder* pEncoder, const SStreamRecoverFinishReq* pReq);
int32_t tDecodeStreamRecoverFinishReq(SDecoder* pDecoder, SStreamRecoverFinishReq* pReq);
typedef struct {
int64_t streamId;
......@@ -509,11 +531,11 @@ typedef struct {
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
} SStreamRecoverDownstreamRsp;
int32_t tEncodeSStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeSStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq);
int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq);
int32_t tEncodeSStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeSStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
int32_t tDecodeSStreamTaskRecoverReq(SDecoder* pDecoder, SStreamRecoverDownstreamReq* pReq);
......@@ -525,9 +547,11 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
int64_t dstTaskId);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
......@@ -542,30 +566,44 @@ int32_t streamSchedExec(SStreamTask* pTask);
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history
int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version);
int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version);
void streamPrepareNdoCheckDownstream(SStreamTask* pTask);
int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask);
int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
int32_t streamCheckHistoryTaskDownstrem(SStreamTask* pTask);
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated);
bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask);
bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask);
int32_t streamTaskRecoverSetAllStepFinished(SStreamTask* pTask);
// common
int32_t streamSetParamForRecover(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
int32_t streamSetParamForScanHistoryData(SStreamTask* pTask);
int32_t streamRestoreParam(SStreamTask* pTask);
int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status);
// source level
int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, int64_t ver);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask);
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
// int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
// agg level
int32_t streamAggRecoverPrepare(SStreamTask* pTask);
// int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId);
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t taskId, int32_t childId);
void streamMetaInit();
void streamMetaCleanup();
......@@ -591,6 +629,9 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointRsp* pRsp);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
#ifdef __cplusplus
}
#endif
......
......@@ -28,11 +28,10 @@ extern "C" {
#endif
typedef struct SStreamFileState SStreamFileState;
typedef SList SStreamSnapshot;
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark);
GetTsFun fp, void* pFile, TSKEY delMark, const char* id);
void streamFileStateDestroy(SStreamFileState* pFileState);
void streamFileStateClear(SStreamFileState* pFileState);
bool needClearDiskBuff(SStreamFileState* pFileState);
......@@ -50,6 +49,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState);
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list);
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark);
int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState);
void streamFileStateReloadInfo(SStreamFileState* pFileState, TSKEY ts);
#ifdef __cplusplus
}
......
......@@ -919,7 +919,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
tDeleteSMqAskEpRsp(&pEpRspWrapper->msg);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);
......@@ -932,7 +932,7 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) {
taosMemoryFreeClear(pRsp->pEpset);
taosMemoryFree(pRsp->metaRsp.metaRsp);
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
} else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pRsp = (SMqPollRspWrapper*)rspWrapper;
taosMemoryFreeClear(pRsp->pEpset);
......@@ -1407,7 +1407,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
strcpy(pRspWrapper->topicName, pParam->topicName);
pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_RSP) {
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeMqDataRsp(&decoder, &pRspWrapper->dataRsp);
......@@ -1424,7 +1424,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecodeMqMetaRsp(&decoder, &pRspWrapper->metaRsp);
tDecoderClear(&decoder);
memcpy(&pRspWrapper->metaRsp, pMsg->pData, sizeof(SMqRspHead));
} else if (rspType == TMQ_MSG_TYPE__TAOSX_RSP) {
} else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
......@@ -1884,7 +1884,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno));
return NULL;
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) {
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
......@@ -1984,7 +1984,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__TAOSX_RSP) {
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
......@@ -2026,7 +2026,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
void* pRsp = NULL;
int64_t numOfRows = 0;
if (pollRspWrapper->taosxRsp.createTableNum == 0) {
pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
tscError("consumer:0x%" PRIx64" createTableNum should > 0 if rsp type is data_meta", tmq->consumerId);
} else {
pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
}
......
......@@ -160,7 +160,7 @@ static const SSysDbTableSchema streamSchema[] = {
static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "task_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "task_id", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
......
此差异已折叠。
......@@ -111,7 +111,7 @@ int32_t tmqMaxTopicNum = 20;
int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = false;
bool tsEnableQueryHb = true;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
int32_t tsQuerySmaOptimize = 0;
......
......@@ -76,6 +76,9 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:
......
......@@ -746,9 +746,10 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RECOVER_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -92,7 +92,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) code = terrno;
dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType),
......
......@@ -647,6 +647,14 @@ typedef struct {
// SMqSubActionLogEntry* pLogEntry;
} SMqRebOutputObj;
typedef struct SStreamConf {
int8_t igExpired;
int8_t trigger;
int8_t fillHistory;
int64_t triggerParam;
int64_t watermark;
} SStreamConf;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
// ctl
......@@ -660,12 +668,7 @@ typedef struct {
// info
int64_t uid;
int8_t status;
// config
int8_t igExpired;
int8_t trigger;
int8_t fillHistory;
int64_t triggerParam;
int64_t watermark;
SStreamConf conf;
// source and target
int64_t sourceDbUid;
int64_t targetDbUid;
......@@ -675,14 +678,18 @@ typedef struct {
int64_t targetStbUid;
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
SVgObj fixedSinkVg;
int32_t fixedSinkVgId; // 0 for shuffle
// transformation
char* sql;
char* ast;
char* physicalPlan;
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* tasks; // SArray<SArray<SStreamTask>>
SArray* pHTasksList; // generate the results for already stored ts data
int64_t hTaskUid; // stream task for history ts data
SSchemaWrapper outputSchema;
SSchemaWrapper tagSchema;
......
......@@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream);
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream, int64_t nextWindowSkey);
#ifdef __cplusplus
}
......
......@@ -30,11 +30,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->watermark) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.trigger) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->conf.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.triggerParam) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->conf.watermark) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->sourceDbUid) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->targetDbUid) < 0) return -1;
......@@ -97,11 +97,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->watermark) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.trigger) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->conf.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.triggerParam) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->conf.watermark) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->sourceDbUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->targetDbUid) < 0) return -1;
......@@ -154,18 +154,10 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj, int32_t sver) {
return 0;
}
void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
int32_t sz = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
static void* freeStreamTasks(SArray* pTaskLevel) {
int32_t numOfLevel = taosArrayGetSize(pTaskLevel);
for (int32_t i = 0; i < numOfLevel; i++) {
SArray *pLevel = taosArrayGetP(pTaskLevel, i);
int32_t taskSz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < taskSz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
......@@ -175,7 +167,20 @@ void tFreeStreamObj(SStreamObj *pStream) {
taosArrayDestroy(pLevel);
}
taosArrayDestroy(pStream->tasks);
return taosArrayDestroy(pTaskLevel);
}
void tFreeStreamObj(SStreamObj *pStream) {
taosMemoryFree(pStream->sql);
taosMemoryFree(pStream->ast);
taosMemoryFree(pStream->physicalPlan);
if (pStream->outputSchema.nCols || pStream->outputSchema.pSchema) {
taosMemoryFree(pStream->outputSchema.pSchema);
}
pStream->tasks = freeStreamTasks(pStream->tasks);
pStream->pHTasksList = freeStreamTasks(pStream->pHTasksList);
// tagSchema.pSchema
if (pStream->tagSchema.nCols > 0) {
......
......@@ -367,10 +367,10 @@ void dumpStream(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "smaId", i642str(pObj->smaId));
tjsonAddStringToObject(item, "uid", i642str(pObj->uid));
tjsonAddStringToObject(item, "status", i642str(pObj->status));
tjsonAddStringToObject(item, "igExpired", i642str(pObj->igExpired));
tjsonAddStringToObject(item, "trigger", i642str(pObj->trigger));
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->triggerParam));
tjsonAddStringToObject(item, "watermark", i642str(pObj->watermark));
tjsonAddStringToObject(item, "igExpired", i642str(pObj->conf.igExpired));
tjsonAddStringToObject(item, "trigger", i642str(pObj->conf.trigger));
tjsonAddStringToObject(item, "triggerParam", i642str(pObj->conf.triggerParam));
tjsonAddStringToObject(item, "watermark", i642str(pObj->conf.watermark));
tjsonAddStringToObject(item, "sourceDbUid", i642str(pObj->sourceDbUid));
tjsonAddStringToObject(item, "targetDbUid", i642str(pObj->targetDbUid));
tjsonAddStringToObject(item, "sourceDb", mndGetDbStr(pObj->sourceDb));
......
......@@ -542,32 +542,32 @@ int32_t mndRetrieveTagIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, i
STR_TO_VARSTR(n3, (char *)tNameGetTableName(&stbName));
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n1, false);
colDataSetVal(pColInfo, numOfRows, (const char *)n1, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n2, false);
colDataSetVal(pColInfo, numOfRows, (const char *)n2, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)n3, false);
colDataSetVal(pColInfo, numOfRows, (const char *)n3, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&invalid, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&invalid, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pIdx->createdTime, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pIdx->createdTime, false);
char col[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(col, (char *)pIdx->colName);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)col, false);
colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tag, (char *)"tag_index");
colDataAppend(pColInfo, numOfRows, (const char *)tag, false);
colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
numOfRows++;
sdbRelease(pSdb, pIdx);
......
......@@ -842,7 +842,7 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc
}
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, curRowIndex, subStatus, false);
colDataSetVal(pColInfo, curRowIndex, subStatus, (varDataLen(subStatus) == 0) ? true : false);
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(sql, pQuery->sql);
......
......@@ -555,20 +555,20 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj.version = 1;
streamObj.sql = taosStrdup(pCreate->sql);
streamObj.smaId = smaObj.uid;
streamObj.watermark = pCreate->watermark;
streamObj.conf.watermark = pCreate->watermark;
streamObj.deleteMark = pCreate->deleteMark;
streamObj.fillHistory = STREAM_FILL_HISTORY_ON;
streamObj.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
streamObj.triggerParam = pCreate->maxDelay;
streamObj.conf.fillHistory = STREAM_FILL_HISTORY_ON;
streamObj.conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE;
streamObj.conf.triggerParam = pCreate->maxDelay;
streamObj.ast = taosStrdup(smaObj.ast);
// check the maxDelay
if (streamObj.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
if (streamObj.conf.triggerParam < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval = convertTimeFromPrecisionToUnit(pCreate->interval, pDb->cfg.precision, TIME_UNIT_MILLISECOND);
streamObj.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
streamObj.conf.triggerParam = msInterval > TSDB_MIN_ROLLUP_MAX_DELAY ? msInterval : TSDB_MIN_ROLLUP_MAX_DELAY;
}
if (streamObj.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
streamObj.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
if (streamObj.conf.triggerParam > TSDB_MAX_ROLLUP_MAX_DELAY) {
streamObj.conf.triggerParam = TSDB_MAX_ROLLUP_MAX_DELAY;
}
if (mndAllocSmaVgroup(pMnode, pDb, &streamObj.fixedSinkVg) != 0) {
......@@ -597,8 +597,8 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = streamObj.trigger,
.watermark = streamObj.watermark,
.triggerType = streamObj.conf.trigger,
.watermark = streamObj.conf.watermark,
.deleteMark = streamObj.deleteMark,
};
......@@ -633,7 +633,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (mndSetCreateSmaVgroupCommitLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetUpdateSmaStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoActions(pMnode, pTrans, pDb, &streamObj.fixedSinkVg, &smaObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj) != 0) goto _OVER;
if (mndScheduleStream(pMnode, &streamObj, 1685959190000) != 0) goto _OVER;
if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
......@@ -1278,13 +1278,13 @@ static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
STR_TO_VARSTR(col, (char *)"");
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)col, false);
colDataSetVal(pColInfo, numOfRows, (const char *)col, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char tag[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(tag, (char *)"sma_index");
colDataAppend(pColInfo, numOfRows, (const char *)tag, false);
colDataSetVal(pColInfo, numOfRows, (const char *)tag, false);
numOfRows++;
sdbRelease(pSdb, pSma);
......
......@@ -239,7 +239,7 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
}
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->trigger;
int8_t trigger = pStream->conf.trigger;
if (trigger == STREAM_TRIGGER_AT_ONCE) {
strcpy(dst, "at once");
} else if (trigger == STREAM_TRIGGER_WINDOW_CLOSE) {
......@@ -299,13 +299,18 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
pObj->smaId = 0;
pObj->uid = mndGenerateUid(pObj->name, strlen(pObj->name));
char p[TSDB_STREAM_FNAME_LEN + 32] = {0};
snprintf(p, tListLen(p), "%s_%s", pObj->name, "fillhistory");
pObj->hTaskUid = mndGenerateUid(pObj->name, strlen(pObj->name));
pObj->status = 0;
pObj->igExpired = pCreate->igExpired;
pObj->trigger = pCreate->triggerType;
pObj->triggerParam = pCreate->maxDelay;
pObj->watermark = pCreate->watermark;
pObj->fillHistory = pCreate->fillHistory;
pObj->conf.igExpired = pCreate->igExpired;
pObj->conf.trigger = pCreate->triggerType;
pObj->conf.triggerParam = pCreate->maxDelay;
pObj->conf.watermark = pCreate->watermark;
pObj->conf.fillHistory = pCreate->fillHistory;
pObj->deleteMark = pCreate->deleteMark;
pObj->igCheckUpdate = pCreate->igUpdate;
......@@ -387,9 +392,9 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj,
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
.triggerType = pObj->trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->trigger,
.watermark = pObj->watermark,
.igExpired = pObj->igExpired,
.triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger,
.watermark = pObj->conf.watermark,
.igExpired = pObj->conf.igExpired,
.deleteMark = pObj->deleteMark,
.igCheckUpdate = pObj->igCheckUpdate,
};
......@@ -428,30 +433,37 @@ int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) {
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
tEncodeStreamTask(&encoder, pTask);
int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tEncoderClear(&encoder);
void *buf = taosMemoryCalloc(1, tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead *)buf)->vgId = htonl(pTask->nodeId);
((SMsgHead *)buf)->vgId = htonl(pTask->info.nodeId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, size);
tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
STransAction action = {0};
action.mTraceId = pTrans->mTraceId;
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = buf;
action.contLen = tlen;
action.msgType = TDMT_STREAM_TASK_DEPLOY;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
return -1;
}
return 0;
}
......@@ -459,14 +471,33 @@ int32_t mndPersistStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
return -1;
}
}
}
// persistent stream task for already stored ts data
if (pStream->conf.fillHistory) {
level = taosArrayGetSize(pStream->pHTasksList);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->pHTasksList, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
if (mndPersistTaskDeployReq(pTrans, pTask) < 0) {
return -1;
}
}
}
}
return 0;
}
......@@ -474,11 +505,13 @@ int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (mndPersistStreamTasks(pMnode, pTrans, pStream) < 0) {
return -1;
}
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
......@@ -490,6 +523,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
mndTransDrop(pTrans);
return -1;
}
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
return 0;
}
......@@ -603,16 +637,17 @@ _OVER:
static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
// vnode
/*if (pTask->nodeId > 0) {*/
/*if (pTask->info.nodeId > 0) {*/
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVDropStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_DROP;
......@@ -732,6 +767,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
}
mInfo("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb);
......@@ -748,7 +784,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
// schedule stream task for stream obj
if (mndScheduleStream(pMnode, &streamObj) < 0) {
if (mndScheduleStream(pMnode, &streamObj, createStreamReq.lastTs) < 0) {
mError("stream:%s, failed to schedule since %s", createStreamReq.name, terrstr());
mndTransDrop(pTrans);
goto _OVER;
......@@ -834,7 +870,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
SMStreamDoCheckpointMsg *pMsg) {
SStreamCheckpointSourceReq req = {0};
req.checkpointId = pMsg->checkpointId;
req.nodeId = pTask->nodeId;
req.nodeId = pTask->info.nodeId;
req.expireTime = -1;
req.streamId = pTask->streamId;
req.taskId = pTask->taskId;
......@@ -863,7 +899,7 @@ static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, con
SMsgHead *pMsgHead = (SMsgHead *)buf;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pTask->nodeId);
pMsgHead->vgId = htonl(pTask->info.nodeId);
tEncoderClear(&encoder);
......@@ -902,12 +938,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t sz = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
/*A(pTask->nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
/*A(pTask->info.nodeId > 0);*/
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj == NULL) {
taosRUnLockLatch(&pStream->lock);
mndReleaseStream(pMnode, pStream);
......@@ -965,8 +1001,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
/*SDbObj *pDb = NULL;*/
/*SUserObj *pUser = NULL;*/
SMDropStreamReq dropReq = {0};
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
......@@ -1157,7 +1191,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->watermark, false);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStream->conf.watermark, false);
char trigger[20 + VARSTR_HEADER_SIZE] = {0};
char trigger2[20] = {0};
......@@ -1187,12 +1221,16 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break;
if (pShow->pIter == NULL) {
break;
}
// lock
taosRLockLatch(&pStream->lock);
// count task num
int32_t sz = taosArrayGetSize(pStream->tasks);
int32_t count = 0;
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
......@@ -1202,10 +1240,12 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
// add row for each task
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t levelCnt = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < levelCnt; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
......@@ -1215,18 +1255,25 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// stream name
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)streamName, false);
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pTask->id.taskId, false);
char idstr[128] = {0};
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
idstr[2] = '0';
idstr[3] = 'x';
varDataSetLen(idstr, len + 2);
colDataSetVal(pColInfo, numOfRows, idstr, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
varDataSetLen(nodeType, 5);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->nodeId > 0) {
if (pTask->info.nodeId > 0) {
memcpy(varDataVal(nodeType), "vnode", 5);
} else {
memcpy(varDataVal(nodeType), "snode", 5);
......@@ -1235,30 +1282,50 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// node id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int64_t nodeId = TMAX(pTask->nodeId, 0);
int64_t nodeId = TMAX(pTask->info.nodeId, 0);
colDataSetVal(pColInfo, numOfRows, (const char *)&nodeId, false);
// level
char level[20 + VARSTR_HEADER_SIZE] = {0};
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
memcpy(varDataVal(level), "source", 6);
varDataSetLen(level, 6);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
memcpy(varDataVal(level), "agg", 3);
varDataSetLen(level, 3);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4);
varDataSetLen(level, 4);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&level, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
char status2[20] = {0};
strcpy(status, "normal");
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
int8_t taskStatus = atomic_load_8(&pTask->status.taskStatus);
if (taskStatus == TASK_STATUS__NORMAL) {
memcpy(varDataVal(status), "normal", 6);
varDataSetLen(status, 6);
} else if (taskStatus == TASK_STATUS__DROPPING) {
memcpy(varDataVal(status), "dropping", 8);
varDataSetLen(status, 8);
} else if (taskStatus == TASK_STATUS__FAIL) {
memcpy(varDataVal(status), "fail", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__STOP) {
memcpy(varDataVal(status), "stop", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__SCAN_HISTORY) {
memcpy(varDataVal(status), "history", 7);
varDataSetLen(status, 7);
} else if (taskStatus == TASK_STATUS__HALT) {
memcpy(varDataVal(status), "halt", 4);
varDataSetLen(status, 4);
} else if (taskStatus == TASK_STATUS__PAUSE) {
memcpy(varDataVal(status), "pause", 5);
varDataSetLen(status, 5);
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&status, false);
......@@ -1287,10 +1354,10 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVPauseStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_PAUSE;
......@@ -1301,21 +1368,36 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
return 0;
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t size = taosArrayGetSize(pStream->tasks);
int32_t mndPauseAllStreamTaskImpl(STrans *pTrans, SArray* tasks) {
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
SArray *pTasks = taosArrayGetP(tasks, i);
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndPauseStreamTask(pTrans, pTask) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__PAUSE);
}
}
}
return 0;
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
int32_t code = mndPauseAllStreamTaskImpl(pTrans, pStream->tasks);
if (code != 0) {
return code;
}
// pStream->pHTasksList is null
// code = mndPauseAllStreamTaskImpl(pTrans, pStream->pHTasksList);
return code;
}
static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, int8_t status) {
SStreamObj streamObj = {0};
memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN);
......@@ -1355,6 +1437,10 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
}
if (pStream->status == STREAM_STATUS__PAUSE) {
return 0;
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
......@@ -1410,11 +1496,11 @@ static int32_t mndResumeStreamTask(STrans *pTrans, SStreamTask *pTask, int8_t ig
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->nodeId);
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->igUntreated = igUntreated;
STransAction action = {0};
memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet));
memcpy(&action.epSet, &pTask->info.epSet, sizeof(SEpSet));
action.pCont = pReq;
action.contLen = sizeof(SVResumeStreamTaskReq);
action.msgType = TDMT_STREAM_TASK_RESUME;
......@@ -1432,11 +1518,16 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
int32_t sz = taosArrayGetSize(pTasks);
for (int32_t j = 0; j < sz; j++) {
SStreamTask *pTask = taosArrayGetP(pTasks, j);
if (pTask->taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
if (pTask->info.taskLevel != TASK_LEVEL__SINK && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) {
return -1;
}
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__PAUSE) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
}
}
}
// pStream->pHTasksList is null
return 0;
}
......@@ -1463,6 +1554,10 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
}
if (pStream->status != STREAM_STATUS__PAUSE) {
return 0;
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
......
......@@ -875,7 +875,7 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
// if (pDb == NULL || pDb->compactStartTime <= 0) {
// colDataSetNULL(pColInfo, numOfRows);
// } else {
// colDataAppend(pColInfo, numOfRows, (const char *)&pDb->compactStartTime, false);
// colDataSetVal(pColInfo, numOfRows, (const char *)&pDb->compactStartTime, false);
// }
numOfRows++;
......
......@@ -52,23 +52,21 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
FAIL:
if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = {
.code = code,
.info = pMsg->info,
};
SRpcMsg rsp = { .code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->childEpInfo) != 0);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamEpInfoList) != 0);
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->inputQueue = streamQueueOpen(0);
pTask->outputQueue = streamQueueOpen(0);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->outputQueue = streamQueueOpen(512 << 10);
if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) {
return -1;
......@@ -85,14 +83,18 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
return -1;
}
int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState };
int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamEpInfoList);
SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory };
initStreamStateAPI(&handle.api);
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor);
streamSetupTrigger(pTask);
streamSetupScheduleTrigger(pTask);
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
pTask->id.idStr, pTask->chkInfo.version, pTask->info.selfChildId, pTask->info.taskLevel);
return 0;
}
......@@ -149,9 +151,10 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
taosMemoryFree(pTask);
return -1;
}
tDecoderClear(&decoder);
ASSERT(pTask->taskLevel == TASK_LEVEL__AGG);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
// 2.save task
taosWLockLatch(&pSnode->pMeta->lock);
......@@ -161,19 +164,20 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
return -1;
}
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
taosWUnLockLatch(&pSnode->pMeta->lock);
// 3.go through recover steps to fill history
if (pTask->fillHistory) {
streamSetParamForRecover(pTask);
streamAggRecoverPrepare(pTask);
}
streamPrepareNdoCheckDownstream(pTask);
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
return 0;
}
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
qDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
streamMetaRemoveTask(pSnode->pMeta, pReq->taskId);
return 0;
}
......@@ -255,13 +259,15 @@ int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
}
int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_DEPLOY:
case TDMT_STREAM_TASK_DEPLOY: {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
return sndProcessTaskDeployReq(pSnode, pReq, len);
}
case TDMT_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pReq, len);
return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen);
default:
ASSERT(0);
}
......@@ -277,7 +283,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
tDecodeSStreamRecoverFinishReq(&decoder, &req);
tDecodeStreamRecoverFinishReq(&decoder, &req);
tDecoderClear(&decoder);
// find task
......@@ -286,7 +292,7 @@ int32_t sndProcessTaskRecoverFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
return -1;
}
// do process request
if (streamProcessRecoverFinishReq(pTask, req.childId) < 0) {
if (streamProcessRecoverFinishReq(pTask, req.taskId, req.childId) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1;
}
......@@ -300,6 +306,102 @@ int32_t sndProcessTaskRecoverFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
return 0;
}
int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask);
qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
} else {
rsp.status = 0;
qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) {
qError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
return -1;
}
void *buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead *)buf)->vgId = htonl(req.upstreamNodeId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t *)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
tDecoderClear(&decoder);
return -1;
}
tDecoderClear(&decoder);
qDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.upstreamTaskId);
if (pTask == NULL) {
qError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pSnode->pMeta->vgId);
return -1;
}
code = streamProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return code;
}
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN:
......@@ -312,10 +414,14 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return sndProcessTaskRetrieveReq(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
case TDMT_STREAM_RECOVER_FINISH:
case TDMT_STREAM_SCAN_HISTORY_FINISH:
return sndProcessTaskRecoverFinishReq(pSnode, pMsg);
case TDMT_STREAM_RECOVER_FINISH_RSP:
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK:
return sndProcessStreamTaskCheckReq(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK_RSP:
return sndProcessStreamTaskCheckRsp(pSnode, pMsg);
default:
ASSERT(0);
}
......
......@@ -101,6 +101,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy= streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
}
void initFunctionStateStore(SFunctionStateStore* pStore) {
......
......@@ -96,6 +96,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
......@@ -126,8 +127,6 @@ int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey,
int32_t metaPutTbGroupToCache(void* pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
int32_t payloadLen);
int64_t metaGetTbNum(SMeta *pMeta);
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables);
// tsdb
......@@ -256,7 +255,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext **ctxRet);
int32_t getTableInfoFromSnapshot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext *ctx);
......
......@@ -45,27 +45,10 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore;
// tqPush
// typedef struct {
// // msg info
// int64_t consumerId;
// int64_t reqOffset;
// int64_t processedVer;
// int32_t epoch;
// // rpc info
// int64_t reqId;
// SRpcHandleInfo rpcInfo;
// tmr_h timerId;
// int8_t tmrStopped;
// // exec
// int8_t inputStatus;
// int8_t execStatus;
// SStreamQueue inputQ;
// SRWLatch lock;
// } STqPushHandle;
#define EXTRACT_DATA_FROM_WAL_ID (-1)
#define STREAM_TASK_STATUS_CHECK_ID (-2)
// tqExec
typedef struct {
char* qmsg; // SubPlanToString
} STqExecCol;
......@@ -184,10 +167,10 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqStreamTasksScanWal(STQ* pTq);
int32_t tqStreamTasksStatusCheck(STQ* pTq);
// tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
......
......@@ -67,7 +67,6 @@ typedef struct STsdbReadSnap STsdbReadSnap;
typedef struct SBlockInfo SBlockInfo;
typedef struct SSmaInfo SSmaInfo;
typedef struct SBlockCol SBlockCol;
typedef struct SVersionRange SVersionRange;
typedef struct SLDataIter SLDataIter;
typedef struct SDiskCol SDiskCol;
typedef struct SDiskData SDiskData;
......@@ -390,11 +389,6 @@ struct TSDBKEY {
TSKEY ts;
};
struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
};
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
......
......@@ -178,7 +178,8 @@ SArray* metaGetSmaTbUids(SMeta* pMeta);
void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta);
void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
int64_t metaGetTbNum(SMeta *pMeta);
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
......@@ -217,6 +218,7 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int tqCheckStreamStatus(STQ* pTq);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
......@@ -238,14 +240,14 @@ int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLe
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, SRpcMsg* pMsg);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
......
......@@ -17,13 +17,13 @@
#include "osMemory.h"
#include "tencode.h"
void _metaReaderInit(SMetaReader *pReader, void *pVnode, int32_t flags, SStoreMeta *pAPI) {
SMeta *pMeta = ((SVnode *)pVnode)->pMeta;
metaReaderInit(pReader, pMeta, flags);
void _metaReaderInit(SMetaReader* pReader, void* pVnode, int32_t flags, SStoreMeta* pAPI) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
metaReaderDoInit(pReader, pMeta, flags);
pReader->pAPI = pAPI;
}
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
void metaReaderDoInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
memset(pReader, 0, sizeof(*pReader));
pReader->pMeta = pMeta;
pReader->flags = flags;
......@@ -143,7 +143,7 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name) {
int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
......@@ -159,7 +159,7 @@ int metaGetTableNameByUid(void *pVnode, uint64_t uid, char *tbName) {
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
metaReaderDoInit(&mr, (SMeta *)meta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
metaReaderClear(&mr);
......@@ -174,7 +174,7 @@ int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName) {
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
SMetaReader *pReader = &mr;
......@@ -195,7 +195,7 @@ int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid) {
int metaGetTableTypeByName(void *pVnode, char *tbName, ETableType *tbType) {
int code = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, ((SVnode *)pVnode)->pMeta, 0);
metaReaderDoInit(&mr, ((SVnode*)pVnode)->pMeta, 0);
code = metaGetTableEntryByName(&mr, tbName);
if (code == 0) *tbType = mr.me.type;
......@@ -215,7 +215,7 @@ int metaReadNext(SMetaReader *pReader) {
int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays) {
int code = -1;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
metaReaderDoInit(&mr, (SMeta *)meta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
goto _exit;
......@@ -244,9 +244,7 @@ SMTbCursor *metaOpenTbCursor(void *pVnode) {
return NULL;
}
SVnode *pVnodeObj = pVnode;
// metaReaderInit(&pTbCur->mr, pVnodeObj->pMeta, 0);
SVnode* pVnodeObj = pVnode;
// tdbTbcMoveToFirst((TBC *)pTbCur->pDbc);
pTbCur->pMeta = pVnodeObj->pMeta;
pTbCur->paused = 1;
......@@ -277,7 +275,7 @@ void metaPauseTbCursor(SMTbCursor *pTbCur) {
}
void metaResumeTbCursor(SMTbCursor *pTbCur, int8_t first) {
if (pTbCur->paused) {
metaReaderInit(&pTbCur->mr, pTbCur->pMeta, 0);
metaReaderDoInit(&pTbCur->mr, pTbCur->pMeta, 0);
tdbTbcOpen(((SMeta *)pTbCur->pMeta)->pUidIdx, (TBC **)&pTbCur->pDbc, NULL);
......@@ -784,7 +782,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
}
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
int64_t smaId;
int smaIdx = 0;
STSma *pTSma = NULL;
......@@ -839,7 +837,7 @@ _err:
STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) {
STSma *pTSma = NULL;
SMetaReader mr = {0};
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaReaderGetTableEntryByUid(&mr, indexUid) < 0) {
metaWarn("vgId:%d, failed to get table entry for smaId:%" PRIi64, TD_VID(pMeta->pVnode), indexUid);
metaReaderClear(&mr);
......
......@@ -37,7 +37,7 @@ int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
// validate req
// save smaIndex
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) {
#if 1
terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
......
......@@ -260,7 +260,7 @@ static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo)
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
}
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, int8_t withMeta,
SSnapContext** ctxRet) {
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
if (ctx == NULL) return -1;
......@@ -476,7 +476,7 @@ int32_t getTableInfoFromSnapshot(SSnapContext* ctx, void** pBuf, int32_t* contLe
if (ctx->index >= taosArrayGetSize(ctx->idList)) {
metaDebug("tmqsnap get meta end");
ctx->index = 0;
ctx->queryMeta = false; // change to get data
ctx->queryMeta = 0; // change to get data
return 0;
}
......
......@@ -709,7 +709,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
}
// validate req
metaReaderInit(&mr, pMeta, 0);
metaReaderDoInit(&mr, pMeta, 0);
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
if (pReq->type == TSDB_CHILD_TABLE && pReq->ctb.suid != mr.me.ctbEntry.suid) {
terrno = TSDB_CODE_TDB_TABLE_IN_OTHER_STABLE;
......
......@@ -896,7 +896,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
metaReaderInit(&mr, SMA_META(pSma), 0);
metaReaderDoInit(&mr, SMA_META(pSma), 0);
smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid);
if (metaReaderGetTableEntryByUidCache(&mr, pInfo->suid) < 0) {
code = terrno;
......@@ -1116,7 +1116,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
}
int64_t nRsmaTables = 0;
metaReaderInit(&mr, SMA_META(pSma), 0);
metaReaderDoInit(&mr, SMA_META(pSma), 0);
if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
......
此差异已折叠。
......@@ -114,7 +114,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
}
SMetaReader mr = {0};
metaReaderInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
metaReaderDoInit(&mr, pHandle->execHandle.pTqReader->pVnodeMeta, 0);
if (metaGetTableEntryByName(&mr, req.tbName) < 0) {
metaReaderClear(&mr);
......@@ -216,9 +216,9 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
code = 0;
goto END;
} else {
if (pHandle->fetchMeta) {
if (pHandle->fetchMeta != WITH_DATA) {
SWalCont* pHead = &((*ppCkHead)->head);
if (IS_META_MSG(pHead->msgType)) {
if (IS_META_MSG(pHead->msgType) && !(pHead->msgType == TDMT_VND_DELETE && pHandle->fetchMeta == ONLY_META)) {
code = walFetchBody(pHandle->pWalReader, ppCkHead);
if (code < 0) {
*fetchOffset = offset;
......@@ -1109,7 +1109,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
if (code != 0) {
tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
......
......@@ -16,6 +16,7 @@
#include "tq.h"
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId);
// this function should be executed by stream threads.
// extract submit block from WAL, and add them into the input queue for the sources tasks.
......@@ -57,7 +58,111 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
return 0;
}
static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
int32_t tqStreamTasksStatusCheck(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to check all (%d) stream tasks downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
taosWLockLatch(&pMeta->lock);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosWUnLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) {
int32_t* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, *pTaskId);
if (pTask == NULL) {
continue;
}
streamTaskCheckDownstreamTasks(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return 0;
}
int32_t tqCheckStreamStatus(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock);
return -1;
}
tqDebug("vgId:%d check for stream tasks status, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = STREAM_TASK_STATUS_CHECK_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
pMeta->walScanCounter += 1;
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pMeta->lock);
return -1;
}
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = EXTRACT_DATA_FROM_WAL_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
// seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
if (pTask->chkInfo.currentVer < firstVer) {
......@@ -102,7 +207,7 @@ static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
*pScanIdle = true;
bool noNewDataInWal = true;
bool noDataInWal = true;
int32_t vgId = pStreamMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
......@@ -129,15 +234,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
}
int32_t status = pTask->status.taskStatus;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel);
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status);
if (status != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
......@@ -157,39 +260,47 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue);
// append the data for the stream
SStreamQueueItem* pItem = NULL;
code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr);
if (code != TSDB_CODE_SUCCESS) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
// delete ignore
if (pItem == NULL) {
if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
noNewDataInWal = false;
if (pItem != NULL) {
noDataInWal = false;
code = tAppendDataToInputQueue(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pTask->chkInfo.currentVer);
} else {
tqError("s-task:%s append input queue failed, too many in inputQ, ver:%" PRId64, pTask->id.idStr,
pTask->chkInfo.currentVer);
}
}
code = tqAddInputBlockNLaunchTask(pTask, pItem);
if (code == TSDB_CODE_SUCCESS) {
pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr,
pTask->chkInfo.currentVer);
} else {
tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
if ((code == TSDB_CODE_SUCCESS) || (numOfItemsInQ > 0)) {
code = streamSchedExec(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pStreamMeta, pTask);
return -1;
}
}
streamMetaReleaseTask(pStreamMeta, pTask);
}
// all wal are checked, and no new data available in wal.
if (noNewDataInWal) {
if (noDataInWal) {
*pScanIdle = true;
}
taosArrayDestroy(pTaskList);
return 0;
}
......@@ -48,7 +48,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
metaReaderDoInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
......@@ -215,19 +215,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_table;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;
goto loop_table;
}
}
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
......@@ -237,7 +233,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
continue;
goto loop_table;
}
void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
......@@ -246,7 +242,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (code < 0) {
tEncoderClear(&encoder);
taosMemoryFree(createReq);
continue;
goto loop_table;
}
taosArrayPush(pRsp->createTableLen, &len);
......@@ -255,6 +251,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder);
}
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
goto loop_table;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
......@@ -265,6 +264,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
continue;
loop_table:
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pTqReader;
......@@ -274,19 +279,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL;
if (tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) goto loop_db;
}
if (pRsp->withTbName) {
int64_t uid = pExec->pTqReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
continue;
goto loop_db;
}
}
if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) {
if (pRsp->createTableNum == 0) {
pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t));
pRsp->createTableReq = taosArrayInit(0, sizeof(void*));
......@@ -296,7 +297,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
uint32_t len = 0;
tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code);
if (TSDB_CODE_SUCCESS != code) {
continue;
goto loop_db;
}
void* createReq = taosMemoryCalloc(1, len);
SEncoder encoder = {0};
......@@ -305,7 +306,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (code < 0) {
tEncoderClear(&encoder);
taosMemoryFree(createReq);
continue;
goto loop_db;
}
taosArrayPush(pRsp->createTableLen, &len);
......@@ -314,6 +315,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder);
}
if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL){
goto loop_db;
}
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
......@@ -324,6 +328,12 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
taosArrayPush(pRsp->blockSchema, &pSW);
pRsp->blockNum++;
}
continue;
loop_db:
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
pSchemas = taosArrayInit(0, sizeof(void*));
}
}
taosArrayDestroy(pBlocks);
......
......@@ -309,7 +309,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tbData.uid = pTableSinkInfo->uid;
} else {
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
metaReaderDoInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
taosMemoryFree(pTableSinkInfo);
......@@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
tqTrace("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
......
......@@ -20,12 +20,6 @@
static int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq,
const SMqMetaRsp* pRsp, int32_t vgId);
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);
return taosStrdup(buf);
}
int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem) {
int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
if (code < 0) {
......@@ -123,28 +117,17 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
walRefLastVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
tDeleteMqDataRsp(&dataRsp);
*pBlockReturned = true;
return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
tqOffsetResetToLog(&taosxRsp.rspOffset, pHandle->pRef->refVer);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
tDeleteSTaosxRsp(&taosxRsp);
*pBlockReturned = true;
return code;
}
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest);
tqOffsetResetToLog(&dataRsp.rspOffset, pHandle->pRef->refVer);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, (latest) offset reset to %" PRId64, consumerId,
pHandle->subKey, vgId, dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
tDeleteMqDataRsp(&dataRsp);
*pBlockReturned = true;
return code;
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed",
......@@ -187,7 +170,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
}
}
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP, vgId);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
end : {
char buf[TSDB_OFFSET_LEN] = {0};
......@@ -230,7 +213,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type,
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts);
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
*offset = taosxRsp.rspOffset;
......@@ -260,7 +243,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
......@@ -272,7 +255,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (pHead->msgType != TDMT_VND_SUBMIT) {
if (totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
}
......@@ -301,7 +284,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP, vgId);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;
} else {
fetchVer++;
......@@ -396,9 +379,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t len = 0;
int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeSize(tEncodeMqDataRsp, pRsp, len, code);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
} else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
}
......@@ -420,9 +403,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
SEncoder encoder = {0};
tEncoderInit(&encoder, abuf, len);
if (type == TMQ_MSG_TYPE__POLL_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP || type == TMQ_MSG_TYPE__WALINFO_RSP) {
tEncodeMqDataRsp(&encoder, pRsp);
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
} else if (type == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
tEncodeSTaosxRsp(&encoder, (STaosxRsp*)pRsp);
}
......
......@@ -1610,7 +1610,7 @@ static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
tb_uid_t suid = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
metaReaderDoInit(&mr, pTsdb->pVnode->pMeta, 0);
if (metaReaderGetTableEntryByUidCache(&mr, uid) < 0) {
metaReaderClear(&mr); // table not esist
return 0;
......
......@@ -562,7 +562,8 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
code = tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did);
TSDB_CHECK_CODE(code, lino, _exit);
wSet.diskId = did;
wSet.nSttF = 1;
}
......
......@@ -203,6 +203,7 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCommit = streamStateCommit;
pStore->streamStateDestroy = streamStateDestroy;
pStore->streamStateDeleteCheckPoint = streamStateDeleteCheckPoint;
pStore->streamStateReloadInfo = streamStateReloadInfo;
}
void initMetaReaderAPI(SStoreMetaReader* pMetaReader) {
......
......@@ -554,7 +554,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId);
} else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq);
tqCheckStreamStatus(pVnode->pTq);
}
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册