提交 a23899ff 编写于 作者: dengyihao's avatar dengyihao

Merge branch 'enh/rocksRevert' of https://github.com/taosdata/TDengine into enh/rocksRevert

......@@ -459,12 +459,17 @@ TO_JSON(str_literal)
#### TO_UNIXTIMESTAMP
```sql
TO_UNIXTIMESTAMP(expr)
TO_UNIXTIMESTAMP(expr [, return_timestamp])
return_timestamp: {
0
| 1
}
```
**Description**: UNIX timestamp converted from a string of date/time format
**Return value type**: BIGINT
**Return value type**: BIGINT, TIMESTAMP
**Applicable column types**: VARCHAR and NCHAR
......@@ -476,6 +481,7 @@ TO_UNIXTIMESTAMP(expr)
- The input string must be compatible with ISO8601/RFC3339 standard, NULL will be returned if the string can't be converted
- The precision of the returned timestamp is same as the precision set for the current data base in use
- return_timestamp indicates whether the returned value type is TIMESTAMP or not. If this parameter set to 1, function will return TIMESTAMP type. Otherwise function will return BIGINT type. If parameter is omitted, default return value type is BIGINT.
### Time and Date Functions
......
......@@ -69,7 +69,7 @@ Provides information about SQL queries currently running. Similar to SHOW QUERIE
| 1 | consumer_id | BIGINT | Consumer ID |
| 2 | consumer_group | BINARY(192) | Consumer group |
| 3 | client_id | BINARY(192) | Client ID (user-defined) |
| 4 | status | BINARY(20) | Consumer status |
| 4 | status | BINARY(20) | Consumer status. All possible status include: ready(consumer is in normal state), lost(the connection between consumer and mnode is broken), rebalance(the redistribution of vgroups that belongs to current consumer is now in progress), unknown(consumer is in invalid state)
| 5 | topics | BINARY(204) | Subscribed topic. Returns one row for each topic. |
| 6 | up_time | TIMESTAMP | Time of first connection to TDengine Server |
| 7 | subscribe_time | TIMESTAMP | Time of first subscription |
......
......@@ -459,12 +459,17 @@ TO_JSON(str_literal)
#### TO_UNIXTIMESTAMP
```sql
TO_UNIXTIMESTAMP(expr)
TO_UNIXTIMESTAMP(expr [, return_timestamp])
return_timestamp: {
0
| 1
}
```
**功能说明**:将日期时间格式的字符串转换成为 UNIX 时间戳。
**返回结果数据类型**:BIGINT。
**返回结果数据类型**:BIGINT, TIMESTAMP
**应用字段**:VARCHAR, NCHAR。
......@@ -476,6 +481,7 @@ TO_UNIXTIMESTAMP(expr)
- 输入的日期时间字符串须符合 ISO8601/RFC3339 标准,无法转换的字符串格式将返回 NULL。
- 返回的时间戳精度与当前 DATABASE 设置的时间精度一致。
- return_timestamp 指定函数返回值是否为时间戳类型,设置为1时返回 TIMESTAMP 类型,设置为0时返回 BIGINT 类型。如不指定缺省返回 BIGINT 类型。
### 时间和日期函数
......
......@@ -69,7 +69,7 @@ TDengine 3.0 版本开始提供一个内置数据库 `performance_schema`,其
| 1 | consumer_id | BIGINT | 消费者的唯一 ID |
| 2 | consumer_group | BINARY(192) | 消费者组 |
| 3 | client_id | BINARY(192) | 用户自定义字符串,通过创建 consumer 时指定 client_id 来展示 |
| 4 | status | BINARY(20) | 消费者当前状态 |
| 4 | status | BINARY(20) | 消费者当前状态。消费者状态包括:ready(正常可用)、 lost(连接已丢失)、 rebalancing(消费者所属 vgroup 正在分配中)、unknown(未知状态)|
| 5 | topics | BINARY(204) | 被订阅的 topic。若订阅多个 topic,则展示为多行 |
| 6 | up_time | TIMESTAMP | 第一次连接 taosd 的时间 |
| 7 | subscribe_time | TIMESTAMP | 上一次发起订阅的时间 |
......
......@@ -38,3 +38,303 @@ chmod +x TDinsight.sh
运行程序并重启 Grafana 服务,打开面板:`http://localhost:3000/d/tdinsight`
更多使用场景和限制请参考[TDinsight](/reference/tdinsight/) 文档。
## log 库
TDinsight dashboard 数据来源于 log 库(存放监控数据的默认db,可以在 taoskeeper 配置文件中修改,具体参考 [taoskeeper 文档](/reference/taosKeeper))。taoskeeper 启动后会自动创建 log 库,并将监控数据写入到该数据库中。
### cluster\_info 表
`cluster_info` 表记录集群信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|first\_ep|VARCHAR||集群 first ep|
|first\_ep\_dnode\_id|INT||集群 first ep 的 dnode id|
|version|VARCHAR||tdengine version。例如:3.0.4.0|
|master\_uptime|FLOAT||当前 master 节点的uptime。单位:天|
|monitor_interval|INT||monitor interval。单位:秒|
|dbs\_total|INT||database 总数|
|tbs\_total|BIGINT||当前集群 table 总数|
|stbs\_total|INT||当前集群 stable 总数|
|dnodes\_total|INT||当前集群 dnode 总数|
|dnodes\_alive|INT||当前集群 dnode 存活总数|
|mnodes\_total|INT||当前集群 mnode 总数|
|mnodes\_alive|INT||当前集群 mnode 存活总数|
|vgroups\_total|INT||当前集群 vgroup 总数|
|vgroups\_alive|INT||当前集群 vgroup 存活总数|
|vnodes\_total|INT||当前集群 vnode 总数|
|vnodes\_alive|INT||当前集群 vnode 存活总数|
|connections\_total|INT||当前集群连接总数|
|topics\_total|INT||当前集群 topic 总数|
|streams\_total|INT||当前集群 stream 总数|
|protocol|INT||协议版本,目前为 1|
|cluster\_id|NCHAR|TAG|cluster id|
### d\_info 表
`d_info` 表记录 dnode 状态信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|status|VARCHAR||dnode 状态|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### m\_info 表
`m_info` 表记录 mnode 角色信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|role|VARCHAR||mnode 角色, leader 或 follower|
|mnode\_id|INT|TAG|master node id|
|mnode\_ep|NCHAR|TAG|master node endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### dnodes\_info 表
`dnodes_info` 记录 dnode 信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|uptime|FLOAT||dnode uptime|
|cpu\_engine|FLOAT||taosd cpu 使用率,从 `/proc/<taosd_pid>/stat` 读取|
|cpu\_system|FLOAT||服务器 cpu 使用率,从 `/proc/stat` 读取|
|cpu\_cores|FLOAT||服务器 cpu 核数|
|mem\_engine|INT||taosd 内存使用率,从 `/proc/<taosd_pid>/status` 读取|
|mem\_system|INT||服务器内存使用率|
|mem\_total|INT||服务器内存总量,单位 KB|
|disk\_engine|INT|||
|disk\_used|BIGINT||data dir 挂载的磁盘使用量,单位 bytes|
|disk\_total|BIGINT||data dir 挂载的磁盘总容量,单位 bytes|
|net\_in|FLOAT||网络吞吐率,从 `/proc/net/dev` 中读取的 received bytes。单位 bytes per second|
|net\_out|FLOAT||网络吞吐率,从 `/proc/net/dev` 中读取的 transmit bytes。单位 bytes per second|
|io\_read|FLOAT||io 吞吐率,从 `/proc/<taosd_pid>/io` 中读取的 rchar 与上次数值计算之后,计算得到速度。单位 bytes per second|
|io\_write|FLOAT||io 吞吐率,从 `/proc/<taosd_pid>/io` 中读取的 wchar 与上次数值计算之后,计算得到速度。单位 bytes per second|
|io\_read\_disk|FLOAT||磁盘 io 吞吐率,从 `/proc/<taosd_pid>/io` 中读取的 read_bytes。单位 bytes per second|
|io\_write\_disk|FLOAT||磁盘 io 吞吐率,从 `/proc/<taosd_pid>/io` 中读取的 write_bytes。单位 bytes per second|
|req\_select|INT||两个间隔内发生的查询请求数目|
|req\_select\_rate|FLOAT||两个间隔内的查询请求速度 = `req_select / monitorInterval`|
|req\_insert|INT||两个间隔内发生的写入请求,包含的单条数据数目|
|req\_insert\_success|INT||两个间隔内发生的处理成功的写入请求,包含的单条数据数目|
|req\_insert\_rate|FLOAT||两个间隔内的单条数据写入速度 = `req_insert / monitorInterval`|
|req\_insert\_batch|INT||两个间隔内发生的写入请求数目|
|req\_insert\_batch\_success|INT||两个间隔内发生的成功的批量写入请求数目|
|req\_insert\_batch\_rate|FLOAT||两个间隔内的写入请求数目的速度 = `req_insert_batch / monitorInterval`|
|errors|INT||两个间隔内的出错的写入请求的数目|
|vnodes\_num|INT||dnode 上 vnodes 数量|
|masters|INT||dnode 上 master node 数量|
|has\_mnode|INT||dnode 是否包含 mnode|
|has\_qnode|INT||dnode 是否包含 qnode|
|has\_snode|INT||dnode 是否包含 snode|
|has\_bnode|INT||dnode 是否包含 bnode|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### data\_dir 表
`data_dir` 表记录 data 目录信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|name|NCHAR||data 目录,一般为 `/var/lib/taos`|
|level|INT||0、1、2 多级存储级别|
|avail|BIGINT||data 目录可用空间|
|used|BIGINT||data 目录已使用空间|
|total|BIGINT||data 目录空间|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### log\_dir 表
`log_dir` 表记录 log 目录信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|name|NCHAR||log 目录名,一般为 `/var/log/taos/`|
|avail|BIGINT||log 目录可用空间|
|used|BIGINT||log 目录已使用空间|
|total|BIGINT||log 目录空间|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### temp\_dir 表
`temp_dir` 表记录 temp 目录信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|name|NCHAR||temp 目录名,一般为 `/tmp/`|
|avail|BIGINT||temp 目录可用空间|
|used|BIGINT||temp 目录已使用空间|
|total|BIGINT||temp 目录空间|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### vgroups\_info 表
`vgroups_info` 表记录虚拟节点组信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|vgroup\_id|INT||vgroup id|
|database\_name|VARCHAR||vgroup 所属的 database 名字|
|tables\_num|BIGINT||vgroup 中 table 数量|
|status|VARCHAR||vgroup 状态|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### vnodes\_role 表
`vnodes_role` 表记录虚拟节点角色信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|vnode\_role|VARCHAR||vnode 角色,leader 或 follower|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### logs 表
`logs` 表记录登录信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|level|VARCHAR||log level|
|content|NCHAR||log content,长度不超过1024字节|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### log\_summary 表
`log_summary` 记录日志统计信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|error|INT||error 总数|
|info|INT||info 总数|
|debug|INT||debug 总数|
|trace|INT||trace 总数|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### grants\_info 表
`grants_info` 记录授权信息。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|expire\_time|BIGINT||认证过期时间,企业版有效,社区版为 bigint 最大值|
|timeseries\_used|BIGINT||已用测点数|
|timeseries\_total|BIGINT||总测点数,开源版本为 bigint 最大值|
|dnode\_id|INT|TAG|dnode id|
|dnode\_ep|NCHAR|TAG|dnode endpoint|
|cluster\_id|NCHAR|TAG|cluster id|
### keeper\_monitor 表
`keeper_monitor` 记录 taoskeeper 监控数据。
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|ts|TIMESTAMP||timestamp|
|cpu|FLOAT||cpu 使用率|
|mem|FLOAT||内存使用率|
|identify|NCHAR|TAG||
### taosadapter\_restful\_http\_request\_total 表
`taosadapter_restful_http_request_total` 记录 taosadapter rest 请求信息,该表为 schemaless 方式创建的表,时间戳字段名为 `_ts`
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|\_ts|TIMESTAMP||timestamp|
|guage|DOUBLE||监控指标值|
|client\_ip|NCHAR|TAG|client ip|
|endpoint|NCHAR|TAG|taosadpater endpoint|
|request\_method|NCHAR|TAG|request method|
|request\_uri|NCHAR|TAG|request uri|
|status\_code|NCHAR|TAG|status code|
### taosadapter\_restful\_http\_request\_fail 表
`taosadapter_restful_http_request_fail` 记录 taosadapter rest 请求失败信息,该表为 schemaless 方式创建的表,时间戳字段名为 `_ts`
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|\_ts|TIMESTAMP||timestamp|
|guage|DOUBLE||监控指标值|
|client\_ip|NCHAR|TAG|client ip|
|endpoint|NCHAR|TAG|taosadpater endpoint|
|request\_method|NCHAR|TAG|request method|
|request\_uri|NCHAR|TAG|request uri|
|status\_code|NCHAR|TAG|status code|
### taosadapter\_restful\_http\_request\_in\_flight 表
`taosadapter_restful_http_request_in_flight` 记录 taosadapter rest 实时请求信息,该表为 schemaless 方式创建的表,时间戳字段名为 `_ts`
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|\_ts|TIMESTAMP||timestamp|
|guage|DOUBLE||监控指标值|
|endpoint|NCHAR|TAG|taosadpater endpoint|
### taosadapter\_restful\_http\_request\_summary\_milliseconds 表
`taosadapter_restful_http_request_summary_milliseconds` 记录 taosadapter rest 请求汇总信息,该表为 schemaless 方式创建的表,时间戳字段名为 `_ts`
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|\_ts|TIMESTAMP||timestamp|
|count|DOUBLE|||
|sum|DOUBLE|||
|0.5|DOUBLE|||
|0.9|DOUBLE|||
|0.99|DOUBLE|||
|0.1|DOUBLE|||
|0.2|DOUBLE|||
|endpoint|NCHAR|TAG|taosadpater endpoint|
|request\_method|NCHAR|TAG|request method|
|request\_uri|NCHAR|TAG|request uri|
### taosadapter\_system\_mem\_percent 表
`taosadapter_system_mem_percent` 表记录 taosadapter 内存使用情况,该表为 schemaless 方式创建的表,时间戳字段名为 `_ts`
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|\_ts|TIMESTAMP||timestamp|
|guage|DOUBLE||监控指标值|
|endpoint|NCHAR|TAG|taosadpater endpoint|
### taosadapter\_system\_cpu\_percent 表
`taosadapter_system_cpu_percent` 表记录 taosadapter cpu 使用情况,该表为 schemaless 方式创建的表,时间戳字段名为 `_ts`
|field|type|is\_tag|comment|
|:----|:---|:-----|:------|
|\_ts|TIMESTAMP||timestamp|
|guage|DOUBLE||监控指标值|
|endpoint|NCHAR|TAG|taosadpater endpoint|
......@@ -1259,8 +1259,9 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInse
int code = taos_errno(tres);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code: 0x%x, msg:%s", jobj, taos, code, taos_errstr(tres));
jobject jobject = createSchemalessResp(env, 0, code, taos_errstr(tres));
taos_free_result(tres);
return createSchemalessResp(env, 0, code, taos_errstr(tres));
return jobject;
}
taos_free_result(tres);
......@@ -1286,8 +1287,9 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInse
int code = taos_errno(tres);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code: 0x%x, msg:%s", jobj, taos, code, taos_errstr(tres));
jobject jobject = createSchemalessResp(env, 0, code, taos_errstr(tres));
taos_free_result(tres);
return createSchemalessResp(env, 0, code, taos_errstr(tres));
return jobject;
}
taos_free_result(tres);
......@@ -1315,8 +1317,9 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInse
int code = taos_errno(tres);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code: 0x%x, msg:%s", jobj, taos, code, taos_errstr(tres));
jobject jobject = createSchemalessResp(env, 0, code, taos_errstr(tres));
taos_free_result(tres);
return createSchemalessResp(env, 0, code, taos_errstr(tres));
return jobject;
}
taos_free_result(tres);
......@@ -1343,8 +1346,9 @@ JNIEXPORT jobject JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_schemalessInse
int code = taos_errno(tres);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code: 0x%x, msg:%s", jobj, taos, code, taos_errstr(tres));
jobject jobject = createSchemalessResp(env, 0, code, taos_errstr(tres));
taos_free_result(tres);
return createSchemalessResp(env, 0, code, taos_errstr(tres));
return jobject;
}
taos_free_result(tres);
......
......@@ -362,15 +362,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
p->ts = pCol->ts;
p->colVal = pCol->colVal;
singleTableLastTs = pCol->ts;
// only set value for last row query
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST_ROW)) {
if (taosArrayGetSize(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
}
}
} else {
SLastCol* p = taosArrayGet(pLastCols, slotId);
......@@ -417,6 +408,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
}
if (taosArrayGetSize(pTableUidList) == 0) {
taosArrayPush(pTableUidList, &pKeyInfo->uid);
} else {
taosArraySet(pTableUidList, 0, &pKeyInfo->uid);
}
tsdbCacheRelease(lruCache, h);
}
......
......@@ -1259,6 +1259,11 @@ void tBlockDataReset(SBlockData *pBlockData) {
pBlockData->suid = 0;
pBlockData->uid = 0;
pBlockData->nRow = 0;
for (int32_t i = 0; i < pBlockData->nColData; i++) {
tColDataDestroy(&pBlockData->aColData[i]);
}
pBlockData->nColData = 0;
taosMemoryFreeClear(pBlockData->aColData);
}
void tBlockDataClear(SBlockData *pBlockData) {
......
......@@ -45,13 +45,13 @@ static void destroyCacheScanOperator(void* param);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
#define SCAN_ROW_TYPE(_t) ((_t)? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SCacheRowsScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SCacheRowsScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
tableListDestroy(pTableListInfo);
......@@ -91,7 +91,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo);
code = tsdbCacherowsReaderOpen(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader, pTaskInfo->id.str);
taosArrayGetSize(pInfo->matchInfo.pList), suid, &pInfo->pLastrowReader,
pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -114,7 +115,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
}
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo);
setOperatorInfo(pOperator, "CachedRowScanOperator", QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet =
......@@ -123,7 +125,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
pOperator->cost.openCost = 0;
return pOperator;
_error:
_error:
pTaskInfo->code = code;
destroyCacheScanOperator(pInfo);
taosMemoryFree(pOperator);
......@@ -136,8 +138,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
SCacheRowsScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableListInfo* pTableList = pInfo->pTableList;
uint64_t suid = tableListGetSuid(pTableList);
int32_t size = tableListGetSize(pTableList);
......@@ -194,8 +196,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pRes->info.rows = 1;
SExprSupp* pSup = &pInfo->pseudoExprSup;
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pRes,
pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
......@@ -217,7 +219,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
}
STableKeyInfo* pList = NULL;
int32_t num = 0;
int32_t num = 0;
int32_t code = tableListGetGroupList(pTableList, pInfo->currentGroupIndex, &pList, &num);
if (code != TSDB_CODE_SUCCESS) {
......@@ -251,11 +253,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pInfo->pRes->info.id.groupId = pKeyInfo->groupId;
if (taosArrayGetSize(pInfo->pUidList) > 0) {
ASSERT((pInfo->retrieveType & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
pInfo->pRes->info.id.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, 0);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, pInfo->pRes->info.rows,
GET_TASKID(pTaskInfo), NULL);
code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
pInfo->pRes->info.rows, GET_TASKID(pTaskInfo), NULL);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
......@@ -325,7 +325,7 @@ int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pC
return TSDB_CODE_SUCCESS;
}
size_t size = taosArrayGetSize(pColMatchInfo->pList);
size_t size = taosArrayGetSize(pColMatchInfo->pList);
SArray* pMatchInfo = taosArrayInit(size, sizeof(SColMatchItem));
for (int32_t i = 0; i < size; ++i) {
......
......@@ -1455,18 +1455,6 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) {
return w;
}
static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) {
int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
ASSERTS(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE, "children trigger type should be at once");
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
NULL, pOperator);
}
}
static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWins, int32_t* index,
SSDataBlock* pBlock) {
blockDataCleanup(pBlock);
......@@ -1534,6 +1522,10 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
cleanupBasicInfo(&pInfo->binfo);
cleanupAggSup(&pInfo->aggSup);
// it should be empty.
void* pIte = NULL;
while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) {
taosArrayDestroy(*(void**)pIte);
}
taosHashCleanup(pInfo->pPullDataMap);
taosArrayDestroy(pInfo->pPullWins);
blockDataDestroy(pInfo->pPullDataRes);
......@@ -4426,23 +4418,6 @@ void destroyMergeIntervalOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, STimeWindow* win,
SSDataBlock* pResultBlock) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC);
SExprSupp* pExprSup = &pOperatorInfo->exprSupp;
SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId);
SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet(
iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
ASSERT(p1 != NULL);
// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo);
tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE));
return TSDB_CODE_SUCCESS;
}
static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock,
STimeWindow* newWin) {
SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info;
......@@ -4463,7 +4438,6 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow* prevWin = &prevGrpWin->window;
if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) {
// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock);
tdListPopNode(miaInfo->groupIntervals, listNode);
}
}
......@@ -4623,7 +4597,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
if (listNode != NULL) {
SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data);
// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes);
pRes->info.id.groupId = grpWin->groupId;
}
}
......@@ -4768,6 +4741,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
qDebug("===stream===single interval recv|block type STREAM_GET_ALL");
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
......
......@@ -1933,14 +1933,35 @@ static int32_t translateToIso8601(SFunctionNode* pFunc, char* pErrBuf, int32_t l
}
static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
if (1 != LIST_LENGTH(pFunc->pParameterList)) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
int16_t resType = TSDB_DATA_TYPE_BIGINT;
if (1 != numOfParams && 2 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (!IS_STR_DATA_TYPE(((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type)) {
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
if (!IS_STR_DATA_TYPE(para1Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (2 == numOfParams) {
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
if (!IS_INTEGER_TYPE(para2Type)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
if (pValue->datum.i == 1) {
resType = TSDB_DATA_TYPE_TIMESTAMP;
} else if (pValue->datum.i == 0) {
resType = TSDB_DATA_TYPE_BIGINT;
} else {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"TO_UNIXTIMESTAMP function second parameter should be 0/1");
}
}
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
......@@ -1948,7 +1969,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
return code;
}
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT};
pFunc->node.resType = (SDataType){.bytes = tDataTypes[resType].bytes, .type = resType};
return TSDB_CODE_SUCCESS;
}
......
......@@ -343,7 +343,7 @@ typedef struct SUdfcFuncStub {
char udfName[TSDB_FUNC_NAME_LEN + 1];
UdfcFuncHandle handle;
int32_t refCount;
int64_t lastRefTime;
int64_t createTime;
} SUdfcFuncStub;
typedef struct SUdfcProxy {
......@@ -363,6 +363,7 @@ typedef struct SUdfcProxy {
uv_mutex_t udfStubsMutex;
SArray *udfStubs; // SUdfcFuncStub
SArray *expiredUdfStubs; //SUdfcFuncStub
uv_mutex_t udfcUvMutex;
int8_t initialized;
......@@ -959,7 +960,7 @@ int32_t udfcOpen();
int32_t udfcClose();
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle);
void releaseUdfFuncHandle(char *udfName);
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle);
int32_t cleanUpUdfs();
bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv);
......@@ -967,6 +968,8 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pRes
int32_t udfAggProcess(struct SqlFunctionCtx *pCtx);
int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
void cleanupNotExpiredUdfs();
void cleanupExpiredUdfs();
int compareUdfcFuncSub(const void *elem1, const void *elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
......@@ -982,16 +985,24 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
UdfcFuncHandle handle = foundStub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0;
int64_t currUs = taosGetTimestampUs();
bool expired = (currUs - foundStub->createTime) >= 10 * 1000 * 1000;
if (!expired) {
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
*pHandle = foundStub->handle;
++foundStub->refCount;
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0;
} else {
fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache", udfName,
foundStub->refCount, foundStub->createTime);
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
}
} else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName,
foundStub->refCount, foundStub->lastRefTime);
fnInfo("udf handle expired for %s, will setup udf. move it to expired list", udfName);
taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
taosArrayPush(gUdfcProxy.expiredUdfStubs, foundStub);
taosArraySort(gUdfcProxy.expiredUdfStubs, compareUdfcFuncSub);
}
}
*pHandle = NULL;
......@@ -1001,7 +1012,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
strncpy(stub.udfName, udfName, TSDB_FUNC_NAME_LEN);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
stub.createTime = taosGetTimestampUs();
taosArrayPush(gUdfcProxy.udfStubs, &stub);
taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
} else {
......@@ -1012,32 +1023,51 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
return code;
}
void releaseUdfFuncHandle(char *udfName) {
void releaseUdfFuncHandle(char *udfName, UdfcFuncHandle handle) {
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub) {
SUdfcFuncStub *expiredStub = taosArraySearch(gUdfcProxy.expiredUdfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub && !expiredStub) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return;
}
if (foundStub->refCount > 0) {
if (foundStub != NULL && foundStub->handle == handle && foundStub->refCount > 0) {
--foundStub->refCount;
}
if (expiredStub != NULL && expiredStub->handle == handle && expiredStub->refCount > 0) {
--expiredStub->refCount;
}
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
}
int32_t cleanUpUdfs() {
int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
if (!initialized) {
return TSDB_CODE_SUCCESS;
void cleanupExpiredUdfs() {
int32_t i = 0;
SArray *expiredUdfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfcProxy.expiredUdfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.expiredUdfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. expired. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. expired. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
taosArrayPush(expiredUdfStubs, stub);
} else {
fnInfo("udf invalid handle for %s, expired. refCount: %d, create time: %" PRId64 ". remove it from cache",
stub->udfName, stub->refCount, stub->createTime);
}
}
++i;
}
taosArrayDestroy(gUdfcProxy.expiredUdfStubs);
gUdfcProxy.expiredUdfStubs = expiredUdfStubs;
}
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
void cleanupNotExpiredUdfs() {
SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
int32_t i = 0;
while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
......@@ -1046,20 +1076,38 @@ int32_t cleanUpUdfs() {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->lastRefTime, stub->handle);
fnInfo("udf still in use. udf name: %s, ref count: %d, create time: %" PRId64 ", handle: %p", stub->udfName,
stub->refCount, stub->createTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub);
} else {
fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache",
stub->udfName, stub->refCount, stub->lastRefTime);
fnInfo("udf invalid handle for %s, refCount: %d, create time: %" PRId64 ". remove it from cache",
stub->udfName, stub->refCount, stub->createTime);
}
}
++i;
}
taosArrayDestroy(gUdfcProxy.udfStubs);
gUdfcProxy.udfStubs = udfStubs;
}
int32_t cleanUpUdfs() {
int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
if (!initialized) {
return TSDB_CODE_SUCCESS;
}
uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
if ((gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) &&
(gUdfcProxy.expiredUdfStubs == NULL || taosArrayGetSize(gUdfcProxy.expiredUdfStubs) == 0)) {
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS;
}
cleanupNotExpiredUdfs();
cleanupExpiredUdfs();
uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0;
}
......@@ -1075,7 +1123,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (code != TSDB_CODE_SUCCESS) {
fnError("udfc scalar function execution failure");
releaseUdfFuncHandle(udfName);
releaseUdfFuncHandle(udfName, handle);
return code;
}
......@@ -1089,7 +1137,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
}
releaseUdfFuncHandle(udfName);
releaseUdfFuncHandle(udfName, handle);
return code;
}
......@@ -1122,7 +1170,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
return false;
}
if (buf.bufLen <= session->bufSize) {
......@@ -1131,10 +1179,10 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResult
udfRes->interResNum = buf.numOfResult;
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
return false;
}
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
freeUdfInterBuf(&buf);
return true;
}
......@@ -1191,7 +1239,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
taosArrayDestroy(pTempBlock->pDataBlock);
taosMemoryFree(pTempBlock);
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
freeUdfInterBuf(&newState);
return udfCode;
}
......@@ -1236,7 +1284,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) {
freeUdfInterBuf(&resultBuf);
int32_t numOfResults = functionFinalizeWithResultBuf(pCtx, pBlock, udfRes->finalResBuf);
releaseUdfFuncHandle(pCtx->udfName);
releaseUdfFuncHandle(pCtx->udfName, handle);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}
......@@ -1663,6 +1711,7 @@ int32_t udfcOpen() {
uv_barrier_wait(&proxy->initBarrier);
uv_mutex_init(&proxy->udfStubsMutex);
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
proxy->expiredUdfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
uv_mutex_init(&proxy->udfcUvMutex);
fnInfo("udfc initialized") return 0;
}
......@@ -1679,6 +1728,7 @@ int32_t udfcClose() {
uv_thread_join(&udfc->loopThread);
uv_mutex_destroy(&udfc->taskQueueMutex);
uv_barrier_destroy(&udfc->initBarrier);
taosArrayDestroy(udfc->expiredUdfStubs);
taosArrayDestroy(udfc->udfStubs);
uv_mutex_destroy(&udfc->udfStubsMutex);
uv_mutex_destroy(&udfc->udfcUvMutex);
......
......@@ -591,7 +591,7 @@ SUdf *udfdNewUdf(const char *udfName) {
SUdf *udfdGetOrCreateUdf(const char *udfName) {
uv_mutex_lock(&global.udfsMutex);
SUdf **pUdfHash = taosHashGet(global.udfsHash, udfName, strlen(udfName));
int64_t currTime = taosGetTimestampSec();
int64_t currTime = taosGetTimestampMs();
bool expired = false;
if (pUdfHash) {
expired = currTime - (*pUdfHash)->lastFetchTime > 10 * 1000; // 10s
......@@ -688,6 +688,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
output.colMeta.type = udf->outputType;
output.colMeta.precision = 0;
output.colMeta.scale = 0;
udfColEnsureCapacity(&output, call->block.info.rows);
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
code = udf->scriptPlugin->udfScalarProcFunc(&input, &output, udf->scriptUdfCtx);
......
......@@ -136,6 +136,7 @@ static bool logicConditionNodeEqual(const SLogicConditionNode* a, const SLogicCo
static bool functionNodeEqual(const SFunctionNode* a, const SFunctionNode* b) {
COMPARE_SCALAR_FIELD(funcId);
COMPARE_STRING_FIELD(functionName);
COMPARE_NODE_LIST_FIELD(pParameterList);
return true;
}
......
......@@ -1047,8 +1047,8 @@ sliding_opt(A) ::= SLIDING NK_LP duration_literal(B) NK_RP.
fill_opt(A) ::= . { A = NULL; }
fill_opt(A) ::= FILL NK_LP fill_mode(B) NK_RP. { A = createFillNode(pCxt, B, NULL); }
fill_opt(A) ::= FILL NK_LP VALUE NK_COMMA literal_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE, createNodeListNode(pCxt, B)); }
fill_opt(A) ::= FILL NK_LP VALUE_F NK_COMMA literal_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE_F, createNodeListNode(pCxt, B)); }
fill_opt(A) ::= FILL NK_LP VALUE NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE, createNodeListNode(pCxt, B)); }
fill_opt(A) ::= FILL NK_LP VALUE_F NK_COMMA expression_list(B) NK_RP. { A = createFillNode(pCxt, FILL_MODE_VALUE_F, createNodeListNode(pCxt, B)); }
%type fill_mode { EFillMode }
%destructor fill_mode { }
......
......@@ -1310,7 +1310,8 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
}
static EDealRes haveVectorFunction(SNode* pNode, void* pContext) {
if (isAggFunc(pNode) || isIndefiniteRowsFunc(pNode) || isWindowPseudoColumnFunc(pNode) || isInterpPseudoColumnFunc(pNode)) {
if (isAggFunc(pNode) || isIndefiniteRowsFunc(pNode) || isWindowPseudoColumnFunc(pNode) ||
isInterpPseudoColumnFunc(pNode)) {
*((bool*)pContext) = true;
return DEAL_RES_END;
}
......@@ -2930,6 +2931,11 @@ static int32_t convertFillValue(STranslateContext* pCxt, SDataType dt, SNodeList
if (TSDB_CODE_SUCCESS == code) {
code = scalarCalculateConstants(pCaseFunc, &pCell->pNode);
}
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE != nodeType(pCell->pNode)) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Fill value is just a constant");
} else if (TSDB_CODE_SUCCESS != code) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Filled data type mismatch");
}
return code;
}
......@@ -2946,9 +2952,9 @@ static int32_t checkFillValues(STranslateContext* pCxt, SFillNode* pFill, SNodeL
if (fillNo >= LIST_LENGTH(pFillValues->pNodeList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Filled values number mismatch");
}
if (TSDB_CODE_SUCCESS !=
convertFillValue(pCxt, ((SExprNode*)pProject)->resType, pFillValues->pNodeList, fillNo)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_WRONG_VALUE_TYPE, "Filled data type mismatch");
int32_t code = convertFillValue(pCxt, ((SExprNode*)pProject)->resType, pFillValues->pNodeList, fillNo);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
++fillNo;
}
......@@ -5744,6 +5750,14 @@ static int32_t translateDropCGroup(STranslateContext* pCxt, SDropCGroupStmt* pSt
static int32_t translateAlterLocal(STranslateContext* pCxt, SAlterLocalStmt* pStmt) {
// The statement is executed directly on the client without constructing a message.
if ('\0' != pStmt->value[0]) {
return TSDB_CODE_SUCCESS;
}
char* p = strchr(pStmt->config, ' ');
if (NULL != p) {
*p = 0;
strcpy(pStmt->value, p + 1);
}
return TSDB_CODE_SUCCESS;
}
......
此差异已折叠。
......@@ -1095,7 +1095,7 @@ static int32_t sortPriKeyOptGetSequencingNodesImpl(SLogicNode* pNode, bool group
*pNotOptimize = false;
return TSDB_CODE_SUCCESS;
}
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN: {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
......@@ -2139,7 +2139,7 @@ typedef struct SLastRowScanOptLastParaCkCxt {
bool hasCol;
} SLastRowScanOptLastParaCkCxt;
static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
static EDealRes lastRowScanOptLastParaIsTagImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
SLastRowScanOptLastParaCkCxt* pCxt = pContext;
if (COLUMN_TYPE_TAG == ((SColumnNode*)pNode)->colType || COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
......@@ -2152,10 +2152,10 @@ static EDealRes lastRowScanOptLastParaCheckImpl(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static bool lastRowScanOptLastParaCheck(SNode* pExpr) {
static bool lastRowScanOptLastParaIsTag(SNode* pExpr) {
SLastRowScanOptLastParaCkCxt cxt = {.hasTag = false, .hasCol = false};
nodesWalkExpr(pExpr, lastRowScanOptLastParaCheckImpl, &cxt);
return !cxt.hasTag && cxt.hasCol;
nodesWalkExpr(pExpr, lastRowScanOptLastParaIsTagImpl, &cxt);
return cxt.hasTag && !cxt.hasCol;
}
static bool hasSuitableCache(int8_t cacheLastMode, bool hasLastRow, bool hasLast) {
......@@ -2195,15 +2195,19 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
SFunctionNode* pAggFunc = (SFunctionNode*)pFunc;
if (FUNCTION_TYPE_LAST == pAggFunc->funcType) {
if (hasSelectFunc || !lastRowScanOptLastParaCheck(nodesListGetNode(pAggFunc->pParameterList, 0))) {
if (hasSelectFunc || QUERY_NODE_VALUE == nodeType(nodesListGetNode(pAggFunc->pParameterList, 0))) {
return false;
}
hasLastFunc = true;
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
} else if (FUNCTION_TYPE_SELECT_VALUE == pAggFunc->funcType) {
if (hasLastFunc) {
return false;
}
hasSelectFunc = true;
} else if (FUNCTION_TYPE_GROUP_KEY == pAggFunc->funcType) {
if (!lastRowScanOptLastParaIsTag(nodesListGetNode(pAggFunc->pParameterList, 0))) {
return false;
}
} else if (FUNCTION_TYPE_LAST_ROW != pAggFunc->funcType) {
return false;
}
......@@ -2237,7 +2241,7 @@ static EDealRes lastRowScanOptSetColDataType(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE;
}
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols) {
static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCols, bool erase) {
SNode* pTarget = NULL;
WHERE_EACH(pTarget, pTargets) {
bool found = false;
......@@ -2249,7 +2253,7 @@ static void lastRowScanOptSetLastTargets(SNodeList* pTargets, SNodeList* pLastCo
break;
}
}
if (!found) {
if (!found && erase) {
ERASE_NODE(pTargets);
continue;
}
......@@ -2290,9 +2294,8 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
pScan->igLastNull = pAgg->hasLast ? true : false;
if (NULL != cxt.pLastCols) {
cxt.doAgg = false;
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols);
NODES_DESTORY_LIST(pScan->pScanPseudoCols);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols);
lastRowScanOptSetLastTargets(pScan->pScanCols, cxt.pLastCols, true);
lastRowScanOptSetLastTargets(pScan->node.pTargets, cxt.pLastCols, false);
nodesClearList(cxt.pLastCols);
}
pAgg->hasLastRow = false;
......
......@@ -1124,7 +1124,8 @@ _end:
int32_t toUnixtimestampFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
int32_t type = GET_PARAM_TYPE(pInput);
int64_t timePrec;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData);
int32_t idx = (inputNum == 2) ? 1 : 2;
GET_TYPED_DATA(timePrec, int64_t, GET_PARAM_TYPE(&pInput[idx]), pInput[idx].columnData->pData);
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
if (colDataIsNull_s(pInput[0].columnData, i)) {
......
......@@ -187,7 +187,7 @@ static int32_t cfgSetInt32(SConfigItem *pItem, const char *value, ECfgSrcType st
}
static int32_t cfgSetInt64(SConfigItem *pItem, const char *value, ECfgSrcType stype) {
int64_t ival = (int64_t)atoi(value);
int64_t ival = (int64_t)atoll(value);
if (ival < pItem->imin || ival > pItem->imax) {
uError("cfg:%s, type:%s src:%s value:%" PRId64 " out of range[%" PRId64 ", %" PRId64 "]", pItem->name,
cfgDtypeStr(pItem->dtype), cfgStypeStr(stype), ival, pItem->imin, pItem->imax);
......
......@@ -4,6 +4,9 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
#todo xukaili sma should use rocksdb.
return 1
print =============== create database with retentions
sql create database d0 retentions 5s:7d,10s:21d,15s:365d;
sql use d0
......
......@@ -4,6 +4,9 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
#todo xukaili sma should use rocksdb.
return 1
print =============== create database with retentions
sql create database d0 retentions 5s:7d,5m:21d,15m:365d;
sql use d0
......
......@@ -62,7 +62,7 @@ $loop_count = 0
loop0:
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......@@ -93,7 +93,7 @@ $loop_count = 0
loop01:
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......@@ -132,7 +132,7 @@ $loop_count = 0
loop011:
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......@@ -171,7 +171,7 @@ $loop_count = 0
loop02:
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......@@ -211,7 +211,7 @@ $loop_count = 0
loop03:
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......@@ -258,7 +258,7 @@ $loop_count = 0
loop04:
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......@@ -300,7 +300,7 @@ sleep 2000
sql select * from streamtST1;
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......@@ -429,7 +429,7 @@ sql select * from streamtST1;
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......@@ -468,7 +468,7 @@ sql select * from streamtST3;
sleep 2000
$loop_count = $loop_count + 1
if $loop_count == 10 then
if $loop_count == 30 then
return -1
endi
......
......@@ -26,575 +26,139 @@ sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sleep 5000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 2 then
print ======$data13
return -1
endi
if $data14 != 2 then
print ======$data14
return -1
endi
if $data15 != 3 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
sleep 1000
sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then
print ======$rows
return -1
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
endi
if $data02 != 2 then
print ======$data02
return -1
endi
if $data03 != 5 then
print ======$data03
return -1
endi
if $data04 != 2 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
endi
if $data12 != 1 then
print ======$data12
return -1
endi
if $data13 != 12 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
endi
if $data22 != 1 then
print ======$data22
return -1
endi
if $data23 != 3 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
endi
if $data32 != 1 then
print ======$data32
return -1
endi
if $data33 != 4 then
print ======$data33
return -1
endi
if $data34 != 2 then
print ======$data34
return -1
endi
if $data35 != 3 then
print ======$data35
return -1
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
print ======$data11
return -1
endi
if $data12 != 2 then
print ======$data12
return -1
endi
if $data13 != 24 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 36 then
print ======$data13
return -1
endi
if $data14 != 14 then
print ======$data14
return -1
endi
if $data15 != 13 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
endi
if $data12 != 3 then
print ======$data12
return -1
endi
if $data13 != 6 then
print ======$data13
return -1
endi
if $data14 != 3 then
print ======$data14
return -1
endi
if $data15 != 1 then
print ======$data15
return -1
endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
if $data21 != 2 then
print ======$data21
return -1
endi
if $data22 != 2 then
print ======$data22
return -1
endi
if $data23 != 6 then
print ======$data23
return -1
endi
if $data24 != 2 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
return -1
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
print ======$data01
return -1
endi
if $data02 != 4 then
print ======$data02
return -1
endi
if $data03 != 50 then
print ======$data03 != 50
return -1
endi
if $data04 != 20 then
print ======$data04 != 20
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
$loop_count = 0
loop00:
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 1000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
return -1
endi
if $data12 != 4 then
print ======$data12
return -1
endi
if $data13 != 46 then
print ======$data13 != 46
return -1
endi
if $data14 != 20 then
print ======$data14 != 20
return -1
endi
if $data15 != 1 then
print ======$data15
return -1
endi
# row 2
if $data21 != 4 then
print ======$data21
return -1
endi
if $data22 != 4 then
print ======$data22
return -1
endi
if $data23 != 15 then
print ======$data23
return -1
endi
if $data24 != 4 then
print ======$data24
return -1
endi
if $data25 != 3 then
print ======$data25
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql create database test2 vgroups 1;
sql select * from information_schema.ins_databases;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
sql create stream stream2 trigger at_once fill_history 1 IGNORE EXPIRED 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
sleep 5000
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
return -1
goto loop00
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
goto loop00
endi
if $data02 != 2 then
print ======$data02
return -1
goto loop00
endi
if $data03 != 5 then
print ======$data03
return -1
goto loop00
endi
if $data04 != 2 then
print ======$data04
return -1
goto loop00
endi
if $data05 != 3 then
print ======$data05
return -1
goto loop00
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
goto loop00
endi
if $data12 != 1 then
print ======$data12
return -1
goto loop00
endi
if $data13 != 2 then
print ======$data13
return -1
goto loop00
endi
if $data14 != 2 then
print ======$data14
return -1
goto loop00
endi
if $data15 != 3 then
print ======$data15
return -1
goto loop00
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
goto loop00
endi
if $data22 != 1 then
print ======$data22
return -1
goto loop00
endi
if $data23 != 3 then
print ======$data23
return -1
goto loop00
endi
if $data24 != 2 then
print ======$data24
return -1
goto loop00
endi
if $data25 != 3 then
print ======$data25
return -1
goto loop00
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
goto loop00
endi
if $data32 != 1 then
print ======$data32
return -1
goto loop00
endi
if $data33 != 4 then
print ======$data33
return -1
goto loop00
endi
if $data34 != 2 then
print ======$data34
return -1
goto loop00
endi
if $data35 != 3 then
print ======$data35
return -1
goto loop00
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
$loop_count = 0
loop01:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
......@@ -603,321 +167,382 @@ print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then
print ======$rows
return -1
goto loop01
endi
# row 0
if $data01 != 2 then
print ======$data01
return -1
goto loop01
endi
if $data02 != 2 then
print ======$data02
return -1
goto loop01
endi
if $data03 != 5 then
print ======$data03
return -1
goto loop01
endi
if $data04 != 2 then
print ======$data04
return -1
goto loop01
endi
if $data05 != 3 then
print ======$data05
return -1
goto loop01
endi
# row 1
if $data11 != 1 then
print ======$data11
return -1
goto loop01
endi
if $data12 != 1 then
print ======$data12
return -1
goto loop01
endi
if $data13 != 12 then
print ======$data13
return -1
goto loop01
endi
if $data14 != 14 then
print ======$data14
return -1
goto loop01
endi
if $data15 != 13 then
print ======$data15
return -1
goto loop01
endi
# row 2
if $data21 != 1 then
print ======$data21
return -1
goto loop01
endi
if $data22 != 1 then
print ======$data22
return -1
goto loop01
endi
if $data23 != 3 then
print ======$data23
return -1
goto loop01
endi
if $data24 != 2 then
print ======$data24
return -1
goto loop01
endi
if $data25 != 3 then
print ======$data25
return -1
goto loop01
endi
# row 3
if $data31 != 1 then
print ======$data31
return -1
goto loop01
endi
if $data32 != 1 then
print ======$data32
return -1
goto loop01
endi
if $data33 != 4 then
print ======$data33
return -1
goto loop01
endi
if $data34 != 2 then
print ======$data34
return -1
goto loop01
endi
if $data35 != 3 then
print ======$data35
return -1
goto loop01
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
$loop_count = 0
loop02:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
print ======$data11
return -1
goto loop02
endi
if $data12 != 2 then
print ======$data12
return -1
goto loop02
endi
if $data13 != 24 then
print ======$data13
return -1
goto loop02
endi
if $data14 != 14 then
print ======$data14
return -1
goto loop02
endi
if $data15 != 13 then
print ======$data15
return -1
goto loop02
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
$loop_count = 0
loop03:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
goto loop03
endi
if $data12 != 3 then
print ======$data12
return -1
goto loop03
endi
if $data13 != 36 then
print ======$data13
return -1
goto loop03
endi
if $data14 != 14 then
print ======$data14
return -1
goto loop03
endi
if $data15 != 13 then
print ======$data15
return -1
goto loop03
endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
$loop_count = 0
loop04:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
return -1
goto loop04
endi
if $data12 != 3 then
print ======$data12
return -1
goto loop04
endi
if $data13 != 6 then
print ======$data13
return -1
goto loop04
endi
if $data14 != 3 then
print ======$data14
return -1
goto loop04
endi
if $data15 != 1 then
print ======$data15
return -1
goto loop04
endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
$loop_count = 0
loop1:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
if $data21 != 2 then
print ======$data21
return -1
goto loop1
endi
if $data22 != 2 then
print ======$data22
return -1
goto loop1
endi
if $data23 != 6 then
print ======$data23
return -1
goto loop1
endi
if $data24 != 2 then
print ======$data24
return -1
goto loop1
endi
if $data25 != 3 then
print ======$data25
return -1
goto loop1
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
$loop_count = 0
loop05:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
print ======$data01
return -1
goto loop05
endi
if $data02 != 4 then
print ======$data02
return -1
goto loop05
endi
if $data03 != 50 then
print ======$data03 != 50
return -1
goto loop05
endi
if $data04 != 20 then
print ======$data04 != 20
return -1
goto loop05
endi
if $data05 != 3 then
print ======$data05
return -1
goto loop05
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
$loop_count = 0
loop06:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
return -1
goto loop06
endi
if $data12 != 4 then
print ======$data12
return -1
goto loop06
endi
if $data13 != 46 then
print ======$data13 != 46
return -1
goto loop06
endi
if $data14 != 20 then
print ======$data14 != 20
return -1
goto loop06
endi
if $data15 != 1 then
print ======$data15
return -1
goto loop06
endi
# row 2
if $data21 != 4 then
print ======$data21
return -1
goto loop06
endi
if $data22 != 4 then
print ======$data22
return -1
goto loop06
endi
if $data23 != 15 then
print ======$data23
return -1
goto loop06
endi
if $data24 != 4 then
print ======$data24
return -1
goto loop06
endi
if $data25 != 3 then
print ======$data25
return -1
goto loop06
endi
print =====over
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print =============== create database
sql create database test vgroups 1;
sql select * from information_schema.ins_databases
if $rows != 3 then
return -1
endi
print $data00 $data01 $data02
sql use test;
sql create database test2 vgroups 1;
sql select * from information_schema.ins_databases;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
sql insert into t1 values(1648791243003,4,2,3,3.1);
sql insert into t1 values(1648791213004,4,2,3,4.1);
print create stream stream2 trigger at_once fill_history 1 IGNORE EXPIRED 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s)
sql create stream stream2 trigger at_once fill_history 1 IGNORE EXPIRED 0 into streamt as select _wstart, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s);
$loop_count = 0
loop0:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
if $rows != 4 then
print ======$rows
goto loop0
endi
# row 0
if $data01 != 2 then
print ======$data01
goto loop0
endi
if $data02 != 2 then
print ======$data02
goto loop0
endi
if $data03 != 5 then
print ======$data03
goto loop0
endi
if $data04 != 2 then
print ======$data04
goto loop0
endi
if $data05 != 3 then
print ======$data05
goto loop0
endi
# row 1
if $data11 != 1 then
print ======$data11
goto loop0
endi
if $data12 != 1 then
print ======$data12
goto loop0
endi
if $data13 != 2 then
print ======$data13
goto loop0
endi
if $data14 != 2 then
print ======$data14
goto loop0
endi
if $data15 != 3 then
print ======$data15
goto loop0
endi
# row 2
if $data21 != 1 then
print ======$data21
goto loop0
endi
if $data22 != 1 then
print ======$data22
goto loop0
endi
if $data23 != 3 then
print ======$data23
goto loop0
endi
if $data24 != 2 then
print ======$data24
goto loop0
endi
if $data25 != 3 then
print ======$data25
goto loop0
endi
# row 3
if $data31 != 1 then
print ======$data31
goto loop0
endi
if $data32 != 1 then
print ======$data32
goto loop0
endi
if $data33 != 4 then
print ======$data33
goto loop0
endi
if $data34 != 2 then
print ======$data34
goto loop0
endi
if $data35 != 3 then
print ======$data35
goto loop0
endi
sql insert into t1 values(1648791223001,12,14,13,11.1);
$loop_count = 0
loop07:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select * from streamt;
print count(*) , count(d) , sum(a) , max(b) , min(c)
print 0: $data00 , $data01 , $data02 , $data03 , $data04 , $data05
print 1: $data10 , $data11 , $data12 , $data13 , $data14 , $data15
if $rows != 4 then
print ======$rows
goto loop07
endi
# row 0
if $data01 != 2 then
print ======$data01
goto loop07
endi
if $data02 != 2 then
print ======$data02
goto loop07
endi
if $data03 != 5 then
print ======$data03
goto loop07
endi
if $data04 != 2 then
print ======$data04
goto loop07
endi
if $data05 != 3 then
print ======$data05
goto loop07
endi
# row 1
if $data11 != 1 then
print ======$data11
goto loop07
endi
if $data12 != 1 then
print ======$data12
goto loop07
endi
if $data13 != 12 then
print ======$data13
goto loop07
endi
if $data14 != 14 then
print ======$data14
goto loop07
endi
if $data15 != 13 then
print ======$data15
goto loop07
endi
# row 2
if $data21 != 1 then
print ======$data21
goto loop07
endi
if $data22 != 1 then
print ======$data22
goto loop07
endi
if $data23 != 3 then
print ======$data23
goto loop07
endi
if $data24 != 2 then
print ======$data24
goto loop07
endi
if $data25 != 3 then
print ======$data25
goto loop07
endi
# row 3
if $data31 != 1 then
print ======$data31
goto loop07
endi
if $data32 != 1 then
print ======$data32
goto loop07
endi
if $data33 != 4 then
print ======$data33
goto loop07
endi
if $data34 != 2 then
print ======$data34
goto loop07
endi
if $data35 != 3 then
print ======$data35
goto loop07
endi
sql insert into t1 values(1648791223002,12,14,13,11.1);
$loop_count = 0
loop08:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 2 then
print ======$data11
goto loop08
endi
if $data12 != 2 then
print ======$data12
goto loop08
endi
if $data13 != 24 then
print ======$data13
goto loop08
endi
if $data14 != 14 then
print ======$data14
goto loop08
endi
if $data15 != 13 then
print ======$data15
goto loop08
endi
sql insert into t1 values(1648791223003,12,14,13,11.1);
$loop_count = 0
loop09:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
goto loop09
endi
if $data12 != 3 then
print ======$data12
goto loop09
endi
if $data13 != 36 then
print ======$data13
goto loop09
endi
if $data14 != 14 then
print ======$data14
goto loop09
endi
if $data15 != 13 then
print ======$data15
goto loop09
endi
sql insert into t1 values(1648791223001,1,1,1,1.1);
sql insert into t1 values(1648791223002,2,2,2,2.1);
sql insert into t1 values(1648791223003,3,3,3,3.1);
$loop_count = 0
loop010:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 3 then
print ======$data11
goto loop010
endi
if $data12 != 3 then
print ======$data12
goto loop010
endi
if $data13 != 6 then
print ======$data13
goto loop010
endi
if $data14 != 3 then
print ======$data14
goto loop010
endi
if $data15 != 1 then
print ======$data15
goto loop010
endi
sql insert into t1 values(1648791233003,3,2,3,2.1);
sql insert into t1 values(1648791233002,5,6,7,8.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
$loop_count = 0
loop011:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 2
if $data21 != 2 then
print ======$data21
goto loop011
endi
if $data22 != 2 then
print ======$data22
goto loop011
endi
if $data23 != 6 then
print ======$data23
goto loop011
endi
if $data24 != 2 then
print ======$data24
goto loop011
endi
if $data25 != 3 then
print ======$data25
goto loop011
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
$loop_count = 0
loop012:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
print ======$data01
goto loop012
endi
if $data02 != 4 then
print ======$data02
goto loop012
endi
if $data03 != 50 then
print ======$data03 != 50
goto loop012
endi
if $data04 != 20 then
print ======$data04 != 20
goto loop012
endi
if $data05 != 3 then
print ======$data05
goto loop012
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
$loop_count = 0
loop013:
sleep 1000
$loop_count = $loop_count + 1
if $loop_count == 30 then
return -1
endi
sql select `_wstart`, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
goto loop013
endi
if $data12 != 4 then
print ======$data12
goto loop013
endi
if $data13 != 46 then
print ======$data13 != 46
goto loop013
endi
if $data14 != 20 then
print ======$data14 != 20
goto loop013
endi
if $data15 != 1 then
print ======$data15
goto loop013
endi
# row 2
if $data21 != 4 then
print ======$data21
goto loop013
endi
if $data22 != 4 then
print ======$data22
goto loop013
endi
if $data23 != 15 then
print ======$data23
goto loop013
endi
if $data24 != 4 then
print ======$data24
goto loop013
endi
if $data25 != 3 then
print ======$data25
goto loop013
endi
print ======over
......@@ -26,7 +26,7 @@ class TDTestCase:
'c1':'int',
'c2':'float',
'c3':'binary(20)'
}
# structure of tag
self.tag_dict = {
......@@ -60,7 +60,7 @@ class TDTestCase:
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(len(values_list))
elif tb_type == 'stb':
tdSql.checkRows(len(self.values_list)*tb_num)
tdSql.checkRows(len(self.values_list)*tb_num)
for time in ['2020-01-32T08:00:00','2020-13-32T08:00:00','acd']:
tdSql.query(f"select to_unixtimestamp('{time}') from {tbname}")
if tb_type == 'ntb' or tb_type == 'ctb':
......@@ -74,7 +74,7 @@ class TDTestCase:
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(len(values_list))
elif tb_type == 'stb':
tdSql.checkRows(len(values_list)*tb_num)
tdSql.checkRows(len(values_list)*tb_num)
for time in self.error_param:
tdSql.error(f"select to_unixtimestamp({time}) from {tbname}")
def timestamp_change_check_ntb(self):
......@@ -95,9 +95,20 @@ class TDTestCase:
self.data_check(f'{self.stbname}_{i}',self.values_list,'ctb')
self.data_check(self.stbname,self.values_list,'stb',self.tbnum)
tdSql.execute(f'drop database {self.dbname}')
def timestamp_change_return_type(self):
tdSql.query(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 0);")
tdSql.checkEqual(tdSql.queryResult[0][0], 0)
tdSql.query(f"select to_unixtimestamp('1970-01-01 00:00:00', 1);")
tdSql.checkData(0, 0, '1970-01-01 00:00:00')
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 2);")
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 1.5);")
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 'abc');")
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', true);")
tdSql.error(f"select to_unixtimestamp('1970-01-01 08:00:00+08:00', 1, 3);")
def run(self): # sourcery skip: extract-duplicate-method
self.timestamp_change_check_ntb()
self.timestamp_change_check_stb()
self.timestamp_change_return_type()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
......
......@@ -14,7 +14,7 @@ python3 ./test.py -f 1-insert/insertWithMoreVgroup.py -P
python3 ./test.py -f 1-insert/table_comment.py -P
python3 ./test.py -f 1-insert/time_range_wise.py -P
python3 ./test.py -f 1-insert/block_wise.py -P
python3 ./test.py -f 1-insert/create_retentions.py -P
#python3 ./test.py -f 1-insert/create_retentions.py -P
python3 ./test.py -f 1-insert/mutil_stage.py -P
python3 ./test.py -f 1-insert/table_param_ttl.py -P
python3 ./test.py -f 1-insert/table_param_ttl.py -P -R
......@@ -685,4 +685,4 @@ python3 ./test.py -f 2-query/blockSMA.py -P -Q 4
python3 ./test.py -f 2-query/projectionDesc.py -P -Q 4
python3 ./test.py -f 2-query/odbc.py -P
python3 ./test.py -f 99-TDcase/TD-21561.py -P -Q 4
python3 ./test.py -f 99-TDcase/TD-20582.py -P
\ No newline at end of file
python3 ./test.py -f 99-TDcase/TD-20582.py -P
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册