diff --git a/documentation20/cn/08.connector/docs.md b/documentation20/cn/08.connector/docs.md
index 2d76c866d11c1e1f51927c5536184b15aa6afe14..aa5fa50b66e237b87de2893678d4e6c2738d21cb 100644
--- a/documentation20/cn/08.connector/docs.md
+++ b/documentation20/cn/08.connector/docs.md
@@ -427,12 +427,15 @@ TDengine提供时间驱动的实时流式计算API。可以每隔一指定的时
* res:查询结果集,注意结果集中可能没有记录
* param:调用 `taos_subscribe`时客户程序提供的附加参数
* code:错误码
+
**注意**:在这个回调函数里不可以做耗时过长的处理,尤其是对于返回的结果集中数据较多的情况,否则有可能导致客户端阻塞等异常状态。如果必须进行复杂计算,则建议在另外的线程中进行处理。
* `TAOS_RES *taos_consume(TAOS_SUB *tsub)`
同步模式下,该函数用来获取订阅的结果。 用户应用程序将其置于一个循环之中。 如两次调用`taos_consume`的间隔小于订阅的轮询周期,API将会阻塞,直到时间间隔超过此周期。 如果数据库有新记录到达,该API将返回该最新的记录,否则返回一个没有记录的空结果集。 如果返回值为 `NULL`,说明系统出错。 异步模式下,用户程序不应调用此API。
+ **注意**:在调用 `taos_consume()` 之后,用户应用应确保尽快调用 `taos_fetch_row()` 或 `taos_fetch_block()` 来处理订阅结果,否则服务端会持续缓存查询结果数据等待客户端读取,极端情况下会导致服务端内存消耗殆尽,影响服务稳定性。
+
* `void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress)`
取消订阅。 如参数 `keepProgress` 不为0,API会保留订阅的进度信息,后续调用 `taos_subscribe` 时可以基于此进度继续;否则将删除进度信息,后续只能重新开始读取数据。
diff --git a/documentation20/cn/12.taos-sql/docs.md b/documentation20/cn/12.taos-sql/docs.md
index 5904abbbaa5598357fe57dfcc2ce68b731524b75..844865f6dbf67bc5031ea2556ac0e937ac965898 100644
--- a/documentation20/cn/12.taos-sql/docs.md
+++ b/documentation20/cn/12.taos-sql/docs.md
@@ -854,7 +854,23 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
- 适用于:**表**。
+ 适用于:**表、(超级表)**。
+
+ 说明:从 2.1.3.0 版本开始,TWA 函数可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
+
+- **IRATE**
+ ```mysql
+ SELECT IRATE(field_name) FROM tb_name WHERE clause;
+ ```
+ 功能说明:计算瞬时增长率。使用时间区间中最后两个样本数据来计算瞬时增长速率;如果这两个值呈递减关系,那么只取最后一个数用于计算,而不是使用二者差值。
+
+ 返回结果数据类型:双精度浮点数Double。
+
+ 应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
+
+ 适用于:**表、(超级表)**。
+
+ 说明:(从 2.1.3.0 版本开始新增此函数)IRATE 可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
- **SUM**
```mysql
@@ -1203,13 +1219,14 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
```
### 计算函数
+
- **DIFF**
```mysql
SELECT DIFF(field_name) FROM tb_name [WHERE clause];
```
功能说明:统计表中某列的值与前一行对应值的差。
- 返回结果数据类型: 同应用字段。
+ 返回结果数据类型:同应用字段。
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
@@ -1227,13 +1244,27 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 2 row(s) in set (0.001162s)
```
+- **DERIVATIVE**
+ ```mysql
+ SELECT DERIVATIVE(field_name, time_interval, ignore_negative) FROM tb_name [WHERE clause];
+ ```
+ 功能说明:统计表中某列数值的单位变化率。其中单位时间区间的长度可以通过 time_interval 参数指定,最小可以是 1 秒(1s);ignore_negative 参数的值可以是 0 或 1,为 1 时表示忽略负值。
+
+ 返回结果数据类型:双精度浮点数。
+
+ 应用字段:不能应用在 timestamp、binary、nchar、bool 类型字段。
+
+ 适用于:**表、(超级表)**。
+
+ 说明:(从 2.1.3.0 版本开始新增此函数)输出结果行数是范围内总行数减一,第一行没有结果输出。DERIVATIVE 函数可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
+
- **SPREAD**
```mysql
SELECT SPREAD(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
功能说明:统计表/超级表中某列的最大值和最小值之差。
- 返回结果数据类型: 双精度浮点数。
+ 返回结果数据类型:双精度浮点数。
应用字段:不能应用在binary、nchar、bool类型字段。
diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c
index d835b37c2497c241d52a243d34ab4ab63e76c12a..ffec03b65adc38db15d3e57bb11dccb8b0f93a92 100644
--- a/src/client/src/tscGlobalmerge.c
+++ b/src/client/src/tscGlobalmerge.c
@@ -898,7 +898,9 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while(1) {
bool prev = *newgroup;
+ publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = upstream->exec(upstream, newgroup);
+ publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
*newgroup = prev;
break;
@@ -966,7 +968,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) {
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
@@ -974,7 +978,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
@@ -986,7 +992,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
@@ -1000,7 +1009,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
}
while ((*newgroup) == false) {
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index b388c21599041053c549706c59792f7b64e4ed6e..4481a475f90aa528e6d459c0e244db2cf8954dbe 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -165,6 +165,7 @@ bool serializeExprListToVariant(SArray* pList, tVariant **dst, int16_t colType,
} else {
tbufWriteUint32(&bw, colType);
}
+
tbufWriteInt32(&bw, (int32_t)(pList->size));
for (int32_t i = 0; i < (int32_t)pList->size; i++) {
@@ -181,10 +182,11 @@ bool serializeExprListToVariant(SArray* pList, tVariant **dst, int16_t colType,
}
tbufWriteInt64(&bw, var->i64);
} else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
- if (IS_SIGNED_NUMERIC_TYPE(var->nType) && IS_UNSIGNED_NUMERIC_TYPE(var->nType)) {
+ if (IS_SIGNED_NUMERIC_TYPE(var->nType) || IS_UNSIGNED_NUMERIC_TYPE(var->nType)) {
+ tbufWriteUint64(&bw, var->u64);
+ } else {
break;
- }
- tbufWriteUint64(&bw, var->u64);
+ }
} else if (colType == TSDB_DATA_TYPE_DOUBLE || colType == TSDB_DATA_TYPE_FLOAT) {
if (IS_SIGNED_NUMERIC_TYPE(var->nType) || IS_UNSIGNED_NUMERIC_TYPE(var->nType)) {
tbufWriteDouble(&bw, (double)(var->i64));
@@ -2069,33 +2071,29 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
const char* name, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
const char* msg1 = "not support column types";
- int16_t type = 0;
- int16_t bytes = 0;
- int32_t functionID = cvtFunc.execFuncId;
-
- if (functionID == TSDB_FUNC_SPREAD) {
+ int32_t f = cvtFunc.execFuncId;
+ if (f == TSDB_FUNC_SPREAD) {
int32_t t1 = pSchema->type;
- if (t1 == TSDB_DATA_TYPE_BINARY || t1 == TSDB_DATA_TYPE_NCHAR || t1 == TSDB_DATA_TYPE_BOOL) {
+ if (IS_VAR_DATA_TYPE(t1) || t1 == TSDB_DATA_TYPE_BOOL) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return -1;
- } else {
- type = TSDB_DATA_TYPE_DOUBLE;
- bytes = tDataTypes[type].bytes;
}
- } else {
- type = pSchema->type;
- bytes = pSchema->bytes;
}
- SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, getNewResColId(pCmd), bytes, false);
+ int16_t resType = 0;
+ int16_t resBytes = 0;
+ int32_t interBufSize = 0;
+
+ getResultDataInfo(pSchema->type, pSchema->bytes, f, 0, &resType, &resBytes, &interBufSize, 0, false);
+ SExprInfo* pExpr = tscExprAppend(pQueryInfo, f, pColIndex, resType, resBytes, getNewResColId(pCmd), interBufSize, false);
tstrncpy(pExpr->base.aliasName, name, tListLen(pExpr->base.aliasName));
- if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) {
+ if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != f) {
pExpr->base.colInfo.flag |= TSDB_COL_NULL;
}
// set reverse order scan data blocks for last query
- if (functionID == TSDB_FUNC_LAST) {
+ if (f == TSDB_FUNC_LAST) {
pExpr->base.numOfParams = 1;
pExpr->base.param[0].i64 = TSDB_ORDER_DESC;
pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT;
@@ -2108,7 +2106,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
// if it is not in the final result, do not add it
SColumnList ids = createColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
if (finalResult) {
- insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, pExpr->base.aliasName, pExpr);
+ insertResultField(pQueryInfo, resColIdx, &ids, resBytes, (int8_t)resType, pExpr->base.aliasName, pExpr);
} else {
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
}
@@ -2557,8 +2555,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tVariant* pVariant = &pParamElem[1].pNode->value;
- int8_t resultType = pSchema->type;
- int16_t resultSize = pSchema->bytes;
+ int16_t resultType = pSchema->type;
+ int16_t resultSize = pSchema->bytes;
+ int32_t interResult = 0;
char val[8] = {0};
@@ -2571,8 +2570,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
- resultSize = sizeof(double);
- resultType = TSDB_DATA_TYPE_DOUBLE;
+ getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, false);
/*
* sql function transformation
@@ -2582,7 +2580,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
colIndex += 1; // the first column is ts
- pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false);
+ pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
} else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
@@ -2617,7 +2615,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
if (finalResult) {
- insertResultField(pQueryInfo, colIndex, &ids, resultSize, resultType, pExpr->base.aliasName, pExpr);
+ insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr);
} else {
assert(ids.num == 1);
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
@@ -7784,10 +7782,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg3 = "start(end) time of query range required or time range too large";
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
const char* msg5 = "only tag query not compatible with normal column filter";
- const char* msg6 = "not support stddev/percentile in outer query yet";
- const char* msg7 = "drivative requires timestamp column exists in subquery";
+ const char* msg6 = "not support stddev/percentile/interp in the outer query yet";
+ const char* msg7 = "derivative/twa/irate requires timestamp column exists in subquery";
const char* msg8 = "condition missing for join query";
-
+
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
@@ -7829,15 +7827,17 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
+
// parse the window_state
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
+
// todo NOT support yet
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId;
- if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) {
+ if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT || f == TSDB_FUNC_INTERP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
@@ -7852,9 +7852,17 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0);
- if (tscNumOfExprs(pQueryInfo) > 1) {
+ int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
+ if (numOfExprs == 1) {
+ SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
+ int32_t f = pExpr->base.functionId;
+ if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) {
+ return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
+ }
+ } else {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 1);
- if (pExpr->base.functionId == TSDB_FUNC_DERIVATIVE && pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
+ int32_t f = pExpr->base.functionId;
+ if ((f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) && pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
}
diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c
index c3df4773e113f776e0b84e6089e516b3eb382808..4d97fef52f956b6d550f24c1bb88a34dd64c6d13 100644
--- a/src/client/src/tscSubquery.c
+++ b/src/client/src/tscSubquery.c
@@ -3604,10 +3604,10 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
// todo refactor: filter should not be applied here.
createFilterInfo(pQueryAttr, 0);
- pQueryAttr->numOfFilterCols = 0;
SArray* pa = NULL;
if (stage == MASTER_SCAN) {
+ pQueryAttr->createFilterOperator = false; // no need for parent query
pa = createExecOperatorPlan(pQueryAttr);
} else {
pa = createGlobalMergePlan(pQueryAttr);
diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c
index 937fe1000aff64b981ee1f1947596854b2bd025d..cba01b1a1c84b3faa85a5bbf41bf42fca1b6031d 100644
--- a/src/client/src/tscUtil.c
+++ b/src/client/src/tscUtil.c
@@ -825,7 +825,10 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
+
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pStatus->index = 0;
if (pStatus->pBlock == NULL) {
diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c
index 0eac1518a757798a0e32f4105b072f193a6486ac..b2ac11b2cadc928e927b56010c7720e6cd9ef9fa 100644
--- a/src/kit/taosdemo/taosdemo.c
+++ b/src/kit/taosdemo/taosdemo.c
@@ -1204,23 +1204,24 @@ static void fetchResult(TAOS_RES *res, threadInfo* pThreadInfo) {
return ;
}
- int totalLen = 0;
+ int64_t totalLen = 0;
// fetch the records row by row
while((row = taos_fetch_row(res))) {
- if ((strlen(pThreadInfo->filePath) > 0)
- && (totalLen >= 100*1024*1024 - 32000)) {
- appendResultBufToFile(databuf, pThreadInfo);
+ if (totalLen >= 100*1024*1024 - 32000) {
+ if (strlen(pThreadInfo->filePath) > 0)
+ appendResultBufToFile(databuf, pThreadInfo);
totalLen = 0;
memset(databuf, 0, 100*1024*1024);
}
num_rows++;
- char temp[16000] = {0};
+ char temp[16000] = {0};
int len = taos_print_row(temp, row, fields, num_fields);
len += sprintf(temp + len, "\n");
//printf("query result:%s\n", temp);
memcpy(databuf + totalLen, temp, len);
totalLen += len;
+ debugPrint("totalLen: %"PRId64"\n", totalLen);
}
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n",
diff --git a/src/kit/taosdump/taosdump.c b/src/kit/taosdump/taosdump.c
index 05c6b1efbb7990260d5eaef454d4d3a339ec5268..33118ce3113eb401dcd9ba143e99ef359b07f935 100644
--- a/src/kit/taosdump/taosdump.c
+++ b/src/kit/taosdump/taosdump.c
@@ -1126,7 +1126,7 @@ int taosGetTableDes(
strncpy(tableDes->cols[count].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
strncpy(tableDes->cols[count].type, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
- min(16, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
+ min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
tableDes->cols[count].length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
strncpy(tableDes->cols[count].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
diff --git a/src/os/inc/os.h b/src/os/inc/os.h
index 6731ca6d7db9ce72e72a88a1b9dadf76fb8ec87e..903e80d5c7f554d420eafc9224fe5e7e35fe8467 100644
--- a/src/os/inc/os.h
+++ b/src/os/inc/os.h
@@ -29,7 +29,7 @@ extern "C" {
#include "osMath.h"
#include "osMemory.h"
#include "osRand.h"
-#include "osSemphone.h"
+#include "osSemaphore.h"
#include "osSignal.h"
#include "osSleep.h"
#include "osSocket.h"
diff --git a/src/os/inc/osSemphone.h b/src/os/inc/osSemaphore.h
similarity index 97%
rename from src/os/inc/osSemphone.h
rename to src/os/inc/osSemaphore.h
index fe59095205010bef553413809706c62cd772a7e3..10d14700e013f66e6d98208f0e65fe1ca5fc3874 100644
--- a/src/os/inc/osSemphone.h
+++ b/src/os/inc/osSemaphore.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#ifndef TDENGINE_OS_SEMPHONE_H
-#define TDENGINE_OS_SEMPHONE_H
+#ifndef TDENGINE_OS_SEMAPHORE_H
+#define TDENGINE_OS_SEMAPHORE_H
#ifdef __cplusplus
extern "C" {
diff --git a/src/os/src/darwin/dwSemphone.c b/src/os/src/darwin/dwSemaphore.c
similarity index 100%
rename from src/os/src/darwin/dwSemphone.c
rename to src/os/src/darwin/dwSemaphore.c
diff --git a/src/os/src/detail/osSemphone.c b/src/os/src/detail/osSemaphore.c
similarity index 100%
rename from src/os/src/detail/osSemphone.c
rename to src/os/src/detail/osSemaphore.c
diff --git a/src/os/src/windows/wSemphone.c b/src/os/src/windows/wSemaphore.c
similarity index 100%
rename from src/os/src/windows/wSemphone.c
rename to src/os/src/windows/wSemaphore.c
diff --git a/src/plugins/http/src/httpGcJson.c b/src/plugins/http/src/httpGcJson.c
index 397791706d0fc24f250c2332dddc5b0b031a4817..f33a994465a94bad5d79df8af73ff4fd9d640516 100644
--- a/src/plugins/http/src/httpGcJson.c
+++ b/src/plugins/http/src/httpGcJson.c
@@ -228,13 +228,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, (char *)row[i], fields[i].bytes);
break;
- case TSDB_DATA_TYPE_TIMESTAMP:
- if (precision == TSDB_TIME_PRECISION_MILLI) { // ms
- httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
- } else {
- httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000);
- }
+ case TSDB_DATA_TYPE_TIMESTAMP: {
+ int64_t ts = convertTimePrecision(*((int64_t *)row[i]), precision, TSDB_TIME_PRECISION_MILLI);
+ httpJsonInt64(jsonBuf, ts);
break;
+ }
default:
httpJsonString(jsonBuf, "-", 1);
break;
diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h
index 9cd1c5b033952d7bbe52ee523eb2fadc3c9d472b..8279c58b24796c734b39e97e9a8e953e0248332f 100644
--- a/src/query/inc/qExecutor.h
+++ b/src/query/inc/qExecutor.h
@@ -133,6 +133,28 @@ typedef struct STableQueryInfo {
SResultRowInfo resInfo;
} STableQueryInfo;
+typedef enum {
+ QUERY_PROF_BEFORE_OPERATOR_EXEC = 0,
+ QUERY_PROF_AFTER_OPERATOR_EXEC,
+ QUERY_PROF_QUERY_ABORT
+} EQueryProfEventType;
+
+typedef struct {
+ EQueryProfEventType eventType;
+ int64_t eventTime;
+
+ union {
+ uint8_t operatorType; //for operator event
+ int32_t abortCode; //for query abort event
+ };
+} SQueryProfEvent;
+
+typedef struct {
+ uint8_t operatorType;
+ int64_t sumSelfTime;
+ int64_t sumRunTimes;
+} SOperatorProfResult;
+
typedef struct SQueryCostInfo {
uint64_t loadStatisTime;
uint64_t loadFileBlockTime;
@@ -154,6 +176,9 @@ typedef struct SQueryCostInfo {
uint64_t tableInfoSize;
uint64_t hashSize;
uint64_t numOfTimeWindows;
+
+ SArray* queryProfEvents; //SArray
+ SHashObj* operatorProfResults; //map
} SQueryCostInfo;
typedef struct {
@@ -192,6 +217,7 @@ typedef struct SQueryAttr {
bool needReverseScan; // need reverse scan
bool distinctTag; // distinct tag query
bool stateWindow; // window State on sub/normal table
+ bool createFilterOperator; // if filter operator is needed
int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
@@ -285,7 +311,7 @@ enum OPERATOR_TYPE_E {
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
- OP_Arithmetic = 7,
+ OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
@@ -413,13 +439,13 @@ typedef struct SAggOperatorInfo {
uint32_t seed;
} SAggOperatorInfo;
-typedef struct SArithOperatorInfo {
+typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
int32_t bufCapacity;
uint32_t seed;
SSDataBlock *existDataBlock;
-} SArithOperatorInfo;
+} SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
int64_t limit;
@@ -513,7 +539,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
-SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
+SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
@@ -586,7 +612,12 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
+
+void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
+void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
+void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo);
+
void freeQInfo(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery);
diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c
index 8efc4aad4c82ce34047fb87e10db169a1d8e5e3f..676e5b6ce63648b2f8182baed756cf0f90039a44 100644
--- a/src/query/src/qAggMain.c
+++ b/src/query/src/qAggMain.c
@@ -74,7 +74,6 @@
} while (0);
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
-void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
void doFinalizer(SQLFunctionCtx *pCtx) { RESET_RESULT_INFO(GET_RES_INFO(pCtx)); }
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index a130a6e7ca3b7898189f20e2ddd57b1a42a585c9..336925c797569b7e2251561b95dc14ac0d8207c7 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -184,7 +184,7 @@ static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
-static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
+static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
@@ -912,7 +912,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlo
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}
} else {
- if (/*pCtx[0].pInput == NULL && */pBlock->pDataBlock != NULL) {
+ if (pBlock->pDataBlock != NULL) {
doSetInputDataBlock(pOperator, pCtx, pBlock, order);
} else {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
@@ -978,7 +978,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
}
}
-static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) {
+static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t k = 0; k < numOfOutput; ++k) {
@@ -1282,11 +1282,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
return;
}
- int64_t* tsList = NULL;
SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0);
- if (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
- tsList = (int64_t*) pFirstColData->pData;
- }
+ int64_t* tsList = (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (int64_t*) pFirstColData->pData:NULL;
STimeWindow w = TSWINDOW_INITIALIZER;
@@ -1319,12 +1316,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
}
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
- setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData,
- bytes);
+ setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes);
}
- int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes,
- item->groupIndex);
+ int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
@@ -1340,17 +1335,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
memcpy(pInfo->prevData, val, bytes);
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
- setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val,
- bytes);
+ setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
}
- int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes,
- item->groupIndex);
+ int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
+ tfree(pInfo->prevData);
}
}
@@ -1806,17 +1800,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
break;
}
- case OP_Arithmetic: { // TODO refactor to remove arith operator.
+ case OP_Project: { // TODO refactor to remove arith operator.
SOperatorInfo* prev = pRuntimeEnv->proot;
if (i == 0) {
- pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
+ pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor
setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot);
}
} else {
prev = pRuntimeEnv->proot;
assert(pQueryAttr->pExpr2 != NULL);
- pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
+ pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
}
break;
}
@@ -3791,6 +3785,103 @@ int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutp
return pOutput->info.rows;
}
+void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) {
+ SQueryProfEvent event;
+ event.eventType = eventType;
+ event.eventTime = taosGetTimestampUs();
+ event.operatorType = operatorInfo->operatorType;
+
+ SQInfo* qInfo = operatorInfo->pRuntimeEnv->qinfo;
+ if (qInfo->summary.queryProfEvents) {
+ taosArrayPush(qInfo->summary.queryProfEvents, &event);
+ }
+}
+
+void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code) {
+ SQueryProfEvent event;
+ event.eventType = QUERY_PROF_QUERY_ABORT;
+ event.eventTime = taosGetTimestampUs();
+ event.abortCode = code;
+
+ if (pQInfo->summary.queryProfEvents) {
+ taosArrayPush(pQInfo->summary.queryProfEvents, &event);
+ }
+}
+
+typedef struct {
+ uint8_t operatorType;
+ int64_t beginTime;
+ int64_t endTime;
+ int64_t selfTime;
+ int64_t descendantsTime;
+} SOperatorStackItem;
+
+static void doOperatorExecProfOnce(SOperatorStackItem* item, SQueryProfEvent* event, SArray* opStack, SHashObj* profResults) {
+ item->endTime = event->eventTime;
+ item->selfTime = (item->endTime - item->beginTime) - (item->descendantsTime);
+
+ for (int32_t j = 0; j < taosArrayGetSize(opStack); ++j) {
+ SOperatorStackItem* ancestor = taosArrayGet(opStack, j);
+ ancestor->descendantsTime += item->selfTime;
+ }
+
+ uint8_t operatorType = item->operatorType;
+ SOperatorProfResult* result = taosHashGet(profResults, &operatorType, sizeof(operatorType));
+ if (result != NULL) {
+ result->sumRunTimes++;
+ result->sumSelfTime += item->selfTime;
+ } else {
+ SOperatorProfResult opResult;
+ opResult.operatorType = operatorType;
+ opResult.sumSelfTime = item->selfTime;
+ opResult.sumRunTimes = 1;
+ taosHashPut(profResults, &(operatorType), sizeof(operatorType),
+ &opResult, sizeof(opResult));
+ }
+}
+
+void calculateOperatorProfResults(SQInfo* pQInfo) {
+ if (pQInfo->summary.queryProfEvents == NULL) {
+ qDebug("query prof events array is null");
+ return;
+ }
+
+ if (pQInfo->summary.operatorProfResults == NULL) {
+ qDebug("operator prof results hash is null");
+ return;
+ }
+
+ SArray* opStack = taosArrayInit(32, sizeof(SOperatorStackItem));
+ if (opStack == NULL) {
+ return;
+ }
+
+ size_t size = taosArrayGetSize(pQInfo->summary.queryProfEvents);
+ SHashObj* profResults = pQInfo->summary.operatorProfResults;
+
+ for (int i = 0; i < size; ++i) {
+ SQueryProfEvent* event = taosArrayGet(pQInfo->summary.queryProfEvents, i);
+ if (event->eventType == QUERY_PROF_BEFORE_OPERATOR_EXEC) {
+ SOperatorStackItem opItem;
+ opItem.operatorType = event->operatorType;
+ opItem.beginTime = event->eventTime;
+ opItem.descendantsTime = 0;
+ taosArrayPush(opStack, &opItem);
+ } else if (event->eventType == QUERY_PROF_AFTER_OPERATOR_EXEC) {
+ SOperatorStackItem* item = taosArrayPop(opStack);
+ assert(item->operatorType == event->operatorType);
+ doOperatorExecProfOnce(item, event, opStack, profResults);
+ } else if (event->eventType == QUERY_PROF_QUERY_ABORT) {
+ SOperatorStackItem* item;
+ while ((item = taosArrayPop(opStack)) != NULL) {
+ doOperatorExecProfOnce(item, event, opStack, profResults);
+ }
+ }
+ }
+
+ taosArrayDestroy(opStack);
+}
+
void queryCostStatis(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryCostInfo *pSummary = &pQInfo->summary;
@@ -3811,6 +3902,8 @@ void queryCostStatis(SQInfo *pQInfo) {
pSummary->numOfTimeWindows = 0;
}
+ calculateOperatorProfResults(pQInfo);
+
qDebug("QInfo:0x%"PRIx64" :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
pQInfo->qId, pSummary->elapsedTime, pSummary->firstStageMergeTime, pSummary->totalBlocks, pSummary->loadBlockStatis,
@@ -3818,6 +3911,15 @@ void queryCostStatis(SQInfo *pQInfo) {
qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb, hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
+
+ if (pSummary->operatorProfResults) {
+ SOperatorProfResult* opRes = taosHashIterate(pSummary->operatorProfResults, NULL);
+ while (opRes != NULL) {
+ qDebug("QInfo:0x%" PRIx64 " :cost summary: operator : %d, exec times: %" PRId64 ", self time: %" PRId64,
+ pQInfo->qId, opRes->operatorType, opRes->sumRunTimes, opRes->sumSelfTime);
+ opRes = taosHashIterate(pSummary->operatorProfResults, opRes);
+ }
+ }
}
//static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBlockInfo) {
@@ -4219,6 +4321,15 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
// create runtime environment
int32_t numOfTables = (int32_t)pQueryAttr->tableGroupInfo.numOfTables;
pQInfo->summary.tableInfoSize += (numOfTables * sizeof(STableQueryInfo));
+ pQInfo->summary.queryProfEvents = taosArrayInit(512, sizeof(SQueryProfEvent));
+ if (pQInfo->summary.queryProfEvents == NULL) {
+ qDebug("failed to allocate query prof events array");
+ }
+ pQInfo->summary.operatorProfResults =
+ taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_TINYINT), true, HASH_NO_LOCK);
+ if (pQInfo->summary.operatorProfResults == NULL) {
+ qDebug("failed to allocate operator prof results hash");
+ }
code = setupQueryRuntimeEnv(pRuntimeEnv, (int32_t) pQueryAttr->tableGroupInfo.numOfTables, pOperator, param);
if (code != TSDB_CODE_SUCCESS) {
@@ -4578,8 +4689,8 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
- } else if (pDownstream->operatorType == OP_Arithmetic) {
- SArithOperatorInfo *pInfo = pDownstream->info;
+ } else if (pDownstream->operatorType == OP_Project) {
+ SProjectOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
@@ -4843,7 +4954,10 @@ static SSDataBlock* doAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
+ publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
+ publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
break;
}
@@ -4898,7 +5012,10 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
+ publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
+ publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
break;
}
@@ -4934,23 +5051,23 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
return pInfo->pRes;
}
-static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
+static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
- SArithOperatorInfo* pArithInfo = pOperator->info;
+ SProjectOperatorInfo* pProjectInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
- SOptrBasicInfo *pInfo = &pArithInfo->binfo;
+ SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes;
int32_t order = pRuntimeEnv->pQueryAttr->order.order;
pRes->info.rows = 0;
- if (pArithInfo->existDataBlock) { // TODO refactor
+ if (pProjectInfo->existDataBlock) { // TODO refactor
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
- SSDataBlock* pBlock = pArithInfo->existDataBlock;
- pArithInfo->existDataBlock = NULL;
+ SSDataBlock* pBlock = pProjectInfo->existDataBlock;
+ pProjectInfo->existDataBlock = NULL;
*newgroup = true;
// todo dynamic set tags
@@ -4960,9 +5077,9 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
- updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows);
+ updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows);
- arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
+ projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) {
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
}
@@ -4978,7 +5095,10 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
bool prevVal = *newgroup;
// The upstream exec may change the value of the newgroup, so use a local variable instead.
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
assert(*newgroup == false);
@@ -4990,7 +5110,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
// Return result of the previous group in the firstly.
if (*newgroup) {
if (pRes->info.rows > 0) {
- pArithInfo->existDataBlock = pBlock;
+ pProjectInfo->existDataBlock = pBlock;
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return pInfo->pRes;
} else { // init output buffer for a new group data
@@ -5010,9 +5130,9 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
- updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows);
+ updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows);
- arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
+ projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) {
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
}
@@ -5038,7 +5158,10 @@ static SSDataBlock* doLimit(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while (1) {
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
@@ -5088,7 +5211,10 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
while (1) {
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
break;
}
@@ -5133,7 +5259,10 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
+ publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
+ publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
break;
}
@@ -5186,7 +5315,10 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
+ publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
+ publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
break;
}
@@ -5314,7 +5446,10 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
STimeWindow win = pQueryAttr->window;
SOperatorInfo* upstream = pOperator->upstream[0];
while (1) {
+ publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
+ publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
break;
}
@@ -5372,7 +5507,9 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
+ publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
+ publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
@@ -5423,7 +5560,9 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
SOperatorInfo* upstream = pOperator->upstream[0];
while(1) {
+ publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
+ publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
break;
}
@@ -5489,7 +5628,10 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
}
while(1) {
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (*newgroup) {
assert(pBlock != NULL);
}
@@ -5649,8 +5791,8 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevData);
}
-static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) {
- SArithOperatorInfo* pInfo = (SArithOperatorInfo*) param;
+static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
+ SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
}
@@ -5696,8 +5838,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
return pOperator;
}
-SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
- SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
+SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
+ SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
pInfo->seed = rand();
pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity;
@@ -5710,8 +5852,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
- pOperator->name = "ArithmeticOperator";
- pOperator->operatorType = OP_Arithmetic;
+ pOperator->name = "ProjectOperator";
+ pOperator->operatorType = OP_Project;
pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo;
@@ -5719,8 +5861,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv;
- pOperator->exec = doArithmeticOperation;
- pOperator->cleanup = destroyArithOperatorInfo;
+ pOperator->exec = doProjectOperation;
+ pOperator->cleanup = destroyProjectOperatorInfo;
appendUpstream(pOperator, upstream);
return pOperator;
@@ -6159,7 +6301,10 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
+
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
@@ -7045,6 +7190,8 @@ int32_t createFilterInfo(SQueryAttr* pQueryAttr, uint64_t qId) {
doCreateFilterInfo(pQueryAttr->tableCols, pQueryAttr->numOfCols, pQueryAttr->numOfFilterCols,
&pQueryAttr->pFilterInfo, qId);
+ pQueryAttr->createFilterOperator = true;
+
return TSDB_CODE_SUCCESS;
}
@@ -7483,6 +7630,9 @@ void freeQInfo(SQInfo *pQInfo) {
tfree(pQInfo->pBuf);
tfree(pQInfo->sql);
+ taosArrayDestroy(pQInfo->summary.queryProfEvents);
+ taosHashCleanup(pQInfo->summary.operatorProfResults);
+
taosArrayDestroy(pRuntimeEnv->groupResInfo.pRows);
pQInfo->signature = 0;
diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c
index ee587a515dca39559bc6d061501d4e3397c0781a..9c06a87b81c595a01f683c17c87b0418a09a5098 100644
--- a/src/query/src/qPlan.c
+++ b/src/query/src/qPlan.c
@@ -565,7 +565,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
- op = OP_Arithmetic;
+ op = OP_Project;
taosArrayPush(plan, &op);
}
@@ -585,7 +585,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL) {
- op = OP_Arithmetic;
+ op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->sw.gap > 0) {
@@ -593,7 +593,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
- op = OP_Arithmetic;
+ op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->stateWindow) {
@@ -601,7 +601,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
- op = OP_Arithmetic;
+ op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->simpleAgg) {
@@ -619,15 +619,15 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) {
- op = OP_Arithmetic;
+ op = OP_Project;
taosArrayPush(plan, &op);
}
} else { // diff/add/multiply/subtract/division
- if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->vgId == 0) { // todo refactor
+ if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->createFilterOperator && pQueryAttr->vgId == 0) { // todo refactor
op = OP_Filter;
taosArrayPush(plan, &op);
} else {
- op = OP_Arithmetic;
+ op = OP_Project;
taosArrayPush(plan, &op);
}
}
@@ -665,7 +665,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL) {
- op = OP_Arithmetic;
+ op = OP_Project;
taosArrayPush(plan, &op);
}
}
diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c
index 38ef81e7938a3635273c0cfa2cb4e86ca2e35c1e..787cb2f7d1a34f8958977eb85cd3c2621ff9a047 100644
--- a/src/query/src/queryMain.c
+++ b/src/query/src/queryMain.c
@@ -232,6 +232,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) {
+ publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
@@ -240,7 +241,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
bool newgroup = false;
+ publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
+ publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
diff --git a/src/util/inc/tsched.h b/src/util/inc/tsched.h
index 3e481cbc327b495975fb03bc4e4d850e4372f044..a1591512c1f87f524837a7986e3c8b3e14e25924 100644
--- a/src/util/inc/tsched.h
+++ b/src/util/inc/tsched.h
@@ -28,10 +28,41 @@ typedef struct SSchedMsg {
void *thandle;
} SSchedMsg;
-void *taosInitScheduler(int queueSize, int numOfThreads, const char *label);
-void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl);
-int taosScheduleTask(void *qhandle, SSchedMsg *pMsg);
-void taosCleanUpScheduler(void *param);
+/**
+ * Create a thread-safe ring-buffer based task queue and return the instance. A thread
+ * pool will be created to consume the messages in the queue.
+ * @param capacity the queue capacity
+ * @param numOfThreads the number of threads for the thread pool
+ * @param label the label of the queue
+ * @return the created queue scheduler
+ */
+void *taosInitScheduler(int capacity, int numOfThreads, const char *label);
+
+/**
+ * Create a thread-safe ring-buffer based task queue and return the instance.
+ * Same as taosInitScheduler, and it also print the queue status every 1 minite.
+ * @param capacity the queue capacity
+ * @param numOfThreads the number of threads for the thread pool
+ * @param label the label of the queue
+ * @param tmrCtrl the timer controller, tmr_ctrl_t*
+ * @return the created queue scheduler
+ */
+void *taosInitSchedulerWithInfo(int capacity, int numOfThreads, const char *label, void *tmrCtrl);
+
+/**
+ * Clean up the queue scheduler instance and free the memory.
+ * @param queueScheduler the queue scheduler to free
+ */
+void taosCleanUpScheduler(void *queueScheduler);
+
+/**
+ * Schedule a new task to run, the task is described by pMsg.
+ * The function may be blocked if no thread is available to execute the task.
+ * That may happen when all threads are busy.
+ * @param queueScheduler the queue scheduler instance
+ * @param pMsg the message for the task
+ */
+void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg);
#ifdef __cplusplus
}
diff --git a/src/util/src/tsched.c b/src/util/src/tsched.c
index f014dd0fab5494bd85f197e2c79fac53359e8edf..16142470c95678b8663f3bd437357dcdb22635a5 100644
--- a/src/util/src/tsched.c
+++ b/src/util/src/tsched.c
@@ -108,39 +108,47 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label);
-
+
if (tmrCtrl != NULL && pSched != NULL) {
pSched->pTmrCtrl = tmrCtrl;
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
-
+
return pSched;
}
-void *taosProcessSchedQueue(void *param) {
+void *taosProcessSchedQueue(void *scheduler) {
SSchedMsg msg;
- SSchedQueue *pSched = (SSchedQueue *)param;
+ SSchedQueue *pSched = (SSchedQueue *)scheduler;
+ int ret = 0;
while (1) {
- if (tsem_wait(&pSched->fullSem) != 0) {
- uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
+ if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
+ uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
+ exit(ret);
}
if (pSched->stop) {
break;
}
- if (pthread_mutex_lock(&pSched->queueMutex) != 0)
- uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
+ if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) {
+ uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
+ exit(ret);
+ }
msg = pSched->queue[pSched->fullSlot];
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
- if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
- uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
+ if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) {
+ uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
+ exit(ret);
+ }
- if (tsem_post(&pSched->emptySem) != 0)
- uError("post %s emptySem failed(%s)", pSched->label, strerror(errno));
+ if ((ret = tsem_post(&pSched->emptySem)) != 0) {
+ uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
+ exit(ret);
+ }
if (msg.fp)
(*(msg.fp))(&msg);
@@ -151,30 +159,37 @@ void *taosProcessSchedQueue(void *param) {
return NULL;
}
-int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
- SSchedQueue *pSched = (SSchedQueue *)qhandle;
+void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
+ SSchedQueue *pSched = (SSchedQueue *)queueScheduler;
+ int ret = 0;
+
if (pSched == NULL) {
uError("sched is not ready, msg:%p is dropped", pMsg);
- return 0;
+ return;
}
- if (tsem_wait(&pSched->emptySem) != 0) {
- uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
+ if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
+ uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
+ exit(ret);
}
- if (pthread_mutex_lock(&pSched->queueMutex) != 0)
- uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
+ if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) {
+ uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
+ exit(ret);
+ }
pSched->queue[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
- if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
- uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
-
- if (tsem_post(&pSched->fullSem) != 0)
- uError("post %s fullSem failed(%s)", pSched->label, strerror(errno));
+ if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) {
+ uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
+ exit(ret);
+ }
- return 0;
+ if ((ret = tsem_post(&pSched->fullSem)) != 0) {
+ uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
+ exit(ret);
+ }
}
void taosCleanUpScheduler(void *param) {
@@ -219,4 +234,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
}
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
-}
+}
\ No newline at end of file
diff --git a/tests/script/general/parser/groupby.sim b/tests/script/general/parser/groupby.sim
index 507431f536cac84d61a18d9c599e6bf9d344766d..6ae5d420d878c462477aa41c245d146dba95ce5e 100644
--- a/tests/script/general/parser/groupby.sim
+++ b/tests/script/general/parser/groupby.sim
@@ -741,4 +741,44 @@ if $data14 != 2 then
return -1
endi
+sql create table m1 (ts timestamp, k int, f1 int) tags(a int);
+sql create table tm0 using m1 tags(0);
+sql create table tm1 using m1 tags(1);
+
+sql insert into tm0 values('2020-1-1 1:1:1', 1, 10);
+sql insert into tm0 values('2020-1-1 1:1:2', 1, 20);
+sql insert into tm1 values('2020-2-1 1:1:1', 2, 10);
+sql insert into tm1 values('2020-2-1 1:1:2', 2, 20);
+
+system sh/exec.sh -n dnode1 -s stop -x SIGINT
+sleep 100
+system sh/exec.sh -n dnode1 -s start
+sleep 100
+
+sql connect
+sleep 100
+sql use group_db0;
+
+print =========================>TD-4894
+sql select count(*),k from m1 group by k;
+if $rows != 2 then
+ return -1
+endi
+
+if $data00 != 2 then
+ return -1
+endi
+
+if $data01 != 1 then
+ return -1
+endi
+
+if $data10 != 2 then
+ return -1
+endi
+
+if $data11 != 2 then
+ return -1
+endi
+
system sh/exec.sh -n dnode1 -s stop -x SIGINT
diff --git a/tests/script/general/parser/nestquery.sim b/tests/script/general/parser/nestquery.sim
index 8249d9197f55998ae26cb6dd232b6a701bf0a32c..fd56a91dd679bc52850520693dca41b66e475edc 100644
--- a/tests/script/general/parser/nestquery.sim
+++ b/tests/script/general/parser/nestquery.sim
@@ -180,20 +180,82 @@ if $data21 != 49.500000000 then
endi
#define TSDB_FUNC_APERCT 7
-#define TSDB_FUNC_LAST_ROW 10
#define TSDB_FUNC_TWA 14
#define TSDB_FUNC_LEASTSQR 15
-#define TSDB_FUNC_ARITHM 23
#define TSDB_FUNC_DIFF 24
#define TSDB_FUNC_INTERP 28
-#define TSDB_FUNC_RATE 29
#define TSDB_FUNC_IRATE 30
#define TSDB_FUNC_DERIVATIVE 32
sql_error select stddev(c1) from (select c1 from nest_tb0);
sql_error select percentile(c1, 20) from (select * from nest_tb0);
+sql_error select interp(c1) from (select * from nest_tb0);
+sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0);
+sql_error select twa(c1) from (select c1 from nest_tb0);
+sql_error select irate(c1) from (select c1 from nest_tb0);
+sql_error select diff(c1), twa(c1) from (select * from nest_tb0);
+sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0);
+
+sql select apercentile(c1, 50) from (select * from nest_tb0) interval(1d)
+sql select twa(c1) from (select * from nest_tb0);
+sql select leastsquares(c1, 1, 1) from (select * from nest_tb0);
+sql select irate(c1) from (select * from nest_tb0);
sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d);
+if $rows != 7 then
+ return -1
+endi
+
+if $data00 != @20-09-15 00:00:00.000@ then
+ return -1
+endi
+
+if $data01 != 48.666666667 then
+ print expect 48.666666667, actual: $data01
+ return -1
+endi
+
+if $data02 != 70080.000000000 then
+ return -1
+endi
+
+if $data03 != 99 then
+ return -1
+endi
+
+if $data04 != 0 then
+ return -1
+endi
+
+if $data05 != 1440 then
+ return -1
+endi
+
+if $data06 != 0 then
+ print $data06
+ return -1
+endi
+
+if $data07 != 1 then
+ return -1
+endi
+
+if $data08 != 99.000000000 then
+ print expect 99.000000000, actual: $data08
+ return -1
+endi
+
+if $data10 != @20-09-16 00:00:00.000@ then
+ return -1
+endi
+
+if $data11 != 49.777777778 then
+ return -1
+endi
+
+if $data12 != 71680.000000000 then
+ return -1
+endi
sql select top(x, 20) from (select c1 x from nest_tb0);
@@ -207,6 +269,9 @@ print ===================> group by + having
+print =========================> ascending order/descending order
+
+
print =========================> nest query join
@@ -273,7 +338,6 @@ if $data03 != @20-09-15 00:00:00.000@ then
return -1
endi
-sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0);
sql select diff(val) from (select c1 val from nest_tb0);
if $rows != 9999 then
return -1