提交 c5e6f9eb 编写于 作者: K kailixu

Merge branch '3.0' into enh/TD-25528-3.0x

cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE ON)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
......
......@@ -52,7 +52,7 @@ CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
### Create a Stream
```sql
create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s);
create stream current_stream trigger at_once into current_stream_output_stb as select _wstart as wstart, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);
```
### Write Data
......
......@@ -53,17 +53,17 @@ The related schemas and APIs in various languages are described as follows:
typedef void(tmq_commit_cb(tmq_t *tmq, int32_t code, void *param));
typedef enum tmq_conf_res_t {
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
TMQ_CONF_UNKNOWN = -2,
TMQ_CONF_INVALID = -1,
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
typedef struct tmq_topic_assignment {
int32_t vgId;
int64_t currentOffset;
int64_t begin;
int64_t end;
} tmq_topic_assignment;
int32_t vgId;
int64_t currentOffset;
int64_t begin;
int64_t end; // The last version of wal + 1
} tmq_topic_assignment;
DLL_EXPORT tmq_conf_t *tmq_conf_new();
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
......@@ -82,21 +82,21 @@ The related schemas and APIs in various languages are described as follows:
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); //Commit the msg’s offset + 1
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); // The current offset is the offset of the last consumed message + 1
DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code);DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); // Get current offset of the result
DLL_EXPORT const char *tmq_err2str(int32_t code);
```
The following example is based on the smart meter table described in Data Models. For complete sample code, see the C language section below.
......
......@@ -4,7 +4,7 @@ sidebar_label: Access Control
description: This document describes how to manage users and permissions in TDengine.
---
This document describes how to manage permissions in TDengine.
User and Access control is a distingguished feature of TDengine enterprise edition. In this section, only the most fundamental functionalities of user and access control are demonstrated. To get the full knowledge of user and access control, please contact the TDengine team.
## Create a User
......
......@@ -378,7 +378,7 @@ In addition to writing data using the SQL method or the parameter binding API, w
- `TAOS_RES* taos_schemaless_insert(TAOS* taos, const char* lines[], int numLines, int protocol, int precision)`
**Function description**
This interface writes the text data of the line protocol to TDengine.
- This interface writes the text data of the line protocol to TDengine.
**Parameter description**
- taos: database connection, established by the `taos_connect()` function.
......@@ -387,12 +387,13 @@ In addition to writing data using the SQL method or the parameter binding API, w
- protocol: the protocol type of the lines, used to identify the text data format.
- precision: precision string for the timestamp in the text data.
**return value**
TAOS_RES structure, application can get error message by using `taos_errstr()` and also error code by using `taos_errno()`.
**Return value**
- TAOS_RES structure, application can get error message by using `taos_errstr()` and also error code by using `taos_errno()`.
In some cases, the returned TAOS_RES is `NULL`, and it is still possible to call `taos_errno()` to safely get the error code information.
The returned TAOS_RES needs to be freed by the caller in order to avoid memory leaks.
**Description**
The protocol type is enumerated and contains the following three formats.
- TSDB_SML_LINE_PROTOCOL: InfluxDB line protocol (Line Protocol)
......@@ -427,3 +428,89 @@ In addition to writing data using the SQL method or the parameter binding API, w
- Within _raw interfaces represent data through the passed parameters lines and len. In order to solve the problem that the original interface data contains '\0' and is truncated. The totalRows pointer returns the number of parsed data rows.
- Within _ttl interfaces can pass the ttl parameter to control the ttl expiration time of the table.
- Within _reqid interfaces can track the entire call chain by passing the reqid parameter.
### Subscription API
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
tmq_topic_assignment defined as follows:
```c
typedef struct tmq_topic_assignment {
int32_t vgId;
int64_t currentOffset;
int64_t begin;
int64_t end;
} tmq_topic_assignment;
```
**Function description**
- tmq_get_topic_assignment get the current vgroup information of this consumer
**Parameter description**
- numOfAssignment:the num of vgroups assigned to this consumer
- assignment:the information of vgroups, needed to be freed by tmq_free_assignment
**Return value**
- zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description**
- get the committed offset
**Return value**
- the value of committed offset, -2147467247 means no committed value, Other values less than 0 indicate failure
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
- `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
- `void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param)`
**Function description**
The commit interface is divided into two types, each with synchronous and asynchronous interfaces:
- The first type: based on message submission, submit the progress in the message. If the message passes NULL, submit the current progress of all vgroups consumed by the current consumer: tmq_commit_sync/tmq_commit_async
- The second type: submit based on the offset of a Vgroup in a topic: tmq_commit_offset_sync/tmq_commit_offset_async
**Parameter description**
- msg:Message consumed, If the message passes NULL, submit the current progress of all vgroups consumed by the current consumer
**Return value**
- zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**Function description**
- Obtain the current consumption location, which is the next location of the data consumed
**Return value**
- the current consumption location, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
**Function description**
- Set the offset position of the consumer in a Vgroup of a certain topic to start consumption
**Return value**
- zero success, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
**Function description**
- Obtain the starting offset of the consumed data
**Parameter description**
- msg:Message consumed
**Return value**
- the starting offset of the consumed data, none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
**Function description**
- Obtain a list of topics subscribed by consumers
**Parameter description**
- topics: a list of topics subscribed by consumers,need to be freed by tmq_list_destroy
**Return value**
- zero success,none zero failed, wrong message can be obtained through `char *tmq_err2str(int32_t code)`
\ No newline at end of file
......@@ -81,6 +81,14 @@ For example:
taos -h h1.taos.com -s "use db; show tables;"
```
## Export query results to a file
- You can use ">>" to export the query results to a file, the syntax is like `select * from table >> file`. If there is only file name without path, the file will be generated under the current working directory of TDegnine CLI.
## Import data from CSV file
- You can use `insert into table_name file 'fileName'` to import the data from the specified file into the specified table. For example, `insert into d0 file '/root/d0.csv';` means importing the data in file "/root/d0.csv" into table "d0". If there is only file name without path, that means the file is located under current working directory of TDengine CLI.
## TDengine CLI tips
- You can use the up and down keys to iterate the history of commands entered
......@@ -89,3 +97,5 @@ taos -h h1.taos.com -s "use db; show tables;"
- Execute `RESET QUERY CACHE` to clear the local cache of the table schema
- Execute SQL statements in batches. You can store a series of shell commands (ending with ;, one line for each SQL command) in a script file and execute the command `source <file-name>` in the TDengine CLI to execute all SQL commands in that file automatically
- Enter `q` to exit TDengine CLI
......@@ -467,21 +467,22 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- `TAOS_RES* taos_schemaless_insert(TAOS* taos, const char* lines[], int numLines, int protocol, int precision)`
**功能说明**
该接口将行协议的文本数据写入到 TDengine 中。
- 该接口将行协议的文本数据写入到 TDengine 中。
**参数说明**
taos: 数据库连接,通过 `taos_connect()` 函数建立的数据库连接。
lines:文本数据。满足解析格式要求的无模式文本字符串。
numLines:文本数据的行数,不能为 0 。
protocol: 行协议类型,用于标识文本数据格式。
precision:文本数据中的时间戳精度字符串。
- taos: 数据库连接,通过 `taos_connect()` 函数建立的数据库连接。
- lines:文本数据。满足解析格式要求的无模式文本字符串。
- numLines:文本数据的行数,不能为 0 。
- protocol: 行协议类型,用于标识文本数据格式。
- precision:文本数据中的时间戳精度字符串。
**返回值**
TAOS_RES 结构体,应用可以通过使用 `taos_errstr()` 获得错误信息,也可以使用 `taos_errno()` 获得错误码。
- TAOS_RES 结构体,应用可以通过使用 `taos_errstr()` 获得错误信息,也可以使用 `taos_errno()` 获得错误码。
在某些情况下,返回的 TAOS_RES 为 `NULL`,此时仍然可以调用 `taos_errno()` 来安全地获得错误码信息。
返回的 TAOS_RES 需要调用方来负责释放,否则会出现内存泄漏。
**说明**
协议类型是枚举类型,包含以下三种格式:
- TSDB_SML_LINE_PROTOCOL:InfluxDB 行协议(Line Protocol)
......@@ -515,3 +516,90 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- 带_raw的接口通过传递的参数lines指针和长度len来表示数据,为了解决原始接口数据包含'\0'而被截断的问题。totalRows指针返回解析出来的数据行数。
- 带_ttl的接口可以传递ttl参数来控制建表的ttl到期时间。
- 带_reqid的接口可以通过传递reqid参数来追踪整个的调用链。
### 数据订阅 API
- `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)`
- `void tmq_free_assignment(tmq_topic_assignment* pAssignment)`
tmq_topic_assignment结构体定义如下:
```c
typedef struct tmq_topic_assignment {
int32_t vgId;
int64_t currentOffset;
int64_t begin;
int64_t end;
} tmq_topic_assignment;
```
**功能说明**
- tmq_get_topic_assignment 接口返回当前consumer分配的vgroup的信息,每个vgroup的信息包括vgId,wal的最大最小offset,以及当前消费到的offset。
**参数说明**
- numOfAssignment :分配给该consumer有效的vgroup个数。
- assignment :分配的信息,数据大小为numOfAssignment,需要通过 tmq_free_assignment 接口释放。
**返回值**
- 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息。
- `int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明**
- 获取当前 consumer 在某个 topic 和 vgroup上的 commit 位置。
**返回值**
- 当前commit的位置,-2147467247表示没有消费进度,其他小于0的值表示失败,错误码就是返回值
- `int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg)`
- `void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param)`
- `int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
- `void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param)`
**功能说明**
commit接口分为两种类型,每种类型有同步和异步接口:
- 第一种类型:根据消息提交,提交消息里的进度,如果消息传NULL,提交当前consumer所有消费的vgroup的当前进度 : tmq_commit_sync/tmq_commit_async
- 第二种类型:根据某个topic的某个vgroup的offset提交 : tmq_commit_offset_sync/tmq_commit_offset_async
**参数说明**
- msg:消费到的消息结构,如果msg传NULL,提交当前consumer所有消费的vgroup的当前进度
**返回值**
- 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId)`
**功能说明**
- 获取当前消费位置,为消费到的数据位置的下一个位置
**返回值**
- 消费位置,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset)`
**功能说明**
- 设置 consumer 在某个topic的某个vgroup的 offset位置,开始消费
**返回值**
- 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t int64_t tmq_get_vgroup_offset(TAOS_RES* res)`
**功能说明**
获取 poll 消费到的数据的起始offset
**参数说明**
- msg:消费到的消息结构
**返回值**
- 消费到的offset,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
- `int32_t int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics)`
**功能说明**
获取消费者订阅的 topic 列表
**参数说明**
- topics: 获取的 topic 列表存储在这个结构中,接口内分配内存,需调用tmq_list_destroy释放
**返回值**
- 错误码,0成功,非0失败,可通过 `char *tmq_err2str(int32_t code)` 函数获取错误信息
\ No newline at end of file
......@@ -249,3 +249,12 @@ T = 最新事件时间 - DELETE_MARK
- [unique](../function/#unique)
- [mode](../function/#mode)
## 暂停、恢复流计算
1.流计算暂停计算任务
PAUSE STREAM [IF EXISTS] stream_name;
没有指定IF EXISTS,如果该stream不存在,则报错;如果存在,则暂停流计算。指定了IF EXISTS,如果该stream不存在,则返回成功;如果存在,则暂停流计算
2.流计算恢复计算任务
RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
没有指定IF EXISTS,如果该stream不存在,则报错,如果存在,则恢复流计算;指定了IF EXISTS,如果stream不存在,则返回成功;如果存在,则恢复流计算。如果指定IGNORE UNTREATED,则恢复流计算时,忽略流计算暂停期间写入的数据。
......@@ -4,7 +4,7 @@ title: 权限管理
description: 企业版中才具有的权限管理功能
---
本节讲述如何在 TDengine 中进行权限管理的相关操作。
本节讲述如何在 TDengine 中进行权限管理的相关操作。权限管理是 TDengine 企业版的特有功能,本节只列举了一些基本的权限管理功能作为示例,更丰富的权限管理请联系 TDengine 销售或市场团队。
## 创建用户
......
......@@ -89,3 +89,11 @@ taos -h h1.taos.com -s "use db; show tables;"
- 执行 `RESET QUERY CACHE` 可清除本地表 Schema 的缓存
- 批量执行 SQL 语句。可以将一系列的 TDengine CLI 命令(以英文 ; 结尾,每个 SQL 语句为一行)按行存放在文件里,在 TDengine CLI 里执行命令 `source <file-name>` 自动执行该文件里所有的 SQL 语句
- 输入 `q``quit``exit` 回车,可以退出 TDengine CLI
## TDengine CLI 导出查询结果到文件中
- 可以使用符号 “>>” 导出查询结果到某个文件中,语法为: sql 查询语句 >> ‘输出文件名’; 输出文件如果不写路径的话,将输出至当前目录下。如 select * from d0 >> ‘/root/d0.csv’; 将把查询结果输出到 /root/d0.csv 中。
## TDengine CLI 导入文件中的数据到表中
- 可以使用 insert into table_name file '输入文件名',把上一步中导出的数据文件再导入到指定表中。如 insert into d0 file '/root/d0.csv'; 表示把上面导出的数据全部再导致至 d0 表中。
......@@ -292,14 +292,14 @@ DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq);
DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); //Commit the msg’s offset + 1
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,int32_t *numOfAssignment);
DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment);
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); // The current offset is the offset of the last consumed message + 1
DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
......
......@@ -152,6 +152,8 @@ enum {
STREAM_INPUT__DATA_RETRIEVE,
STREAM_INPUT__GET_RES,
STREAM_INPUT__CHECKPOINT,
STREAM_INPUT__CHECKPOINT_TRIGGER,
STREAM_INPUT__TRANS_STATE,
STREAM_INPUT__REF_DATA_BLOCK,
STREAM_INPUT__DESTROY,
};
......@@ -168,7 +170,9 @@ typedef enum EStreamType {
STREAM_PULL_DATA,
STREAM_PULL_OVER,
STREAM_FILL_OVER,
STREAM_CHECKPOINT,
STREAM_CREATE_CHILD_TABLE,
STREAM_TRANS_STATE,
} EStreamType;
#pragma pack(push, 1)
......
......@@ -241,7 +241,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf);
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** dumpBuf, const char* taskIdStr);
int32_t buildSubmitReqFromDataBlock(SSubmitReq2** pReq, const SSDataBlock* pDataBlocks, const STSchema* pTSchema, int64_t uid, int32_t vgId,
tb_uid_t suid);
......
......@@ -199,6 +199,7 @@ extern bool tsFilterScalarMode;
extern int32_t tsKeepTimeOffset;
extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold;
extern int32_t tsResolveFQDNRetryTime;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
......@@ -231,6 +231,7 @@ typedef struct SField {
uint8_t type;
int8_t flags;
int32_t bytes;
char comment[TSDB_COL_COMMENT_LEN];
} SField;
typedef struct SRetention {
......@@ -309,6 +310,7 @@ struct SSchema {
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
char comment[TSDB_COL_COMMENT_LEN];
};
struct SSchema2 {
......@@ -2387,6 +2389,9 @@ typedef struct {
int8_t type;
int8_t flags;
int32_t bytes;
bool hasColComment;
char* colComment;
int32_t colCommentLen;
// TSDB_ALTER_TABLE_DROP_COLUMN
// TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES
int8_t colModType;
......
......@@ -254,7 +254,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
......
......@@ -16,105 +16,105 @@
#ifndef _TD_COMMON_TOKEN_H_
#define _TD_COMMON_TOKEN_H_
#define TK_OR 1
#define TK_AND 2
#define TK_UNION 3
#define TK_ALL 4
#define TK_MINUS 5
#define TK_EXCEPT 6
#define TK_INTERSECT 7
#define TK_NK_BITAND 8
#define TK_NK_BITOR 9
#define TK_NK_LSHIFT 10
#define TK_NK_RSHIFT 11
#define TK_NK_PLUS 12
#define TK_NK_MINUS 13
#define TK_NK_STAR 14
#define TK_NK_SLASH 15
#define TK_NK_REM 16
#define TK_NK_CONCAT 17
#define TK_CREATE 18
#define TK_ACCOUNT 19
#define TK_NK_ID 20
#define TK_PASS 21
#define TK_NK_STRING 22
#define TK_ALTER 23
#define TK_PPS 24
#define TK_TSERIES 25
#define TK_STORAGE 26
#define TK_STREAMS 27
#define TK_QTIME 28
#define TK_DBS 29
#define TK_USERS 30
#define TK_CONNS 31
#define TK_STATE 32
#define TK_USER 33
#define TK_ENABLE 34
#define TK_NK_INTEGER 35
#define TK_SYSINFO 36
#define TK_DROP 37
#define TK_GRANT 38
#define TK_ON 39
#define TK_TO 40
#define TK_REVOKE 41
#define TK_FROM 42
#define TK_SUBSCRIBE 43
#define TK_NK_COMMA 44
#define TK_READ 45
#define TK_WRITE 46
#define TK_NK_DOT 47
#define TK_WITH 48
#define TK_DNODE 49
#define TK_PORT 50
#define TK_DNODES 51
#define TK_RESTORE 52
#define TK_NK_IPTOKEN 53
#define TK_FORCE 54
#define TK_UNSAFE 55
#define TK_LOCAL 56
#define TK_QNODE 57
#define TK_BNODE 58
#define TK_SNODE 59
#define TK_MNODE 60
#define TK_VNODE 61
#define TK_DATABASE 62
#define TK_USE 63
#define TK_FLUSH 64
#define TK_TRIM 65
#define TK_COMPACT 66
#define TK_IF 67
#define TK_NOT 68
#define TK_EXISTS 69
#define TK_BUFFER 70
#define TK_CACHEMODEL 71
#define TK_CACHESIZE 72
#define TK_COMP 73
#define TK_DURATION 74
#define TK_NK_VARIABLE 75
#define TK_MAXROWS 76
#define TK_MINROWS 77
#define TK_KEEP 78
#define TK_PAGES 79
#define TK_PAGESIZE 80
#define TK_TSDB_PAGESIZE 81
#define TK_PRECISION 82
#define TK_REPLICA 83
#define TK_VGROUPS 84
#define TK_SINGLE_STABLE 85
#define TK_RETENTIONS 86
#define TK_SCHEMALESS 87
#define TK_WAL_LEVEL 88
#define TK_WAL_FSYNC_PERIOD 89
#define TK_WAL_RETENTION_PERIOD 90
#define TK_WAL_RETENTION_SIZE 91
#define TK_WAL_ROLL_PERIOD 92
#define TK_WAL_SEGMENT_SIZE 93
#define TK_STT_TRIGGER 94
#define TK_TABLE_PREFIX 95
#define TK_TABLE_SUFFIX 96
#define TK_NK_COLON 97
#define TK_MAX_SPEED 98
#define TK_START 99
#define TK_OR 1
#define TK_AND 2
#define TK_UNION 3
#define TK_ALL 4
#define TK_MINUS 5
#define TK_EXCEPT 6
#define TK_INTERSECT 7
#define TK_NK_BITAND 8
#define TK_NK_BITOR 9
#define TK_NK_LSHIFT 10
#define TK_NK_RSHIFT 11
#define TK_NK_PLUS 12
#define TK_NK_MINUS 13
#define TK_NK_STAR 14
#define TK_NK_SLASH 15
#define TK_NK_REM 16
#define TK_NK_CONCAT 17
#define TK_CREATE 18
#define TK_ACCOUNT 19
#define TK_NK_ID 20
#define TK_PASS 21
#define TK_NK_STRING 22
#define TK_ALTER 23
#define TK_PPS 24
#define TK_TSERIES 25
#define TK_STORAGE 26
#define TK_STREAMS 27
#define TK_QTIME 28
#define TK_DBS 29
#define TK_USERS 30
#define TK_CONNS 31
#define TK_STATE 32
#define TK_USER 33
#define TK_ENABLE 34
#define TK_NK_INTEGER 35
#define TK_SYSINFO 36
#define TK_DROP 37
#define TK_GRANT 38
#define TK_ON 39
#define TK_TO 40
#define TK_REVOKE 41
#define TK_FROM 42
#define TK_SUBSCRIBE 43
#define TK_NK_COMMA 44
#define TK_READ 45
#define TK_WRITE 46
#define TK_NK_DOT 47
#define TK_WITH 48
#define TK_DNODE 49
#define TK_PORT 50
#define TK_DNODES 51
#define TK_RESTORE 52
#define TK_NK_IPTOKEN 53
#define TK_FORCE 54
#define TK_UNSAFE 55
#define TK_LOCAL 56
#define TK_QNODE 57
#define TK_BNODE 58
#define TK_SNODE 59
#define TK_MNODE 60
#define TK_VNODE 61
#define TK_DATABASE 62
#define TK_USE 63
#define TK_FLUSH 64
#define TK_TRIM 65
#define TK_COMPACT 66
#define TK_IF 67
#define TK_NOT 68
#define TK_EXISTS 69
#define TK_BUFFER 70
#define TK_CACHEMODEL 71
#define TK_CACHESIZE 72
#define TK_COMP 73
#define TK_DURATION 74
#define TK_NK_VARIABLE 75
#define TK_MAXROWS 76
#define TK_MINROWS 77
#define TK_KEEP 78
#define TK_PAGES 79
#define TK_PAGESIZE 80
#define TK_TSDB_PAGESIZE 81
#define TK_PRECISION 82
#define TK_REPLICA 83
#define TK_VGROUPS 84
#define TK_SINGLE_STABLE 85
#define TK_RETENTIONS 86
#define TK_SCHEMALESS 87
#define TK_WAL_LEVEL 88
#define TK_WAL_FSYNC_PERIOD 89
#define TK_WAL_RETENTION_PERIOD 90
#define TK_WAL_RETENTION_SIZE 91
#define TK_WAL_ROLL_PERIOD 92
#define TK_WAL_SEGMENT_SIZE 93
#define TK_STT_TRIGGER 94
#define TK_TABLE_PREFIX 95
#define TK_TABLE_SUFFIX 96
#define TK_NK_COLON 97
#define TK_MAX_SPEED 98
#define TK_START 99
#define TK_TIMESTAMP 100
#define TK_END 101
#define TK_TABLE 102
......@@ -130,25 +130,25 @@
#define TK_NK_EQ 112
#define TK_USING 113
#define TK_TAGS 114
#define TK_BOOL 115
#define TK_TINYINT 116
#define TK_SMALLINT 117
#define TK_INT 118
#define TK_INTEGER 119
#define TK_BIGINT 120
#define TK_FLOAT 121
#define TK_DOUBLE 122
#define TK_BINARY 123
#define TK_NCHAR 124
#define TK_UNSIGNED 125
#define TK_JSON 126
#define TK_VARCHAR 127
#define TK_MEDIUMBLOB 128
#define TK_BLOB 129
#define TK_VARBINARY 130
#define TK_GEOMETRY 131
#define TK_DECIMAL 132
#define TK_COMMENT 133
#define TK_COMMENT 115
#define TK_BOOL 116
#define TK_TINYINT 117
#define TK_SMALLINT 118
#define TK_INT 119
#define TK_INTEGER 120
#define TK_BIGINT 121
#define TK_FLOAT 122
#define TK_DOUBLE 123
#define TK_BINARY 124
#define TK_NCHAR 125
#define TK_UNSIGNED 126
#define TK_JSON 127
#define TK_VARCHAR 128
#define TK_MEDIUMBLOB 129
#define TK_BLOB 130
#define TK_VARBINARY 131
#define TK_GEOMETRY 132
#define TK_DECIMAL 133
#define TK_MAX_DELAY 134
#define TK_WATERMARK 135
#define TK_ROLLUP 136
......
......@@ -157,6 +157,8 @@ typedef enum EFunctionType {
FUNCTION_TYPE_AVG_MERGE,
FUNCTION_TYPE_STDDEV_PARTIAL,
FUNCTION_TYPE_STDDEV_MERGE,
FUNCTION_TYPE_IRATE_PARTIAL,
FUNCTION_TYPE_IRATE_MERGE,
// geometry functions
FUNCTION_TYPE_GEOM_FROM_TEXT = 4250,
......
......@@ -23,10 +23,11 @@ extern "C" {
#include "query.h"
#include "querynodes.h"
#define DESCRIBE_RESULT_COLS 4
#define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_COLS 5
#define DESCRIBE_RESULT_FIELD_LEN (TSDB_COL_NAME_LEN - 1 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_TYPE_LEN (20 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_NOTE_LEN (8 + VARSTR_HEADER_SIZE)
#define DESCRIBE_RESULT_COL_COMMENT_LEN (TSDB_COL_COMMENT_LEN)
#define SHOW_CREATE_DB_RESULT_COLS 2
#define SHOW_CREATE_DB_RESULT_FIELD1_LEN (TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE)
......@@ -155,7 +156,7 @@ typedef struct SColumnDefNode {
ENodeType type;
char colName[TSDB_COL_NAME_LEN];
SDataType dataType;
char comments[TSDB_TB_COMMENT_LEN];
char comments[TSDB_COL_COMMENT_LEN];
bool sma;
} SColumnDefNode;
......@@ -214,6 +215,7 @@ typedef struct SAlterTableStmt {
char newColName[TSDB_COL_NAME_LEN];
STableOptions* pOptions;
SDataType dataType;
char colComment[TSDB_COL_COMMENT_LEN];
SValueNode* pVal;
} SAlterTableStmt;
......
......@@ -122,6 +122,7 @@ typedef struct {
int8_t type;
int32_t srcVgId;
int32_t srcTaskId;
int32_t childId;
int64_t sourceVer;
int64_t reqId;
......@@ -251,6 +252,7 @@ typedef struct SStreamChildEpInfo {
int32_t nodeId;
int32_t childId;
int32_t taskId;
int8_t dataAllowed;
SEpSet epSet;
} SStreamChildEpInfo;
......@@ -272,6 +274,7 @@ typedef struct SStreamStatus {
int8_t schedStatus;
int8_t keepTaskStatus;
bool transferState;
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
} SStreamStatus;
......@@ -399,8 +402,9 @@ typedef struct {
typedef struct {
int64_t streamId;
int32_t type;
int32_t taskId;
int32_t dataSrcVgId;
int32_t srcVgId;
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
......@@ -570,8 +574,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
int64_t dstTaskId);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
......@@ -579,6 +581,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
......@@ -589,7 +593,6 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
bool streamTaskShouldStop(const SStreamStatus* pStatus);
bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
......@@ -626,7 +629,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
int32_t appendTranstateIntoInputQ(SStreamTask* pTask);
// agg level
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask);
......
......@@ -230,6 +230,7 @@ typedef enum ELogicConditionType {
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_TB_COMMENT_LEN 1025
#define TSDB_COL_COMMENT_LEN 1025
#define TSDB_QUERY_ID_LEN 26
#define TSDB_TRANS_OPER_LEN 16
......
......@@ -1771,7 +1771,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
}
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
int32_t size = 2048 * 1024;
*pDataBuf = taosMemoryCalloc(size, 1);
char* dumpBuf = *pDataBuf;
......@@ -1780,9 +1780,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t rows = pDataBlock->info.rows;
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64 "|rows:%" PRId64
"|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
"%s===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);
if (len >= size - 1) return dumpBuf;
......
......@@ -240,6 +240,7 @@ int64_t tsStreamBufferSize = 128 * 1024 * 1024;
int64_t tsCheckpointInterval = 3 * 60 * 60 * 1000;
bool tsFilterScalarMode = false;
int32_t tsKeepTimeOffset = 0; // latency of data migration
int tsResolveFQDNRetryTime = 100; //seconds
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";
......@@ -628,6 +629,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "keepTimeOffset", tsKeepTimeOffset, 0, 23, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "resolveFQDNRetryTime", tsResolveFQDNRetryTime, 1, 10240, 0) != 0) return -1;
if (cfgAddString(pCfg, "s3Accesskey", tsS3AccessKey, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "s3Endpoint", tsS3Endpoint, CFG_SCOPE_SERVER) != 0) return -1;
......@@ -1031,6 +1033,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsKeepTimeOffset = cfgGetItem(pCfg, "keepTimeOffset")->i32;
tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32;
tsPQSortMemThreshold = cfgGetItem(pCfg, "pqSortMemThreshold")->i32;
tsResolveFQDNRetryTime = cfgGetItem(pCfg, "resolveFQDNRetryTime")->i32;
GRANT_CFG_GET;
return 0;
......
......@@ -534,6 +534,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeCStr(&encoder, pField->comment) < 0) return -1;
}
for (int32_t i = 0; i < pReq->numOfTags; ++i) {
......@@ -542,6 +543,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI8(&encoder, pField->flags) < 0) return -1;
if (tEncodeI32(&encoder, pField->bytes) < 0) return -1;
if (tEncodeCStr(&encoder, pField->name) < 0) return -1;
if (tEncodeCStr(&encoder, pField->comment) < 0) return -1;
}
for (int32_t i = 0; i < pReq->numOfFuncs; ++i) {
......@@ -608,6 +610,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.comment) < 0) return -1;
if (taosArrayPush(pReq->pColumns, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -620,6 +623,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI8(&decoder, &field.flags) < 0) return -1;
if (tDecodeI32(&decoder, &field.bytes) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.name) < 0) return -1;
if (tDecodeCStrTo(&decoder, field.comment) < 0) return -1;
if (taosArrayPush(pReq->pTags, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -2301,7 +2305,7 @@ int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp)
}
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
......@@ -3684,7 +3688,7 @@ static int32_t tDecodeSTableMetaRsp(SDecoder *pDecoder, STableMetaRsp *pRsp) {
int32_t totalCols = pRsp->numOfTags + pRsp->numOfColumns;
if (totalCols > 0) {
pRsp->pSchemas = taosMemoryMalloc(sizeof(SSchema) * totalCols);
pRsp->pSchemas = taosMemoryCalloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) return -1;
for (int32_t i = 0; i < totalCols; ++i) {
......
......@@ -742,7 +742,6 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
......
......@@ -835,6 +835,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
pSchema->bytes = pField->bytes;
pSchema->flags = pField->flags;
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pSchema->comment, pField->comment, TSDB_COL_COMMENT_LEN);
pSchema->colId = pDst->nextColId;
pDst->nextColId++;
}
......@@ -848,6 +849,7 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
SSCHMEA_SET_IDX_ON(pSchema);
}
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pSchema->comment, pField->comment, TSDB_COL_COMMENT_LEN);
pSchema->colId = pDst->nextColId;
pDst->nextColId++;
}
......
......@@ -77,6 +77,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->chkInfo.version = ver;
pTask->pMeta = pSnode->pMeta;
streamTaskOpenAllUpstreamInput(pTask);
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
......
......@@ -250,7 +250,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
......
......@@ -930,6 +930,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta;
streamTaskOpenAllUpstreamInput(pTask);
// backup the initial status, and set it to be TASK_STATUS__INIT
pTask->chkInfo.version = ver;
pTask->chkInfo.currentVer = ver;
......@@ -1274,7 +1276,9 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
if (done) {
pTask->tsInfo.step2Start = taosGetTimestampMs();
streamTaskEndScanWAL(pTask);
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
appendTranstateIntoInputQ(pTask);
streamTryExec(pTask); // exec directly
} else {
STimeWindow* pWindow = &pTask->dataRange.window;
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
......@@ -1339,44 +1343,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
// notify the downstream tasks to transfer executor state after handle all history blocks.
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SStreamTransferReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
return -1;
}
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// transfer the ownership of executor state
tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
pTask->status.transferState = true;
streamSchedExec(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
......@@ -1564,7 +1530,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
return TSDB_CODE_SUCCESS;
} else {
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
return TSDB_CODE_INVALID_MSG;
......@@ -1708,6 +1674,8 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
STQ* pTq = pVnode->pTq;
int32_t vgId = pVnode->config.vgId;
SMsgHead* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
......@@ -1724,7 +1692,9 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
int32_t taskId = req.taskId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
tqDebug("vgId:%d receive dispatch msg to s-task:0x%"PRIx64"-0x%x", vgId, req.streamId, taskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
if (pTask != NULL) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp, false);
......@@ -1741,7 +1711,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
FAIL:
if (pMsg->info.handle == NULL) {
tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", pTq->pStreamMeta->vgId, taskId);
tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", vgId, taskId);
return -1;
}
......
......@@ -332,8 +332,12 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
void* pBody = POINTER_SHIFT(pReader->pHead->head.body, sizeof(SMsgHead));
int32_t len = pReader->pHead->head.bodyLen - sizeof(SMsgHead);
extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
code = extractDelDataBlock(pBody, len, ver, (SStreamRefDataBlock**)pItem);
if (code != TSDB_CODE_SUCCESS) {
tqError("s-task:%s extract delete msg from WAL failed, code:%s", id, tstrerror(code));
} else {
tqDebug("s-task:%s delete msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver);
}
} else {
ASSERT(0);
}
......
......@@ -210,13 +210,23 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
}
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
", not scan wal anymore, set the transfer state flag",
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
pTask->status.transferState = true;
const char* id = pTask->id.idStr;
int64_t maxVer = pTask->dataRange.range.maxVer;
/*int32_t code = */streamSchedExec(pTask);
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
if (!pTask->status.appendTranstateBlock) {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
", not scan wal anymore, add transfer-state block into inputQ",
id, ver, maxVer);
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
appendTranstateIntoInputQ(pTask);
/*int32_t code = */streamSchedExec(pTask);
} else {
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
id, ver, maxVer);
}
}
}
......@@ -262,7 +272,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
ASSERT(status == TASK_STATUS__NORMAL);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
......@@ -277,6 +287,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
continue;
}
// downstream task has blocked the output, stopped for a while
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}
*pScanIdle = false;
// seek the stored version and extract data from WAL
......
......@@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqTrace("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
......
......@@ -392,6 +392,9 @@ static int32_t tsdbSnapReadTombData(STsdbSnapReader* reader, uint8_t** data) {
code = tTombBlockPut(reader->tombBlock, record);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbIterMergerNext(reader->tombIterMerger);
TSDB_CHECK_CODE(code, lino, _exit);
if (TOMB_BLOCK_SIZE(reader->tombBlock) >= 81920) {
break;
}
......
......@@ -216,7 +216,7 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
cfgRsp.numOfTags = schemaTag.nCols;
cfgRsp.numOfColumns = schema.nCols;
cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags));
cfgRsp.pSchemas = (SSchema *)taosMemoryCalloc(cfgRsp.numOfColumns + cfgRsp.numOfTags, sizeof(SSchema));
memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
if (schemaTag.nCols) {
......
......@@ -661,8 +661,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY:
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_STREAM_TRANSFER_STATE:
return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH:
return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
......
......@@ -78,6 +78,10 @@ static int32_t buildDescResultDataBlock(SSDataBlock** pOutput) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_NOTE_LEN, 4);
code = blockDataAppendColInfo(pBlock, &infoData);
}
if (TSDB_CODE_SUCCESS == code) {
infoData = createColumnInfoData(TSDB_DATA_TYPE_VARCHAR, DESCRIBE_RESULT_COL_COMMENT_LEN, 5);
code = blockDataAppendColInfo(pBlock, &infoData);
}
if (TSDB_CODE_SUCCESS == code) {
*pOutput = pBlock;
......@@ -99,7 +103,9 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
SColumnInfoData* pCol3 = taosArrayGet(pBlock->pDataBlock, 2);
// Note
SColumnInfoData* pCol4 = taosArrayGet(pBlock->pDataBlock, 3);
char buf[DESCRIBE_RESULT_FIELD_LEN] = {0};
// Comment
SColumnInfoData* pCol5 = taosArrayGet(pBlock->pDataBlock, 4);
char buf[DESCRIBE_RESULT_COL_COMMENT_LEN + VARSTR_HEADER_SIZE] = {0};
for (int32_t i = 0; i < numOfRows; ++i) {
if (invisibleColumn(sysInfoUser, pMeta->tableType, pMeta->schema[i].flags)) {
continue;
......@@ -112,6 +118,8 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock,
colDataSetVal(pCol3, pBlock->info.rows, (const char*)&bytes, false);
STR_TO_VARSTR(buf, i >= pMeta->tableInfo.numOfColumns ? "TAG" : "");
colDataSetVal(pCol4, pBlock->info.rows, buf, false);
STR_TO_VARSTR(buf, pMeta->schema[i].comment);
colDataSetVal(pCol5, pBlock->info.rows, buf, false);
++(pBlock->info.rows);
}
if (pBlock->info.rows <= 0) {
......@@ -456,14 +464,19 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
SSchema* pSchema = pCfg->pSchemas + i;
char type[32];
char comments[TSDB_COL_COMMENT_LEN + 16] = {0};
sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
if (pSchema->comment[0]) {
sprintf(comments, " COMMENT '%s'", pSchema->comment);
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
*len +=
sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s%s", ((i > 0) ? ", " : ""), pSchema->name, type, comments);
}
}
......@@ -471,14 +484,18 @@ void appendTagFields(char* buf, int32_t* len, STableCfg* pCfg) {
for (int32_t i = 0; i < pCfg->numOfTags; ++i) {
SSchema* pSchema = pCfg->pSchemas + pCfg->numOfColumns + i;
char type[32];
char comments[TSDB_COL_COMMENT_LEN + 16] = {0};
sprintf(type, "%s", tDataTypes[pSchema->type].name);
if (TSDB_DATA_TYPE_VARCHAR == pSchema->type || TSDB_DATA_TYPE_GEOMETRY == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)(pSchema->bytes - VARSTR_HEADER_SIZE));
} else if (TSDB_DATA_TYPE_NCHAR == pSchema->type) {
sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
if (pSchema->comment[0]) {
sprintf(comments, " COMMENT '%s'", pSchema->comment);
}
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s", ((i > 0) ? ", " : ""), pSchema->name, type);
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, "%s`%s` %s%s", ((i > 0) ? ", " : ""), pSchema->name, type, comments);
}
}
......@@ -624,7 +641,7 @@ void appendTableOptions(char* buf, int32_t* len, SDbCfgInfo* pDbCfg, STableCfg*
}
}
if (nSma < pCfg->numOfColumns) {
if (nSma < pCfg->numOfColumns && nSma > 0) {
bool smaOn = false;
*len += sprintf(buf + VARSTR_HEADER_SIZE + *len, " SMA(");
for (int32_t i = 0; i < pCfg->numOfColumns; ++i) {
......
......@@ -183,13 +183,17 @@ void cleanupQueryTableDataCond(SQueryTableDataCond* pCond);
int32_t convertFillType(int32_t mode);
int32_t resultrowComparAsc(const void* p1, const void* p2);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI *pAPI);
void printDataBlock(SSDataBlock* pBlock, const char* flag);
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified, SStorageAPI* pAPI);
char* getStreamOpName(uint16_t opType);
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr);
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols);
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta);
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);
#endif // TDENGINE_EXECUTIL_H
......@@ -470,7 +470,6 @@ typedef struct SStreamIntervalOperatorInfo {
SArray* pPullWins; // SPullWindowInfo
int32_t pullIndex;
SSDataBlock* pPullDataRes;
bool isFinal;
SArray* pChildren;
int32_t numOfChild;
SStreamState* pState; // void
......@@ -521,7 +520,6 @@ typedef struct SStreamSessionAggOperatorInfo {
void* pDelIterator;
SArray* pChildren; // cache for children's result; final stream operator
SPhysiNode* pPhyNode; // create new child
bool isFinal;
bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated;
......@@ -709,6 +707,13 @@ uint64_t calcGroupId(char* pData, int32_t len);
void streamOpReleaseState(struct SOperatorInfo* pOperator);
void streamOpReloadState(struct SOperatorInfo* pOperator);
bool inSlidingWindow(SInterval* pInterval, STimeWindow* pWin, SDataBlockInfo* pBlockInfo);
bool inCalSlidingWindow(SInterval* pInterval, STimeWindow* pWin, TSKEY calStart, TSKEY calEnd, EStreamType blockType);
bool compareVal(const char* v, const SStateKeys* pKey);
int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, SDataBlockInfo* pDataBlockInfo,
TSKEY* primaryKeys, int32_t prevPosition, int32_t order);
#ifdef __cplusplus
}
#endif
......
......@@ -58,16 +58,6 @@ static void doKeepTuple(SWindowRowsSup* pRowSup, int64_t ts, uint64_t groupId) {
pRowSup->groupId = groupId;
}
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
int64_t* ts = (int64_t*)pColData->pData;
int32_t delta = includeEndpoint ? 1 : 0;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}
SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* physiNode,
SExecTaskInfo* pTaskInfo) {
SEventWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SEventWindowOperatorInfo));
......@@ -250,7 +240,7 @@ static void doEventWindowAggImpl(SEventWindowOperatorInfo* pInfo, SExprSupp* pSu
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startIndex, numOfRows,
pBlock->info.rows, numOfOutput);
}
......
......@@ -2177,12 +2177,67 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
return TSDB_CODE_SUCCESS;
}
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
char* getStreamOpName(uint16_t opType) {
switch (opType) {
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return "stream scan";
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return "project";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return "interval single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL:
return "interval final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL:
return "interval semi";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL:
return "stream fill";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION:
return "session single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION:
return "session semi";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION:
return "session final";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE:
return "state single";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION:
return "stream partitionby";
case QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT:
return "stream event";
}
return "";
}
void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===%s: Block is Null or Empty", flag);
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
return;
}
char* pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
}
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag);
return;
}
if (qDebugFlag & DEBUG_DEBUG) {
char* pBuf = NULL;
char flagBuf[64];
snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr);
qDebug("%s", dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr));
taosMemoryFree(pBuf);
}
}
TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; }
void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
int64_t* ts = (int64_t*)pColData->pData;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}
......@@ -1070,3 +1070,15 @@ void streamOpReloadState(SOperatorInfo* pOperator) {
downstream->fpSet.reloadStreamStateFn(downstream);
}
}
bool compareVal(const char* v, const SStateKeys* pKey) {
if (IS_VAR_DATA_TYPE(pKey->type)) {
if (varDataLen(v) != varDataLen(pKey->pData)) {
return false;
} else {
return memcmp(varDataVal(v), varDataVal(pKey->pData), varDataLen(v)) == 0;
}
} else {
return memcmp(pKey->pData, v, pKey->bytes) == 0;
}
}
......@@ -1292,14 +1292,14 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
(pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill");
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
}
if (pOperator->status == OP_RES_TO_RETURN) {
doDeleteFillFinalize(pOperator);
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill");
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
setOperatorCompleted(pOperator);
......@@ -1317,12 +1317,12 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pOperator->status = OP_RES_TO_RETURN;
pInfo->pFillInfo->preRowKey = INT64_MIN;
if (pInfo->pRes->info.rows > 0) {
printDataBlock(pInfo->pRes, "stream fill");
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
break;
}
printDataBlock(pBlock, "stream fill recv");
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
......@@ -1339,7 +1339,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
pInfo->pFillSup->hasDelete = true;
doDeleteFillResult(pOperator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "stream fill delete");
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
continue;
......@@ -1378,7 +1378,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
}
pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
printDataBlock(pInfo->pRes, "stream fill");
printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pRes;
}
......
......@@ -956,7 +956,8 @@ static bool hasRemainPartion(SStreamPartitionOperatorInfo* pInfo) { return pInfo
static bool hasRemainTbName(SStreamPartitionOperatorInfo* pInfo) { return pInfo->pTbNameIte != NULL; }
static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamPartitionOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pDest = pInfo->binfo.pRes;
......@@ -994,7 +995,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pOperator->resultInfo.totalRows += pDest->info.rows;
pInfo->parIte = taosHashIterate(pInfo->pPartitions, pInfo->parIte);
ASSERT(pDest->info.rows > 0);
printDataBlock(pDest, "stream partitionby");
printDataBlock(pDest, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pDest;
}
......@@ -1115,7 +1116,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
return NULL;
}
printDataBlock(pBlock, "stream partitionby recv");
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
switch (pBlock->info.type) {
case STREAM_NORMAL:
case STREAM_PULL_DATA:
......@@ -1125,7 +1126,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) {
case STREAM_DELETE_DATA: {
copyDataBlock(pInfo->pDelRes, pBlock);
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pInfo->pDelRes, "stream partitionby delete");
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
} break;
default:
......
......@@ -372,6 +372,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
printDataBlock(p, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
}
return (p->info.rows > 0) ? p : NULL;
}
......
......@@ -1343,7 +1343,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
if (rows == 0) {
return TSDB_CODE_SUCCESS;
}
SExecTaskInfo* pTaskInfo = pInfo->pStreamScanOp->pTaskInfo;
SColumnInfoData* pSrcStartTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, START_TS_COLUMN_INDEX);
SColumnInfoData* pSrcEndTsCol = (SColumnInfoData*)taosArrayGet(pSrcBlock->pDataBlock, END_TS_COLUMN_INDEX);
SColumnInfoData* pSrcUidCol = taosArrayGet(pSrcBlock->pDataBlock, UID_COLUMN_INDEX);
......@@ -1360,7 +1360,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
TSKEY startTs = srcStartTsCol[0];
TSKEY endTs = srcEndTsCol[0];
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, startTs, endTs, ver);
printDataBlock(pPreRes, "pre res");
printDataBlock(pPreRes, "pre res", GET_TASKID(pTaskInfo));
blockDataCleanup(pSrcBlock);
int32_t code = blockDataEnsureCapacity(pSrcBlock, pPreRes->info.rows);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1375,7 +1375,7 @@ static int32_t generateIntervalScanRange(SStreamScanInfo* pInfo, SSDataBlock* pS
appendOneRowToStreamSpecialBlock(pSrcBlock, ((TSKEY*)pTsCol->pData) + i, ((TSKEY*)pTsCol->pData) + i, &srcUid,
&groupId, NULL);
}
printDataBlock(pSrcBlock, "new delete");
printDataBlock(pSrcBlock, "new delete", GET_TASKID(pTaskInfo));
}
uint64_t* srcGp = (uint64_t*)pSrcGpCol->pData;
srcStartTsCol = (TSKEY*)pSrcStartTsCol->pData;
......@@ -1921,38 +1921,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
switch (pInfo->scanMode) {
case STREAM_SCAN_FROM_RES: {
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
printDataBlock(pInfo->pRecoverRes, "scan recover");
printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes;
} break;
// case STREAM_SCAN_FROM_UPDATERES: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// printDataBlock(pInfo->pUpdateRes, "recover update");
// return pInfo->pUpdateRes;
// } break;
// case STREAM_SCAN_FROM_DELETE_DATA: {
// generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
// prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
// pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
// copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
// pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
// printDataBlock(pInfo->pDeleteDataRes, "recover delete");
// return pInfo->pDeleteDataRes;
// } break;
// case STREAM_SCAN_FROM_DATAREADER_RANGE: {
// SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
// if (pSDB) {
// STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
// pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
// checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "scan recover update");
// calBlockTbName(pInfo, pSDB);
// return pSDB;
// }
// blockDataCleanup(pInfo->pUpdateDataRes);
// pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
// } break;
default:
break;
}
......@@ -1961,22 +1932,17 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
if (pInfo->pRecoverRes != NULL) {
calBlockTbName(pInfo, pInfo->pRecoverRes);
if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) {
// if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {
TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
// } else {
// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pStreamInfo->fillHistoryVer.maxVer);
// doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
// }
}
if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES;
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
printSpecDataBlock(pInfo->pCreateTbRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pCreateTbRes;
}
qDebug("stream recover scan get block, rows %" PRId64, pInfo->pRecoverRes->info.rows);
printDataBlock(pInfo->pRecoverRes, "scan recover");
printSpecDataBlock(pInfo->pRecoverRes, getStreamOpName(pOperator->operatorType), "recover", GET_TASKID(pTaskInfo));
return pInfo->pRecoverRes;
}
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__NONE;
......@@ -2032,7 +1998,7 @@ FETCH_NEXT_BLOCK:
pAPI->stateStore.updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
} break;
case STREAM_DELETE_DATA: {
printDataBlock(pBlock, "stream scan delete recv");
printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete recv", GET_TASKID(pTaskInfo));
SSDataBlock* pDelBlock = NULL;
if (pInfo->tqReader) {
pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
......@@ -2043,7 +2009,7 @@ FETCH_NEXT_BLOCK:
setBlockGroupIdByUid(pInfo, pDelBlock);
rebuildDeleteBlockData(pDelBlock, &pStreamInfo->fillHistoryWindow, id);
printDataBlock(pDelBlock, "stream scan delete recv filtered");
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete recv filtered", GET_TASKID(pTaskInfo));
if (pDelBlock->info.rows == 0) {
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
......@@ -2054,7 +2020,7 @@ FETCH_NEXT_BLOCK:
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
printDataBlock(pDelBlock, "stream scan delete result");
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
blockDataDestroy(pDelBlock);
if (pInfo->pDeleteDataRes->info.rows > 0) {
......@@ -2069,7 +2035,7 @@ FETCH_NEXT_BLOCK:
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
printDataBlock(pDelBlock, "stream scan delete data");
printSpecDataBlock(pDelBlock, getStreamOpName(pOperator->operatorType), "delete result", GET_TASKID(pTaskInfo));
if (pInfo->tqReader) {
blockDataDestroy(pDelBlock);
}
......@@ -2084,7 +2050,7 @@ FETCH_NEXT_BLOCK:
default:
break;
}
// printDataBlock(pBlock, "stream scan recv");
printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pBlock;
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
......@@ -2120,7 +2086,7 @@ FETCH_NEXT_BLOCK:
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false);
printDataBlock(pSDB, "stream scan update");
printSpecDataBlock(pSDB, getStreamOpName(pOperator->operatorType), "update", GET_TASKID(pTaskInfo));
calBlockTbName(pInfo, pSDB);
return pSDB;
}
......
此差异已折叠。
......@@ -848,6 +848,10 @@ static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
bool ignoreNull = getIgoreNullRes(pSup);
int32_t order = TSDB_ORDER_ASC;
if (checkWindowBoundReached(pSliceInfo)) {
return;
}
int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
......
......@@ -127,7 +127,10 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx);
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx* pCtx);
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t getIrateInfoSize();
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx);
......
......@@ -1567,6 +1567,45 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS;
}
static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (isPartial) {
if (3 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (!IS_NUMERIC_TYPE(colType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = getIrateInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
} else {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (TSDB_DATA_TYPE_BINARY != colType) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateIratePartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateIrateImpl(pFunc, pErrBuf, len, true);
}
static int32_t translateIrateMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
return translateIrateImpl(pFunc, pErrBuf, len, false);
}
static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
uint8_t dbPrec = pFunc->node.resType.precision;
......@@ -2604,6 +2643,31 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = irateFuncSetup,
.processFunc = irateFunction,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = irateFinalize,
.pPartialFunc = "_irate_partial",
.pMergeFunc = "_irate_merge"
},
{
.name = "_irate_partial",
.type = FUNCTION_TYPE_IRATE_PARTIAL,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC |
FUNC_MGT_FORBID_SYSTABLE_FUNC,
.translateFunc = translateIratePartial,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
.processFunc = irateFunction,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = iratePartialFinalize
},
{
.name = "_irate_merge",
.type = FUNCTION_TYPE_IRATE_MERGE,
.classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateIrateMerge,
.getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup,
.processFunc = irateFunctionMerge,
.sprocessFunc = irateScalarFunction,
.finalizeFunc = irateFinalize
},
{
......
......@@ -5768,6 +5768,8 @@ int32_t derivativeFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t getIrateInfoSize() { return (int32_t)sizeof(SRateInfo); }
bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SRateInfo);
return true;
......@@ -5817,6 +5819,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
if (INT64_MIN == pRateInfo->lastKey) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = tsList[i];
pRateInfo->hasResult = 1;
continue;
}
......@@ -5868,6 +5871,99 @@ static double doCalcRate(const SRateInfo* pRateInfo, double tickPerSec) {
return (duration > 0) ? ((double)diff) / (duration / tickPerSec) : 0.0;
}
static void irateTransferInfoImpl(TSKEY inputKey, SRateInfo* pInput, SRateInfo* pOutput, bool isFirstKey) {
if (inputKey > pOutput->lastKey) {
pOutput->firstKey = pOutput->lastKey;
pOutput->firstValue = pOutput->lastValue;
pOutput->lastKey = isFirstKey ? pInput->firstKey : pInput->lastKey;
pOutput->lastValue = isFirstKey ? pInput->firstValue : pInput->lastValue;
} else if ((inputKey < pOutput->lastKey) && (inputKey > pOutput->firstKey)) {
pOutput->firstKey = isFirstKey ? pInput->firstKey : pInput->lastKey;
pOutput->firstValue = isFirstKey ? pInput->firstValue : pInput->lastValue;
} else {
// inputKey < pOutput->firstKey
}
}
static void irateCopyInfo(SRateInfo* pInput, SRateInfo* pOutput) {
pOutput->firstKey = pInput->firstKey;
pOutput->lastKey = pInput->lastKey;
pOutput->firstValue = pInput->firstValue;
pOutput->lastValue = pInput->lastValue;
}
static int32_t irateTransferInfo(SRateInfo* pInput, SRateInfo* pOutput) {
if ((pInput->firstKey != INT64_MIN && (pInput->firstKey == pOutput->firstKey || pInput->firstKey == pOutput->lastKey)) ||
(pInput->lastKey != INT64_MIN && (pInput->lastKey == pOutput->firstKey || pInput->lastKey == pOutput->lastKey))) {
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
}
if (pOutput->hasResult == 0) {
irateCopyInfo(pInput, pOutput);
pOutput->hasResult = pInput->hasResult;
return TSDB_CODE_SUCCESS;
}
if (pInput->firstKey != INT64_MIN) {
irateTransferInfoImpl(pInput->firstKey, pInput, pOutput, true);
}
if (pInput->lastKey != INT64_MIN) {
irateTransferInfoImpl(pInput->lastKey, pInput, pOutput, false);
}
pOutput->hasResult = pInput->hasResult;
return TSDB_CODE_SUCCESS;
}
int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
if (pCol->info.type != TSDB_DATA_TYPE_BINARY) {
return TSDB_CODE_FUNC_FUNTION_PARA_TYPE;
}
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data);
if (pInputInfo->hasResult) {
int32_t code = irateTransferInfo(pInputInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
if (pInfo->hasResult) {
GET_RES_INFO(pCtx)->numOfRes = 1;
}
return TSDB_CODE_SUCCESS;
}
int32_t iratePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
int32_t resultBytes = getIrateInfoSize();
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pInfo, resultBytes);
varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataSetVal(pCol, pBlock->info.rows, res, false);
taosMemoryFree(res);
return pResInfo->numOfRes;
}
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
......
......@@ -171,8 +171,7 @@ SNode* createDropTableClause(SAstCreateContext* pCxt, bool ignoreNotExists, SNod
SNode* createDropTableStmt(SAstCreateContext* pCxt, SNodeList* pTables);
SNode* createDropSuperTableStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SNode* pRealTable);
SNode* createAlterTableModifyOptions(SAstCreateContext* pCxt, SNode* pRealTable, SNode* pOptions);
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName,
SDataType dataType);
SNode* createAlterTableAddModifyCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SNode* pColDefNode);
SNode* createAlterTableDropCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pColName);
SNode* createAlterTableRenameCol(SAstCreateContext* pCxt, SNode* pRealTable, int8_t alterType, SToken* pOldColName,
SToken* pNewColName);
......
......@@ -312,17 +312,17 @@ cmd ::= ALTER STABLE alter_table_clause(A).
alter_table_clause(A) ::= full_table_name(B) alter_table_options(C). { A = createAlterTableModifyOptions(pCxt, B, C); }
alter_table_clause(A) ::=
full_table_name(B) ADD COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, &C, D); }
full_table_name(B) ADD COLUMN column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_COLUMN, C); }
alter_table_clause(A) ::= full_table_name(B) DROP COLUMN column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_COLUMN, &C); }
alter_table_clause(A) ::=
full_table_name(B) MODIFY COLUMN column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, &C, D); }
full_table_name(B) MODIFY COLUMN column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES, C); }
alter_table_clause(A) ::=
full_table_name(B) RENAME COLUMN column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME, &C, &D); }
alter_table_clause(A) ::=
full_table_name(B) ADD TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, &C, D); }
full_table_name(B) ADD TAG column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_ADD_TAG, C); }
alter_table_clause(A) ::= full_table_name(B) DROP TAG column_name(C). { A = createAlterTableDropCol(pCxt, B, TSDB_ALTER_TABLE_DROP_TAG, &C); }
alter_table_clause(A) ::=
full_table_name(B) MODIFY TAG column_name(C) type_name(D). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, &C, D); }
full_table_name(B) MODIFY TAG column_def(C). { A = createAlterTableAddModifyCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_BYTES, C); }
alter_table_clause(A) ::=
full_table_name(B) RENAME TAG column_name(C) column_name(D). { A = createAlterTableRenameCol(pCxt, B, TSDB_ALTER_TABLE_UPDATE_TAG_NAME, &C, &D); }
alter_table_clause(A) ::=
......@@ -358,7 +358,7 @@ column_def_list(A) ::= column_def(B).
column_def_list(A) ::= column_def_list(B) NK_COMMA column_def(C). { A = addNodeToList(pCxt, B, C); }
column_def(A) ::= column_name(B) type_name(C). { A = createColumnDefNode(pCxt, &B, C, NULL); }
//column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); }
column_def(A) ::= column_name(B) type_name(C) COMMENT NK_STRING(D). { A = createColumnDefNode(pCxt, &B, C, &D); }
%type type_name { SDataType }
%destructor type_name { }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册