未验证 提交 92671a20 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #6636 from taosdata/feature/query

Feature/query
...@@ -2071,33 +2071,29 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS ...@@ -2071,33 +2071,29 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
const char* name, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) { const char* name, int32_t resColIdx, SColumnIndex* pColIndex, bool finalResult) {
const char* msg1 = "not support column types"; const char* msg1 = "not support column types";
int16_t type = 0; int32_t f = cvtFunc.execFuncId;
int16_t bytes = 0; if (f == TSDB_FUNC_SPREAD) {
int32_t functionID = cvtFunc.execFuncId;
if (functionID == TSDB_FUNC_SPREAD) {
int32_t t1 = pSchema->type; int32_t t1 = pSchema->type;
if (t1 == TSDB_DATA_TYPE_BINARY || t1 == TSDB_DATA_TYPE_NCHAR || t1 == TSDB_DATA_TYPE_BOOL) { if (IS_VAR_DATA_TYPE(t1) || t1 == TSDB_DATA_TYPE_BOOL) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return -1; return -1;
} else {
type = TSDB_DATA_TYPE_DOUBLE;
bytes = tDataTypes[type].bytes;
} }
} else {
type = pSchema->type;
bytes = pSchema->bytes;
} }
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, getNewResColId(pCmd), bytes, false); int16_t resType = 0;
int16_t resBytes = 0;
int32_t interBufSize = 0;
getResultDataInfo(pSchema->type, pSchema->bytes, f, 0, &resType, &resBytes, &interBufSize, 0, false);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, f, pColIndex, resType, resBytes, getNewResColId(pCmd), interBufSize, false);
tstrncpy(pExpr->base.aliasName, name, tListLen(pExpr->base.aliasName)); tstrncpy(pExpr->base.aliasName, name, tListLen(pExpr->base.aliasName));
if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != functionID) { if (cvtFunc.originFuncId == TSDB_FUNC_LAST_ROW && cvtFunc.originFuncId != f) {
pExpr->base.colInfo.flag |= TSDB_COL_NULL; pExpr->base.colInfo.flag |= TSDB_COL_NULL;
} }
// set reverse order scan data blocks for last query // set reverse order scan data blocks for last query
if (functionID == TSDB_FUNC_LAST) { if (f == TSDB_FUNC_LAST) {
pExpr->base.numOfParams = 1; pExpr->base.numOfParams = 1;
pExpr->base.param[0].i64 = TSDB_ORDER_DESC; pExpr->base.param[0].i64 = TSDB_ORDER_DESC;
pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT; pExpr->base.param[0].nType = TSDB_DATA_TYPE_INT;
...@@ -2110,7 +2106,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS ...@@ -2110,7 +2106,7 @@ static int32_t setExprInfoForFunctions(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SS
// if it is not in the final result, do not add it // if it is not in the final result, do not add it
SColumnList ids = createColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex); SColumnList ids = createColumnList(1, pColIndex->tableIndex, pColIndex->columnIndex);
if (finalResult) { if (finalResult) {
insertResultField(pQueryInfo, resColIdx, &ids, bytes, (int8_t)type, pExpr->base.aliasName, pExpr); insertResultField(pQueryInfo, resColIdx, &ids, resBytes, (int8_t)resType, pExpr->base.aliasName, pExpr);
} else { } else {
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
} }
...@@ -2559,8 +2555,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2559,8 +2555,9 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tVariant* pVariant = &pParamElem[1].pNode->value; tVariant* pVariant = &pParamElem[1].pNode->value;
int8_t resultType = pSchema->type; int16_t resultType = pSchema->type;
int16_t resultSize = pSchema->bytes; int16_t resultSize = pSchema->bytes;
int32_t interResult = 0;
char val[8] = {0}; char val[8] = {0};
...@@ -2573,8 +2570,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2573,8 +2570,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
} }
resultSize = sizeof(double); getResultDataInfo(pSchema->type, pSchema->bytes, functionId, 0, &resultType, &resultSize, &interResult, 0, false);
resultType = TSDB_DATA_TYPE_DOUBLE;
/* /*
* sql function transformation * sql function transformation
...@@ -2584,7 +2580,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2584,7 +2580,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid); tscInsertPrimaryTsSourceColumn(pQueryInfo, pTableMetaInfo->pTableMeta->id.uid);
colIndex += 1; // the first column is ts colIndex += 1; // the first column is ts
pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false); pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), interResult, false);
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
} else { } else {
tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true); tVariantDump(pVariant, val, TSDB_DATA_TYPE_BIGINT, true);
...@@ -2619,7 +2615,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col ...@@ -2619,7 +2615,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
if (finalResult) { if (finalResult) {
insertResultField(pQueryInfo, colIndex, &ids, resultSize, resultType, pExpr->base.aliasName, pExpr); insertResultField(pQueryInfo, colIndex, &ids, resultSize, (int8_t)resultType, pExpr->base.aliasName, pExpr);
} else { } else {
assert(ids.num == 1); assert(ids.num == 1);
tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema); tscColumnListInsert(pQueryInfo->colList, ids.ids[0].columnIndex, pExpr->base.uid, pSchema);
...@@ -7786,8 +7782,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7786,8 +7782,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg3 = "start(end) time of query range required or time range too large"; const char* msg3 = "start(end) time of query range required or time range too large";
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column"; const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
const char* msg5 = "only tag query not compatible with normal column filter"; const char* msg5 = "only tag query not compatible with normal column filter";
const char* msg6 = "not support stddev/percentile in outer query yet"; const char* msg6 = "not support stddev/percentile/interp in the outer query yet";
const char* msg7 = "drivative requires timestamp column exists in subquery"; const char* msg7 = "derivative/twa/irate requires timestamp column exists in subquery";
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -7830,15 +7826,17 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7830,15 +7826,17 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) != TSDB_CODE_SUCCESS) { if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
// parse the window_state // parse the window_state
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, false) != TSDB_CODE_SUCCESS) { if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, false) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
// todo NOT support yet // todo NOT support yet
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) { for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i); SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId; int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) { if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT || f == TSDB_FUNC_INTERP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
} }
...@@ -7853,9 +7851,17 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -7853,9 +7851,17 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0); SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0);
if (tscNumOfExprs(pQueryInfo) > 1) { int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
if (numOfExprs == 1) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
} else {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 1); SExprInfo* pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.functionId == TSDB_FUNC_DERIVATIVE && pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) { int32_t f = pExpr->base.functionId;
if ((f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) && pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
} }
} }
......
...@@ -286,7 +286,7 @@ enum OPERATOR_TYPE_E { ...@@ -286,7 +286,7 @@ enum OPERATOR_TYPE_E {
OP_TagScan = 4, OP_TagScan = 4,
OP_TableBlockInfoScan= 5, OP_TableBlockInfoScan= 5,
OP_Aggregate = 6, OP_Aggregate = 6,
OP_Arithmetic = 7, OP_Project = 7,
OP_Groupby = 8, OP_Groupby = 8,
OP_Limit = 9, OP_Limit = 9,
OP_SLimit = 10, OP_SLimit = 10,
...@@ -414,13 +414,13 @@ typedef struct SAggOperatorInfo { ...@@ -414,13 +414,13 @@ typedef struct SAggOperatorInfo {
uint32_t seed; uint32_t seed;
} SAggOperatorInfo; } SAggOperatorInfo;
typedef struct SArithOperatorInfo { typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
int32_t bufCapacity; int32_t bufCapacity;
uint32_t seed; uint32_t seed;
SSDataBlock *existDataBlock; SSDataBlock *existDataBlock;
} SArithOperatorInfo; } SProjectOperatorInfo;
typedef struct SLimitOperatorInfo { typedef struct SLimitOperatorInfo {
int64_t limit; int64_t limit;
...@@ -514,7 +514,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* ...@@ -514,7 +514,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream); SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream);
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......
...@@ -74,7 +74,6 @@ ...@@ -74,7 +74,6 @@
} while (0); } while (0);
void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {}
void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {}
void doFinalizer(SQLFunctionCtx *pCtx) { RESET_RESULT_INFO(GET_RES_INFO(pCtx)); } void doFinalizer(SQLFunctionCtx *pCtx) { RESET_RESULT_INFO(GET_RES_INFO(pCtx)); }
......
...@@ -184,7 +184,7 @@ static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr); ...@@ -184,7 +184,7 @@ static int32_t getNumOfScanTimes(SQueryAttr* pQueryAttr);
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput); static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput); static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput); static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput); static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput); static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
...@@ -912,7 +912,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlo ...@@ -912,7 +912,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlo
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
} }
} else { } else {
if (/*pCtx[0].pInput == NULL && */pBlock->pDataBlock != NULL) { if (pBlock->pDataBlock != NULL) {
doSetInputDataBlock(pOperator, pCtx, pBlock, order); doSetInputDataBlock(pOperator, pCtx, pBlock, order);
} else { } else {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order); doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
...@@ -978,7 +978,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction ...@@ -978,7 +978,7 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SQLFunction
} }
} }
static void arithmeticApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) { static void projectApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t numOfOutput) {
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
...@@ -1282,11 +1282,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1282,11 +1282,8 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
return; return;
} }
int64_t* tsList = NULL;
SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pFirstColData = taosArrayGet(pSDataBlock->pDataBlock, 0);
if (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP) { int64_t* tsList = (pFirstColData->info.type == TSDB_DATA_TYPE_TIMESTAMP)? (int64_t*) pFirstColData->pData:NULL;
tsList = (int64_t*) pFirstColData->pData;
}
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
...@@ -1319,12 +1316,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1319,12 +1316,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
} }
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, pInfo->prevData, bytes);
bytes);
} }
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, pInfo->prevData, type, bytes, item->groupIndex);
item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
...@@ -1340,17 +1335,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn ...@@ -1340,17 +1335,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
memcpy(pInfo->prevData, val, bytes); memcpy(pInfo->prevData, val, bytes);
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) { if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
bytes);
} }
int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, int32_t ret = setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
} }
doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput); doApplyFunctions(pRuntimeEnv, pInfo->binfo.pCtx, &w, pSDataBlock->info.rows - num, num, tsList, pSDataBlock->info.rows, pOperator->numOfOutput);
tfree(pInfo->prevData);
} }
} }
...@@ -1806,17 +1800,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -1806,17 +1800,17 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
break; break;
} }
case OP_Arithmetic: { // TODO refactor to remove arith operator. case OP_Project: { // TODO refactor to remove arith operator.
SOperatorInfo* prev = pRuntimeEnv->proot; SOperatorInfo* prev = pRuntimeEnv->proot;
if (i == 0) { if (i == 0) {
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput); pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor if (pRuntimeEnv->proot != NULL && prev->operatorType != OP_DummyInput && prev->operatorType != OP_Join) { // TODO refactor
setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot); setTableScanFilterOperatorInfo(prev->info, pRuntimeEnv->proot);
} }
} else { } else {
prev = pRuntimeEnv->proot; prev = pRuntimeEnv->proot;
assert(pQueryAttr->pExpr2 != NULL); assert(pQueryAttr->pExpr2 != NULL);
pRuntimeEnv->proot = createArithOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2); pRuntimeEnv->proot = createProjectOperatorInfo(pRuntimeEnv, prev, pQueryAttr->pExpr2, pQueryAttr->numOfExpr2);
} }
break; break;
} }
...@@ -4578,8 +4572,8 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf ...@@ -4578,8 +4572,8 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo; pTableScanInfo->pResultRowInfo = &pInfo->resultRowInfo;
pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset; pTableScanInfo->rowCellInfoOffset = pInfo->rowCellInfoOffset;
} else if (pDownstream->operatorType == OP_Arithmetic) { } else if (pDownstream->operatorType == OP_Project) {
SArithOperatorInfo *pInfo = pDownstream->info; SProjectOperatorInfo *pInfo = pDownstream->info;
pTableScanInfo->pCtx = pInfo->binfo.pCtx; pTableScanInfo->pCtx = pInfo->binfo.pCtx;
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
...@@ -4934,23 +4928,23 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) { ...@@ -4934,23 +4928,23 @@ static SSDataBlock* doSTableAggregate(void* param, bool* newgroup) {
return pInfo->pRes; return pInfo->pRes;
} }
static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { static SSDataBlock* doProjectOperation(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
SArithOperatorInfo* pArithInfo = pOperator->info; SProjectOperatorInfo* pProjectInfo = pOperator->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
SOptrBasicInfo *pInfo = &pArithInfo->binfo; SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
int32_t order = pRuntimeEnv->pQueryAttr->order.order; int32_t order = pRuntimeEnv->pQueryAttr->order.order;
pRes->info.rows = 0; pRes->info.rows = 0;
if (pArithInfo->existDataBlock) { // TODO refactor if (pProjectInfo->existDataBlock) { // TODO refactor
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
SSDataBlock* pBlock = pArithInfo->existDataBlock; SSDataBlock* pBlock = pProjectInfo->existDataBlock;
pArithInfo->existDataBlock = NULL; pProjectInfo->existDataBlock = NULL;
*newgroup = true; *newgroup = true;
// todo dynamic set tags // todo dynamic set tags
...@@ -4960,9 +4954,9 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { ...@@ -4960,9 +4954,9 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) { if (pTableQueryInfo != NULL) {
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order); updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
} }
...@@ -4990,7 +4984,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { ...@@ -4990,7 +4984,7 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
// Return result of the previous group in the firstly. // Return result of the previous group in the firstly.
if (*newgroup) { if (*newgroup) {
if (pRes->info.rows > 0) { if (pRes->info.rows > 0) {
pArithInfo->existDataBlock = pBlock; pProjectInfo->existDataBlock = pBlock;
clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput); clearNumOfRes(pInfo->pCtx, pOperator->numOfOutput);
return pInfo->pRes; return pInfo->pRes;
} else { // init output buffer for a new group data } else { // init output buffer for a new group data
...@@ -5010,9 +5004,9 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) { ...@@ -5010,9 +5004,9 @@ static SSDataBlock* doArithmeticOperation(void* param, bool* newgroup) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order);
updateOutputBuf(&pArithInfo->binfo, &pArithInfo->bufCapacity, pBlock->info.rows); updateOutputBuf(&pProjectInfo->binfo, &pProjectInfo->bufCapacity, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput);
if (pTableQueryInfo != NULL) { if (pTableQueryInfo != NULL) {
updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order); updateTableIdInfo(pTableQueryInfo, pBlock, pRuntimeEnv->pTableRetrieveTsMap, order);
} }
...@@ -5649,8 +5643,8 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5649,8 +5643,8 @@ static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
tfree(pInfo->prevData); tfree(pInfo->prevData);
} }
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput) { static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
SArithOperatorInfo* pInfo = (SArithOperatorInfo*) param; SProjectOperatorInfo* pInfo = (SProjectOperatorInfo*) param;
doDestroyBasicInfo(&pInfo->binfo, numOfOutput); doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
} }
...@@ -5696,8 +5690,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO ...@@ -5696,8 +5690,8 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
return pOperator; return pOperator;
} }
SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo)); SProjectOperatorInfo* pInfo = calloc(1, sizeof(SProjectOperatorInfo));
pInfo->seed = rand(); pInfo->seed = rand();
pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity; pInfo->bufCapacity = pRuntimeEnv->resultInfo.capacity;
...@@ -5710,8 +5704,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5710,8 +5704,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "ArithmeticOperator"; pOperator->name = "ProjectOperator";
pOperator->operatorType = OP_Arithmetic; pOperator->operatorType = OP_Project;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
...@@ -5719,8 +5713,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -5719,8 +5713,8 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->exec = doArithmeticOperation; pOperator->exec = doProjectOperation;
pOperator->cleanup = destroyArithOperatorInfo; pOperator->cleanup = destroyProjectOperatorInfo;
appendUpstream(pOperator, upstream); appendUpstream(pOperator, upstream);
return pOperator; return pOperator;
......
...@@ -565,7 +565,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -565,7 +565,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic; op = OP_Project;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
...@@ -585,7 +585,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -585,7 +585,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
} }
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic; op = OP_Project;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
} else if (pQueryAttr->sw.gap > 0) { } else if (pQueryAttr->sw.gap > 0) {
...@@ -593,7 +593,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -593,7 +593,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic; op = OP_Project;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
} else if (pQueryAttr->stateWindow) { } else if (pQueryAttr->stateWindow) {
...@@ -601,7 +601,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -601,7 +601,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic; op = OP_Project;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
} else if (pQueryAttr->simpleAgg) { } else if (pQueryAttr->simpleAgg) {
...@@ -619,7 +619,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -619,7 +619,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
} }
if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) { if (pQueryAttr->pExpr2 != NULL && !pQueryAttr->stableQuery) {
op = OP_Arithmetic; op = OP_Project;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
} else { // diff/add/multiply/subtract/division } else { // diff/add/multiply/subtract/division
...@@ -627,7 +627,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) { ...@@ -627,7 +627,7 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
op = OP_Filter; op = OP_Filter;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} else { } else {
op = OP_Arithmetic; op = OP_Project;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
} }
...@@ -665,7 +665,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) { ...@@ -665,7 +665,7 @@ SArray* createGlobalMergePlan(SQueryAttr* pQueryAttr) {
} }
if (pQueryAttr->pExpr2 != NULL) { if (pQueryAttr->pExpr2 != NULL) {
op = OP_Arithmetic; op = OP_Project;
taosArrayPush(plan, &op); taosArrayPush(plan, &op);
} }
} }
......
...@@ -741,4 +741,44 @@ if $data14 != 2 then ...@@ -741,4 +741,44 @@ if $data14 != 2 then
return -1 return -1
endi endi
sql create table m1 (ts timestamp, k int, f1 int) tags(a int);
sql create table tm0 using m1 tags(0);
sql create table tm1 using m1 tags(1);
sql insert into tm0 values('2020-1-1 1:1:1', 1, 10);
sql insert into tm0 values('2020-1-1 1:1:2', 1, 20);
sql insert into tm1 values('2020-2-1 1:1:1', 2, 10);
sql insert into tm1 values('2020-2-1 1:1:2', 2, 20);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 100
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
sleep 100
sql use group_db0;
print =========================>TD-4894
sql select count(*),k from m1 group by k;
if $rows != 2 then
return -1
endi
if $data00 != 2 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
if $data11 != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -180,20 +180,82 @@ if $data21 != 49.500000000 then ...@@ -180,20 +180,82 @@ if $data21 != 49.500000000 then
endi endi
#define TSDB_FUNC_APERCT 7 #define TSDB_FUNC_APERCT 7
#define TSDB_FUNC_LAST_ROW 10
#define TSDB_FUNC_TWA 14 #define TSDB_FUNC_TWA 14
#define TSDB_FUNC_LEASTSQR 15 #define TSDB_FUNC_LEASTSQR 15
#define TSDB_FUNC_ARITHM 23
#define TSDB_FUNC_DIFF 24 #define TSDB_FUNC_DIFF 24
#define TSDB_FUNC_INTERP 28 #define TSDB_FUNC_INTERP 28
#define TSDB_FUNC_RATE 29
#define TSDB_FUNC_IRATE 30 #define TSDB_FUNC_IRATE 30
#define TSDB_FUNC_DERIVATIVE 32 #define TSDB_FUNC_DERIVATIVE 32
sql_error select stddev(c1) from (select c1 from nest_tb0); sql_error select stddev(c1) from (select c1 from nest_tb0);
sql_error select percentile(c1, 20) from (select * from nest_tb0); sql_error select percentile(c1, 20) from (select * from nest_tb0);
sql_error select interp(c1) from (select * from nest_tb0);
sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0);
sql_error select twa(c1) from (select c1 from nest_tb0);
sql_error select irate(c1) from (select c1 from nest_tb0);
sql_error select diff(c1), twa(c1) from (select * from nest_tb0);
sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0);
sql select apercentile(c1, 50) from (select * from nest_tb0) interval(1d)
sql select twa(c1) from (select * from nest_tb0);
sql select leastsquares(c1, 1, 1) from (select * from nest_tb0);
sql select irate(c1) from (select * from nest_tb0);
sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d); sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d);
if $rows != 7 then
return -1
endi
if $data00 != @20-09-15 00:00:00.000@ then
return -1
endi
if $data01 != 48.666666667 then
print expect 48.666666667, actual: $data01
return -1
endi
if $data02 != 70080.000000000 then
return -1
endi
if $data03 != 99 then
return -1
endi
if $data04 != 0 then
return -1
endi
if $data05 != 1440 then
return -1
endi
if $data06 != 0 then
print $data06
return -1
endi
if $data07 != 1 then
return -1
endi
if $data08 != 99.000000000 then
print expect 99.000000000, actual: $data08
return -1
endi
if $data10 != @20-09-16 00:00:00.000@ then
return -1
endi
if $data11 != 49.777777778 then
return -1
endi
if $data12 != 71680.000000000 then
return -1
endi
sql select top(x, 20) from (select c1 x from nest_tb0); sql select top(x, 20) from (select c1 x from nest_tb0);
...@@ -207,6 +269,9 @@ print ===================> group by + having ...@@ -207,6 +269,9 @@ print ===================> group by + having
print =========================> ascending order/descending order
print =========================> nest query join print =========================> nest query join
...@@ -273,7 +338,6 @@ if $data03 != @20-09-15 00:00:00.000@ then ...@@ -273,7 +338,6 @@ if $data03 != @20-09-15 00:00:00.000@ then
return -1 return -1
endi endi
sql_error select derivative(val, 1s, 0) from (select c1 val from nest_tb0);
sql select diff(val) from (select c1 val from nest_tb0); sql select diff(val) from (select c1 val from nest_tb0);
if $rows != 9999 then if $rows != 9999 then
return -1 return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册