From 2891d51a3b49167a64a159e28b5de38e499bad7e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 Mar 2021 17:41:36 +0800 Subject: [PATCH] [td-2895] fix bug found by regression test. --- src/client/src/tscSQLParser.c | 42 +++++--- src/query/src/qExecutor.c | 115 ++++++++++++++-------- tests/script/general/parser/testSuite.sim | 3 + 3 files changed, 101 insertions(+), 59 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 5744d2e3f9..8824bed1e9 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1536,6 +1536,7 @@ bool isValidDistinctSql(SQueryInfo* pQueryInfo) { } return false; } + int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery) { assert(pSelection != NULL && pCmd != NULL); @@ -1871,6 +1872,26 @@ void setResultColName(char* name, tSqlExprItem* pItem, int32_t functionId, SStrT } } +static void updateLastQueryInfoForGroupby(SQueryInfo* pQueryInfo, STableMeta* pTableMeta, int32_t functionId, int32_t index) { + if (functionId != TSDB_FUNC_LAST) { // todo refactor + return; + } + + SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr; + if (pGroupBy->numOfGroupCols > 0) { + for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) { + SColIndex* pIndex = taosArrayGet(pGroupBy->columnInfo, k); + if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMeta)) { // group by normal columns + SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, index); + pExpr->numOfParams = 1; + pExpr->param->i64 = TSDB_ORDER_ASC; + + return; + } + } + } +} + int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, bool finalResult) { STableMetaInfo* pTableMetaInfo = NULL; int32_t functionId = pItem->pNode->functionId; @@ -2135,6 +2156,8 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col if (setExprInfoForFunctions(pCmd, pQueryInfo, &pSchema[j], cvtFunc, name, colIndex++, &index, finalResult) != 0) { return TSDB_CODE_TSC_INVALID_SQL; } + + updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex - 1); } } else { @@ -2150,7 +2173,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } char name[TSDB_COL_NAME_LEN] = {0}; - SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); bool multiColOutput = pItem->pNode->pParam->nExpr > 1; @@ -2160,21 +2182,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_TSC_INVALID_SQL; } - if (functionId == TSDB_FUNC_LAST) { // todo refactor - SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr; - if (pGroupBy->numOfGroupCols > 0) { - for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) { - SColIndex* pIndex = taosArrayGet(pGroupBy->columnInfo, k); - if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { // group by normal columns - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, colIndex + i); - pExpr->numOfParams = 1; - pExpr->param->i64 = TSDB_ORDER_ASC; - - break; - } - } - } - } + updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex + i); } } @@ -2202,6 +2210,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_TSC_INVALID_SQL; } + updateLastQueryInfoForGroupby(pQueryInfo, pTableMetaInfo->pTableMeta, functionId, colIndex); colIndex++; } @@ -2211,6 +2220,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_SUCCESS; } } + case TSDB_FUNC_TOP: case TSDB_FUNC_BOTTOM: case TSDB_FUNC_PERCT: diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index db6a631134..e181b39967 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -35,6 +35,13 @@ #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) +#define CHECK_IF_QUERY_KILLED(_q) \ + do { \ + if (isQueryKilled((_q)->qinfo)) { \ + longjmp((_q)->env, TSDB_CODE_TSC_QUERY_CANCELLED); \ + } \ + } while (0) + #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define TIME_WINDOW_COPY(_dst, _src) do {\ @@ -189,6 +196,9 @@ static bool isPointInterpoQuery(SQuery *pQuery); static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo); static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); +static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, + SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, + int32_t groupIndex); // setup the output buffer for each operator static SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { @@ -378,6 +388,32 @@ static bool hasNullRv(SColIndex* pColIndex, SDataStatis *pStatis) { return true; } +static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntimeEnv* pRuntimeEnv) { + // more than the capacity, reallocate the resources + if (pResultRowInfo->size < pResultRowInfo->capacity) { + return; + } + + int64_t newCapacity = 0; + if (pResultRowInfo->capacity > 10000) { + newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25); + } else { + newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5); + } + + char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); + if (t == NULL) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pResultRowInfo->pResult = (SResultRow **)t; + + int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; + memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc); + + pResultRowInfo->capacity = (int32_t)newCapacity; +} + static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { bool existed = false; @@ -408,28 +444,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes } if (!existed) { - // TODO refactor - // more than the capacity, reallocate the resources - if (pResultRowInfo->size >= pResultRowInfo->capacity) { - int64_t newCapacity = 0; - if (pResultRowInfo->capacity > 10000) { - newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25); - } else { - newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5); - } - - char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); - if (t == NULL) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - pResultRowInfo->pResult = (SResultRow **)t; - - int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; - memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc); - - pResultRowInfo->capacity = (int32_t)newCapacity; - } + prepareResultListBuffer(pResultRowInfo, pRuntimeEnv); SResultRow *pResult = NULL; @@ -2486,6 +2501,10 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + } else if (pQuery->stableQuery) { // stable aggregate, not interval aggregate or normal column aggregate + doSetTableGroupOutputBuf(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pTableScanInfo->pCtx, + pTableScanInfo->rowCellInfoOffset, pTableScanInfo->numOfOutput, + pQuery->current->groupIndex); } (*status) = doFilterByBlockTimeWindow(pTableScanInfo, pBlock); @@ -3101,7 +3120,9 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe int16_t offset = 0; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); - if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) { + + SResultRowCellInfo* pResInfo = pCtx[i].resultInfo; + if (pResInfo->initialized && pResInfo->complete) { offset += pCtx[i].outputBytes; continue; } @@ -3114,25 +3135,17 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe pCtx[i].ptsOutputBuf = pCtx[0].pOutput; } - if (!pCtx[i].resultInfo->initialized) { + if (!pResInfo->initialized) { aAggs[functionId].init(&pCtx[i]); } } } -void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex, - TSKEY nextKey) { - STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; - - // lastKey needs to be updated - pTableQueryInfo->lastKey = nextKey; - if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) { - return; - } - +void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SQLFunctionCtx* pCtx, + int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t groupIndex) { int64_t uid = 0; SResultRow* pResultRow = - doPrepareResultRowFromKey(pRuntimeEnv, &pInfo->resultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid); + doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&groupIndex, sizeof(groupIndex), true, uid); assert (pResultRow != NULL); /* @@ -3140,16 +3153,29 @@ void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, i * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - if (addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize) != - TSDB_CODE_SUCCESS) { + int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, groupIndex, pRuntimeEnv->pQuery->resultRowSize); + if (ret != TSDB_CODE_SUCCESS) { return; } } + setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); +} + +void setExecutionContext(SQueryRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t groupIndex, + TSKEY nextKey) { + STableQueryInfo *pTableQueryInfo = pRuntimeEnv->pQuery->current; + + // lastKey needs to be updated + pTableQueryInfo->lastKey = nextKey; + if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == groupIndex) { + return; + } + + doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, groupIndex); + // record the current active group id pRuntimeEnv->prevGroupId = groupIndex; - setResultOutputBuf(pRuntimeEnv, pResultRow, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); - initCtxOutputBuffer(pInfo->pCtx, numOfOutput); } void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLFunctionCtx* pCtx, @@ -3981,11 +4007,14 @@ static SSDataBlock* doTableScanImpl(void* param) { STableGroupInfo* pTableGroupInfo = &pOperator->pRuntimeEnv->tableqinfoGroupInfo; while (tsdbNextDataBlock(pTableScanInfo->pQueryHandle)) { - pTableScanInfo->numOfBlocks += 1; + if (isQueryKilled(pOperator->pRuntimeEnv->qinfo)) { + longjmp(pOperator->pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } - // todo check for query cancel + pTableScanInfo->numOfBlocks += 1; tsdbRetrieveDataBlockInfo(pTableScanInfo->pQueryHandle, &pBlock->info); + // todo opt if (pTableGroupInfo->numOfTables > 1 || (pQuery->current == NULL && pTableGroupInfo->numOfTables == 1)) { STableQueryInfo** pTableQueryInfo = (STableQueryInfo**)taosHashGet(pTableGroupInfo->map, &pBlock->info.tid, sizeof(pBlock->info.tid)); @@ -4349,8 +4378,8 @@ static SSDataBlock* doSTableAggregate(void* param) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); - TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; - setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, k); + TSKEY key = QUERY_IS_ASC_QUERY(pQuery)? pBlock->info.window.ekey + 1:pBlock->info.window.skey-1; + setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pQuery->current->groupIndex, key); doAggregateImpl(pOperator, pQuery->window.skey, pInfo->pCtx, pBlock); } diff --git a/tests/script/general/parser/testSuite.sim b/tests/script/general/parser/testSuite.sim index 255389a2df..c2ce5df12e 100644 --- a/tests/script/general/parser/testSuite.sim +++ b/tests/script/general/parser/testSuite.sim @@ -107,3 +107,6 @@ sleep 100 run general/parser/function.sim sleep 100 run general/parser/stableOp.sim +sleep 100 +run general/parser/slimit_alter_tags.sim + -- GitLab