提交 087517dd 编写于 作者: W wpan

Merge branch 'develop' into hotfix/TD-4910

......@@ -427,12 +427,15 @@ TDengine提供时间驱动的实时流式计算API。可以每隔一指定的时
* res:查询结果集,注意结果集中可能没有记录
* param:调用 `taos_subscribe`时客户程序提供的附加参数
* code:错误码
**注意**:在这个回调函数里不可以做耗时过长的处理,尤其是对于返回的结果集中数据较多的情况,否则有可能导致客户端阻塞等异常状态。如果必须进行复杂计算,则建议在另外的线程中进行处理。
* `TAOS_RES *taos_consume(TAOS_SUB *tsub)`
同步模式下,该函数用来获取订阅的结果。 用户应用程序将其置于一个循环之中。 如两次调用`taos_consume`的间隔小于订阅的轮询周期,API将会阻塞,直到时间间隔超过此周期。 如果数据库有新记录到达,该API将返回该最新的记录,否则返回一个没有记录的空结果集。 如果返回值为 `NULL`,说明系统出错。 异步模式下,用户程序不应调用此API。
**注意**:在调用 `taos_consume()` 之后,用户应用应确保尽快调用 `taos_fetch_row()``taos_fetch_block()` 来处理订阅结果,否则服务端会持续缓存查询结果数据等待客户端读取,极端情况下会导致服务端内存消耗殆尽,影响服务稳定性。
* `void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress)`
取消订阅。 如参数 `keepProgress` 不为0,API会保留订阅的进度信息,后续调用 `taos_subscribe` 时可以基于此进度继续;否则将删除进度信息,后续只能重新开始读取数据。
......
......@@ -854,7 +854,23 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
适用于:**表**。
适用于:**表、(超级表)**。
说明:从 2.1.3.0 版本开始,TWA 函数可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
- **IRATE**
```mysql
SELECT IRATE(field_name) FROM tb_name WHERE clause;
```
功能说明:计算瞬时增长率。使用时间区间中最后两个样本数据来计算瞬时增长速率;如果这两个值呈递减关系,那么只取最后一个数用于计算,而不是使用二者差值。
返回结果数据类型:双精度浮点数Double。
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
适用于:**表、(超级表)**。
说明:(从 2.1.3.0 版本开始新增此函数)IRATE 可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
- **SUM**
```mysql
......@@ -1203,13 +1219,14 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
```
### 计算函数
- **DIFF**
```mysql
SELECT DIFF(field_name) FROM tb_name [WHERE clause];
```
功能说明:统计表中某列的值与前一行对应值的差。
返回结果数据类型: 同应用字段。
返回结果数据类型:同应用字段。
应用字段:不能应用在timestamp、binary、nchar、bool类型字段。
......@@ -1227,13 +1244,27 @@ TDengine支持针对数据的聚合查询。提供支持的聚合和选择函数
Query OK, 2 row(s) in set (0.001162s)
```
- **DERIVATIVE**
```mysql
SELECT DERIVATIVE(field_name, time_interval, ignore_negative) FROM tb_name [WHERE clause];
```
功能说明:统计表中某列数值的单位变化率。其中单位时间区间的长度可以通过 time_interval 参数指定,最小可以是 1 秒(1s);ignore_negative 参数的值可以是 0 或 1,为 1 时表示忽略负值。
返回结果数据类型:双精度浮点数。
应用字段:不能应用在 timestamp、binary、nchar、bool 类型字段。
适用于:**表、(超级表)**。
说明:(从 2.1.3.0 版本开始新增此函数)输出结果行数是范围内总行数减一,第一行没有结果输出。DERIVATIVE 函数可以在由 GROUP BY 划分出单独时间线的情况下用于超级表(也即 GROUP BY tbname)。
- **SPREAD**
```mysql
SELECT SPREAD(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
功能说明:统计表/超级表中某列的最大值和最小值之差。
返回结果数据类型: 双精度浮点数。
返回结果数据类型:双精度浮点数。
应用字段:不能应用在binary、nchar、bool类型字段。
......
......@@ -898,7 +898,9 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
SSDataBlock* pBlock = NULL;
while(1) {
bool prev = *newgroup;
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = upstream->exec(upstream, newgroup);
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
*newgroup = prev;
break;
......@@ -966,7 +968,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
SSDataBlock* pBlock = NULL;
if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -974,7 +978,9 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) {
while ((*newgroup) == false) { // ignore the remain blocks
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -986,7 +992,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
return pBlock;
}
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......@@ -1000,7 +1009,10 @@ static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) {
}
while ((*newgroup) == false) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
......
......@@ -165,6 +165,7 @@ bool serializeExprListToVariant(SArray* pList, tVariant **dst, int16_t colType,
} else {
tbufWriteUint32(&bw, colType);
}
tbufWriteInt32(&bw, (int32_t)(pList->size));
for (int32_t i = 0; i < (int32_t)pList->size; i++) {
......@@ -181,10 +182,11 @@ bool serializeExprListToVariant(SArray* pList, tVariant **dst, int16_t colType,
}
tbufWriteInt64(&bw, var->i64);
} else if (IS_UNSIGNED_NUMERIC_TYPE(colType)) {
if (IS_SIGNED_NUMERIC_TYPE(var->nType) && IS_UNSIGNED_NUMERIC_TYPE(var->nType)) {
if (IS_SIGNED_NUMERIC_TYPE(var->nType) || IS_UNSIGNED_NUMERIC_TYPE(var->nType)) {
tbufWriteUint64(&bw, var->u64);
} else {
break;
}
tbufWriteUint64(&bw, var->u64);
}
} else if (colType == TSDB_DATA_TYPE_DOUBLE || colType == TSDB_DATA_TYPE_FLOAT) {
if (IS_SIGNED_NUMERIC_TYPE(var->nType) || IS_UNSIGNED_NUMERIC_TYPE(var->nType)) {
tbufWriteDouble(&bw, (double)(var->i64));
......@@ -2069,33 +2071,29 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
const char* name, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
const char* msg1 = "not support column types";
int16_t type = 0;
int16_t bytes = 0;
int32_t functionID = cvtFunc.execFuncId;
if (functionID == TSDB_FUNC_SPREAD) {
int32_t f = cvtFunc.execFuncId;
if (f == TSDB_FUNC_SPREAD) {
int32_t t1 = pSchema->type;
if (t1 == TSDB_DATA_TYPE_BINARY || t1 == TSDB_DATA_TYPE_NCHAR || t1 == TSDB_DATA_TYPE_BOOL) {
if (IS_VAR_DATA_TYPE(t1) || t1 == TSDB_DATA_TYPE_BOOL) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return -1;
} else {
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypes[type].bytes;
}
} else {
type = pSchema->type;
bytes = pSchema->bytes;
}
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, getNewResColId(pCmd), bytes, false);
int16_t resType = 0;
int16_t resBytes = 0;
int32_t interBufSize = 0;
getResultDataInfo(pSchema->type, pSchema->bytes, f, 0, &resType, &resBytes, &interBufSize, 0, false);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, f, pColIndex, resType, resBytes, getNewResColId(pCmd), interBufSize, false);
tstrncpy(pExpr->base.aliasName, name, tListLen(pExpr->base.aliasName));
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) {
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != f) {
pExpr->base.colInfo.flag |= TSDB_COL_NULL;
}
// set reverse order scan data blocks for last query
if (functionID == TSDB_FUNC_LAST) {
if (f == TSDB_FUNC_LAST) {
pExpr->base.numOfParams = 1;
pExpr->base.param[0].i64 = TSDB_ORDER_DESC;
pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT;
......@@ -2108,7 +2106,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
// if it is not in the final result, do not add it
SColumnList ids = createColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
if (finalResult) {
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, pExpr->base.aliasName, pExpr);
insertResultField(pQueryInfo, resColIdx, &ids, resBytes, (int8_t)resType, pExpr->base.aliasName, pExpr);
} else {
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
}
......@@ -2557,8 +2555,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tVariant* pVariant = &pParamElem[1].pNode->value;
int8_t resultType = pSchema->type;
int16_t resultSize = pSchema->bytes;
int16_t resultType = pSchema->type;
int16_t resultSize = pSchema->bytes;
int32_t interResult = 0;
char val[8] = {0};
......@@ -2571,8 +2570,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
resultSize = sizeof(double);
resultType = TSDB_DATA_TYPE_DOUBLE;
getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, false);
/*
* sql function transformation
......@@ -2582,7 +2580,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
colIndex += 1; // the first column is ts
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false);
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
} else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
......@@ -2617,7 +2615,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
if (finalResult) {
insertResultField(pQueryInfo, colIndex, &ids, resultSize, resultType, pExpr->base.aliasName, pExpr);
insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr);
} else {
assert(ids.num == 1);
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
......@@ -7784,10 +7782,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg3 = "start(end) time of query range required or time range too large";
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
const char* msg5 = "only tag query not compatible with normal column filter";
const char* msg6 = "not support stddev/percentile in outer query yet";
const char* msg7 = "drivative requires timestamp column exists in subquery";
const char* msg6 = "not support stddev/percentile/interp in the outer query yet";
const char* msg7 = "derivative/twa/irate requires timestamp column exists in subquery";
const char* msg8 = "condition missing for join query";
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -7829,15 +7827,17 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// parse the window_state
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// todo NOT support yet
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) {
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT || f == TSDB_FUNC_INTERP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
......@@ -7852,9 +7852,17 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0);
if (tscNumOfExprs(pQueryInfo) > 1) {
int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
if (numOfExprs == 1) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
} else {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.functionId == TSDB_FUNC_DERIVATIVE && pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
int32_t f = pExpr->base.functionId;
if ((f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) && pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
}
......
......@@ -3604,10 +3604,10 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
// todo refactor: filter should not be applied here.
createFilterInfo(pQueryAttr, 0);
pQueryAttr->numOfFilterCols = 0;
SArray* pa = NULL;
if (stage == MASTER_SCAN) {
pQueryAttr->createFilterOperator = false; // no need for parent query
pa = createExecOperatorPlan(pQueryAttr);
} else {
pa = createGlobalMergePlan(pQueryAttr);
......
......@@ -825,7 +825,10 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pStatus->index = 0;
if (pStatus->pBlock == NULL) {
......
......@@ -1204,23 +1204,24 @@ static void fetchResult(TAOS_RES *res, threadInfo* pThreadInfo) {
return ;
}
int totalLen = 0;
int64_t totalLen = 0;
// fetch the records row by row
while((row = taos_fetch_row(res))) {
if ((strlen(pThreadInfo->filePath) > 0)
&& (totalLen >= 100*1024*1024 - 32000)) {
appendResultBufToFile(databuf, pThreadInfo);
if (totalLen >= 100*1024*1024 - 32000) {
if (strlen(pThreadInfo->filePath) > 0)
appendResultBufToFile(databuf, pThreadInfo);
totalLen = 0;
memset(databuf, 0, 100*1024*1024);
}
num_rows++;
char temp[16000] = {0};
char temp[16000] = {0};
int len = taos_print_row(temp, row, fields, num_fields);
len += sprintf(temp + len, "\n");
//printf("query result:%s\n", temp);
memcpy(databuf + totalLen, temp, len);
totalLen += len;
debugPrint("totalLen: %"PRId64"\n", totalLen);
}
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n",
......
......@@ -1126,7 +1126,7 @@ int taosGetTableDes(
strncpy(tableDes->cols[count].field, (char *)row[TSDB_DESCRIBE_METRIC_FIELD_INDEX],
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
strncpy(tableDes->cols[count].type, (char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
min(16, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
tableDes->cols[count].length = *((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
strncpy(tableDes->cols[count].note, (char *)row[TSDB_DESCRIBE_METRIC_NOTE_INDEX],
fields[TSDB_DESCRIBE_METRIC_NOTE_INDEX].bytes);
......
......@@ -29,7 +29,7 @@ extern "C" {
#include "osMath.h"
#include "osMemory.h"
#include "osRand.h"
#include "osSemphone.h"
#include "osSemaphore.h"
#include "osSignal.h"
#include "osSleep.h"
#include "osSocket.h"
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_SEMPHONE_H
#define TDENGINE_OS_SEMPHONE_H
#ifndef TDENGINE_OS_SEMAPHORE_H
#define TDENGINE_OS_SEMAPHORE_H
#ifdef __cplusplus
extern "C" {
......
......@@ -228,13 +228,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
case TSDB_DATA_TYPE_NCHAR:
httpJsonStringForTransMean(jsonBuf, (char *)row[i], fields[i].bytes);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
if (precision == TSDB_TIME_PRECISION_MILLI) { // ms
httpJsonInt64(jsonBuf, *((int64_t *)row[i]));
} else {
httpJsonInt64(jsonBuf, *((int64_t *)row[i]) / 1000);
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t ts = convertTimePrecision(*((int64_t *)row[i]), precision, TSDB_TIME_PRECISION_MILLI);
httpJsonInt64(jsonBuf, ts);
break;
}
default:
httpJsonString(jsonBuf, "-", 1);
break;
......
......@@ -133,6 +133,28 @@ typedef struct STableQueryInfo {
SResultRowInfo resInfo;
} STableQueryInfo;
typedef enum {
QUERY_PROF_BEFORE_OPERATOR_EXEC = 0,
QUERY_PROF_AFTER_OPERATOR_EXEC,
QUERY_PROF_QUERY_ABORT
} EQueryProfEventType;
typedef struct {
EQueryProfEventType eventType;
int64_t eventTime;
union {
uint8_t operatorType; //for operator event
int32_t abortCode; //for query abort event
};
} SQueryProfEvent;
typedef struct {
uint8_t operatorType;
int64_t sumSelfTime;
int64_t sumRunTimes;
} SOperatorProfResult;
typedef struct SQueryCostInfo {
uint64_t loadStatisTime;
uint64_t loadFileBlockTime;
......@@ -154,6 +176,9 @@ typedef struct SQueryCostInfo {
uint64_t tableInfoSize;
uint64_t hashSize;
uint64_t numOfTimeWindows;
SArray* queryProfEvents; //SArray<SQueryProfEvent>
SHashObj* operatorProfResults; //map<operator_type, SQueryProfEvent>
} SQueryCostInfo;
typedef struct {
......@@ -192,6 +217,7 @@ typedef struct SQueryAttr {
bool needReverseScan; // need reverse scan
bool distinctTag; // distinct tag query
bool stateWindow; // window State on sub/normal table
bool createFilterOperator; // if filter operator is needed
int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number
......@@ -285,7 +311,7 @@ enum OPERATOR_TYPE_E {
OP_TagScan = 4,
OP_TableBlockInfoScan= 5,
OP_Aggregate = 6,
OP_Arithmetic = 7,
OP_Project = 7,
OP_Groupby = 8,
OP_Limit = 9,
OP_SLimit = 10,
......@@ -413,13 +439,13 @@ typedef struct SAggOperatorInfo {
uint32_t seed;
} SAggOperatorInfo;
typedef struct SArithOperatorInfo {
typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
int32_t bufCapacity;
uint32_t seed;
SSDataBlock *existDataBlock;
} SArithOperatorInfo;
} SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
int64_t limit;
......@@ -513,7 +539,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......@@ -586,7 +612,12 @@ int32_t doDumpQueryResult(SQInfo *pQInfo, char *data);
size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows);
void setQueryKilled(SQInfo *pQInfo);
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType);
void publishQueryAbortEvent(SQInfo* pQInfo, int32_t code);
void calculateOperatorProfResults(SQInfo* pQInfo);
void queryCostStatis(SQInfo *pQInfo);
void freeQInfo(SQInfo *pQInfo);
void freeQueryAttr(SQueryAttr *pQuery);
......
......@@ -74,7 +74,6 @@
} while (0);
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
void doFinalizer(SQLFunctionCtx *pCtx) { RESET_RESULT_INFO(GET_RES_INFO(pCtx)); }
......
此差异已折叠。
......@@ -565,7 +565,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
......@@ -585,7 +585,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->sw.gap > 0) {
......@@ -593,7 +593,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->stateWindow) {
......@@ -601,7 +601,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
} else if (pQueryAttr->simpleAgg) {
......@@ -619,15 +619,15 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
} else { // diff/add/multiply/subtract/division
if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->vgId == 0) { // todo refactor
if (pQueryAttr->numOfFilterCols > 0 && pQueryAttr->createFilterOperator && pQueryAttr->vgId == 0) { // todo refactor
op = OP_Filter;
taosArrayPush(plan, &op);
} else {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
}
......@@ -665,7 +665,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
}
if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic;
op = OP_Project;
taosArrayPush(plan, &op);
}
}
......
......@@ -232,6 +232,7 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
// error occurs, record the error code and return to client
int32_t ret = setjmp(pQInfo->runtimeEnv.env);
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pQInfo, ret);
pQInfo->code = ret;
qDebug("QInfo:0x%"PRIx64" query abort due to error/cancel occurs, code:%s", pQInfo->qId, tstrerror(pQInfo->code));
return doBuildResCheck(pQInfo);
......@@ -240,7 +241,9 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
qDebug("QInfo:0x%"PRIx64" query task is launched", pQInfo->qId);
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
if (isQueryKilled(pQInfo)) {
......
......@@ -28,10 +28,41 @@ typedef struct SSchedMsg {
void *thandle;
} SSchedMsg;
void *taosInitScheduler(int queueSize, int numOfThreads, const char *label);
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl);
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg);
void taosCleanUpScheduler(void *param);
/**
* Create a thread-safe ring-buffer based task queue and return the instance. A thread
* pool will be created to consume the messages in the queue.
* @param capacity the queue capacity
* @param numOfThreads the number of threads for the thread pool
* @param label the label of the queue
* @return the created queue scheduler
*/
void *taosInitScheduler(int capacity, int numOfThreads, const char *label);
/**
* Create a thread-safe ring-buffer based task queue and return the instance.
* Same as taosInitScheduler, and it also print the queue status every 1 minite.
* @param capacity the queue capacity
* @param numOfThreads the number of threads for the thread pool
* @param label the label of the queue
* @param tmrCtrl the timer controller, tmr_ctrl_t*
* @return the created queue scheduler
*/
void *taosInitSchedulerWithInfo(int capacity, int numOfThreads, const char *label, void *tmrCtrl);
/**
* Clean up the queue scheduler instance and free the memory.
* @param queueScheduler the queue scheduler to free
*/
void taosCleanUpScheduler(void *queueScheduler);
/**
* Schedule a new task to run, the task is described by pMsg.
* The function may be blocked if no thread is available to execute the task.
* That may happen when all threads are busy.
* @param queueScheduler the queue scheduler instance
* @param pMsg the message for the task
*/
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg);
#ifdef __cplusplus
}
......
......@@ -108,39 +108,47 @@ void *taosInitScheduler(int queueSize, int numOfThreads, const char *label) {
void *taosInitSchedulerWithInfo(int queueSize, int numOfThreads, const char *label, void *tmrCtrl) {
SSchedQueue* pSched = taosInitScheduler(queueSize, numOfThreads, label);
if (tmrCtrl != NULL && pSched != NULL) {
pSched->pTmrCtrl = tmrCtrl;
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
return pSched;
}
void *taosProcessSchedQueue(void *param) {
void *taosProcessSchedQueue(void *scheduler) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
SSchedQueue *pSched = (SSchedQueue *)scheduler;
int ret = 0;
while (1) {
if (tsem_wait(&pSched->fullSem) != 0) {
uError("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_wait(&pSched->fullSem)) != 0) {
uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (pSched->stop) {
break;
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
msg = pSched->queue[pSched->fullSlot];
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (tsem_post(&pSched->emptySem) != 0)
uError("post %s emptySem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_post(&pSched->emptySem)) != 0) {
uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (msg.fp)
(*(msg.fp))(&msg);
......@@ -151,30 +159,37 @@ void *taosProcessSchedQueue(void *param) {
return NULL;
}
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)qhandle;
void taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)queueScheduler;
int ret = 0;
if (pSched == NULL) {
uError("sched is not ready, msg:%p is dropped", pMsg);
return 0;
return;
}
if (tsem_wait(&pSched->emptySem) != 0) {
uError("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
if ((ret = tsem_wait(&pSched->emptySem)) != 0) {
uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
uError("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_lock(&pSched->queueMutex)) != 0) {
uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
pSched->queue[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
uError("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
if (tsem_post(&pSched->fullSem) != 0)
uError("post %s fullSem failed(%s)", pSched->label, strerror(errno));
if ((ret = pthread_mutex_unlock(&pSched->queueMutex)) != 0) {
uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
return 0;
if ((ret = tsem_post(&pSched->fullSem)) != 0) {
uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno));
exit(ret);
}
}
void taosCleanUpScheduler(void *param) {
......@@ -219,4 +234,4 @@ void taosDumpSchedulerStatus(void *qhandle, void *tmrId) {
}
taosTmrReset(taosDumpSchedulerStatus, DUMP_SCHEDULER_TIME_WINDOW, pSched, pSched->pTmrCtrl, &pSched->pTimer);
}
}
\ No newline at end of file
......@@ -741,4 +741,44 @@ if $data14 != 2 then
return -1
endi
sql create table m1 (ts timestamp, k int, f1 int) tags(a int);
sql create table tm0 using m1 tags(0);
sql create table tm1 using m1 tags(1);
sql insert into tm0 values('2020-1-1 1:1:1', 1, 10);
sql insert into tm0 values('2020-1-1 1:1:2', 1, 20);
sql insert into tm1 values('2020-2-1 1:1:1', 2, 10);
sql insert into tm1 values('2020-2-1 1:1:2', 2, 20);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 100
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
sleep 100
sql use group_db0;
print =========================>TD-4894
sql select count(*),k from m1 group by k;
if $rows != 2 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
if $data11 != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -180,20 +180,82 @@ if $data21 != 49.500000000 then
endi
#define TSDB_FUNC_APERCT 7
#define TSDB_FUNC_LAST_ROW 10
#define TSDB_FUNC_TWA 14
#define TSDB_FUNC_LEASTSQR 15
#define TSDB_FUNC_ARITHM 23
#define TSDB_FUNC_DIFF 24
#define TSDB_FUNC_INTERP 28
#define TSDB_FUNC_RATE 29
#define TSDB_FUNC_IRATE 30
#define TSDB_FUNC_DERIVATIVE 32
sql_error select stddev(c1) from (select c1 from nest_tb0);
sql_error select percentile(c1, 20) from (select * from nest_tb0);
sql_error select interp(c1) from (select * from nest_tb0);
sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0);
sql_error select twa(c1) from (select c1 from nest_tb0);
sql_error select irate(c1) from (select c1 from nest_tb0);
sql_error select diff(c1), twa(c1) from (select * from nest_tb0);
sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0);
sql select apercentile(c1, 50) from (select * from nest_tb0) interval(1d)
sql select twa(c1) from (select * from nest_tb0);
sql select leastsquares(c1, 1, 1) from (select * from nest_tb0);
sql select irate(c1) from (select * from nest_tb0);
sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d);
if $rows != 7 then
return -1
endi
if $data00 != @20-09-15 00:00:00.000@ then
return -1
endi
if $data01 != 48.666666667 then
print expect 48.666666667, actual: $data01
return -1
endi
if $data02 != 70080.000000000 then
return -1
endi
if $data03 != 99 then
return -1
endi
if $data04 != 0 then
return -1
endi
if $data05 != 1440 then
return -1
endi
if $data06 != 0 then
print $data06
return -1
endi
if $data07 != 1 then
return -1
endi
if $data08 != 99.000000000 then
print expect 99.000000000, actual: $data08
return -1
endi
if $data10 != @20-09-16 00:00:00.000@ then
return -1
endi
if $data11 != 49.777777778 then
return -1
endi
if $data12 != 71680.000000000 then
return -1
endi
sql select top(x, 20) from (select c1 x from nest_tb0);
......@@ -207,6 +269,9 @@ print ===================> group by + having
print =========================> ascending order/descending order
print =========================> nest query join
......@@ -273,7 +338,6 @@ if $data03 != @20-09-15 00:00:00.000@ then
return -1
endi
sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0);
sql select diff(val) from (select c1 val from nest_tb0);
if $rows != 9999 then
return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册