From f5eefb0f26343fc8353468e577c20582507694b4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 23 Aug 2021 17:00:40 +0800 Subject: [PATCH] [td-6260]: Optimize the client-side query performance when multiple group result exists. --- src/client/inc/tscUtil.h | 2 +- src/client/src/tscGlobalmerge.c | 368 ++++++++++++++------------------ src/client/src/tscSQLParser.c | 1 - src/query/inc/qExecutor.h | 15 +- src/query/src/qExecutor.c | 210 +++++++++++------- src/query/src/qFill.c | 2 +- 6 files changed, 311 insertions(+), 287 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index e11748efbe..3e6ab05d67 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -36,7 +36,7 @@ extern "C" { (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) #define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \ - (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo))) + (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo))) #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) diff --git a/src/client/src/tscGlobalmerge.c b/src/client/src/tscGlobalmerge.c index 2b635e8d83..14ab595c8d 100644 --- a/src/client/src/tscGlobalmerge.c +++ b/src/client/src/tscGlobalmerge.c @@ -35,6 +35,7 @@ typedef struct SCompareParam { static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { int32_t ret = 0; + size_t size = taosArrayGetSize(columnIndexList); if (size > 0) { ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC); @@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc (*hasPrev) = true; } +// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the +// output value to multiple rows static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) { if (numOfRows <= 1) { - return ; + return; } for (int32_t k = 0; k < numOfOutput; ++k) { @@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput continue; } - int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result - char* src = pCtx[k].pOutput; + char* src = pCtx[k].pOutput; + char* dst = pCtx[k].pOutput + pCtx[k].outputBytes; - for (int32_t i = 0; i < inc; ++i) { - pCtx[k].pOutput += pCtx[k].outputBytes; - memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes); + // Let's start from the second row, as the first row has result value already. + for (int32_t i = 1; i < numOfRows; ++i) { + memcpy(dst, src, (size_t)pCtx[k].outputBytes); + dst += pCtx[k].outputBytes; + } + } +} + +static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) { + for (int32_t j = 0; j < numOfExpr; ++j) { + pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex; + } + + for (int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); + } else { + aAggs[functionId].mergeFunc(&pCtx[j]); + } + } +} + +static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) { + for(int32_t j = 0; j < numOfExpr; ++j) { + int32_t functionId = pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + if (functionId < 0) { + SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); + doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); + } else { + aAggs[functionId].xFinalize(&pCtx[j]); } } } @@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD SMultiwayMergeInfo* pInfo = pOperator->info; SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; - char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES); + char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES); for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - add[i] = pCtx[i].pInput; + addrPtr[i] = pCtx[i].pInput; pCtx[i].size = 1; } for(int32_t i = 0; i < pBlock->info.rows; ++i) { if (pInfo->hasPrev) { if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } else { - for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); - - continue; - } - - aAggs[functionId].xFinalize(&pCtx[j]); - } + doFinalizeResultImpl(pInfo, pCtx, numOfExpr); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); @@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); } - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } } else { - for (int32_t j = 0; j < numOfExpr; ++j) { - pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - - continue; - } - - aAggs[functionId].mergeFunc(&pCtx[j]); - } + doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr); } savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); @@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD { for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - pCtx[i].pInput = add[i]; + pCtx[i].pInput = addrPtr[i]; } } - tfree(add); + tfree(addrPtr); } static bool isAllSourcesCompleted(SGlobalMerger *pMerger) { @@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; bool sameGroup = true; if (pInfo->hasPrev) { + + // todo refactor extract method int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); // if this row belongs to current result set group @@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { break; } + bool sameGroup = true; if (pAggInfo->hasGroupColData) { - bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); - if (!sameGroup) { + sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); + if (!sameGroup && !pAggInfo->multiGroupResults) { *newgroup = true; pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->pExistBlock = pBlock; @@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { } if (handleData) { // data in current group is all handled - for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { - int32_t functionId = pAggInfo->binfo.pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; - } - - if (functionId < 0) { - SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1); - - doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); - - continue; - } - - aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); - } + doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); - pAggInfo->binfo.pRes->info.rows += numOfRows; + pAggInfo->binfo.pRes->info.rows += numOfRows; setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows); } @@ -1019,137 +975,143 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { return (pRes->info.rows != 0)? pRes:NULL; } -static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { - SSLimitOperatorInfo *pInfo = pOperator->info; - assert(pInfo->currentGroupOffset >= 0); +static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) { + if (pInfo->currentOffset > 0) { + pInfo->currentOffset -= 1; + } else { + // discard the data rows in current group + if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) { + int32_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock); + for (int32_t i = 0; i < num1; ++i) { + SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i); - SSDataBlock* pBlock = NULL; - if (pInfo->currentGroupOffset == 0) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - } + SColumnInfo *pColInfo = &pColInfoData->info; + + char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; + char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes); - if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { - while ((*newgroup) == false) { // ignore the remain blocks - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } + memcpy(pDst, pSrc, pColInfo->bytes); } - } - - return pBlock; - } - - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; + pInfo->rowsTotal += 1; + pInfo->pRes->info.rows += 1; } + } +} - while(1) { - if (*newgroup) { - pInfo->currentGroupOffset -= 1; - *newgroup = false; - } +static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) { + if (pInfo->capacity < pResultBlock->info.rows + numOfRows) { + int32_t total = pResultBlock->info.rows + numOfRows; - while ((*newgroup) == false) { - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); - publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + int32_t num = taosArrayGetSize(pResultBlock->pDataBlock); + for (int32_t i = 0; i < num; ++i) { + SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } + char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes); + if (tmp != NULL) { + pInfoData->pData = tmp; + } else { + // todo handle the malloc failure } - // now we have got the first data block of the next group. - if (pInfo->currentGroupOffset == 0) { - return pBlock; - } + pInfo->capacity = total; + pInfo->threshold = total * 0.8; } - - return NULL; + } } -SSDataBlock* doSLimit(void* param, bool* newgroup) { - SOperatorInfo *pOperator = (SOperatorInfo *)param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } +static void doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) { + int32_t rowIndex = 0; - SSLimitOperatorInfo *pInfo = pOperator->info; + while (rowIndex < pBlock->info.rows) { + int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); - SSDataBlock *pBlock = NULL; - while (1) { - pBlock = skipGroupBlock(pOperator, newgroup); - if (pBlock == NULL) { - setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - pOperator->status = OP_EXEC_DONE; - return NULL; - } + bool samegroup = true; + if (pInfo->hasPrev) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColIndex * pIndex = taosArrayGet(pInfo->orderColumnList, i); + SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex); - if (*newgroup) { // a new group arrives - pInfo->groupTotal += 1; - pInfo->rowsTotal = 0; - pInfo->currentOffset = pInfo->limit.offset; + SColumnInfo *pColInfo = &pColInfoData->info; + + char * d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData; + int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes); + if (ret != 0) { // it is a new group + samegroup = false; + break; + } + } } - assert(pInfo->currentGroupOffset == 0); + if (!samegroup || !pInfo->hasPrev) { + pInfo->ignoreCurrentGroup = false; + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev); - if (pInfo->currentOffset >= pBlock->info.rows) { - pInfo->currentOffset -= pBlock->info.rows; - } else { - if (pInfo->currentOffset == 0) { - break; + pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group + pInfo->rowsTotal = 0; + + if (pInfo->currentGroupOffset > 0) { + pInfo->ignoreCurrentGroup = true; + pInfo->currentGroupOffset -= 1; // now we are in the next group data + rowIndex += 1; + continue; } - int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); - pBlock->info.rows = remain; + // A new group has arrived according to the result rows, and the group limitation has already reached. + // Let's jump out of current loop and return immediately. + if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) { + setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); + pOperator->status = OP_EXEC_DONE; + return; + } - // move the remain rows of this data block to the front. - for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + pInfo->groupTotal += 1; + doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex); - int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); + } else { // handle the offset in the same group + // All the data in current group needs to be discarded, due to the limit parameter in the SQL statement + if (pInfo->ignoreCurrentGroup) { + rowIndex += 1; + continue; } - pInfo->currentOffset = 0; - break; + doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex); } + + rowIndex += 1; } +} - if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort +SSDataBlock* doSLimit(void* param, bool* newgroup) { + SOperatorInfo *pOperator = (SOperatorInfo *)param; + if (pOperator->status == OP_EXEC_DONE) { return NULL; } - if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { - pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); - pInfo->rowsTotal = pInfo->limit.limit; + SSLimitOperatorInfo *pInfo = pOperator->info; + pInfo->pRes->info.rows = 0; + + assert(pInfo->currentGroupOffset >= 0); + + while(1) { + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); + publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) { - pOperator->status = OP_EXEC_DONE; + if (pBlock == NULL) { + return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } - // setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); - } else { - pInfo->rowsTotal += pBlock->info.rows; - } + ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows); + doSlimitImpl(pOperator, pInfo, pBlock); + if (pOperator->status == OP_EXEC_DONE) { + return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; + } - return pBlock; + // now the number of rows in current group is enough, let's return to the invoke function + if (pInfo->pRes->info.rows > pInfo->threshold) { + return pInfo->pRes; + } + } } diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 7a24df8b40..c827ae96d5 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -6917,7 +6917,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo const char* msg1 = "interval not allowed in group by normal column"; STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema* tagSchema = NULL; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 2b9f952d76..b06b73e13f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -470,6 +470,11 @@ typedef struct SSLimitOperatorInfo { char **prevRow; SArray *orderColumnList; + bool hasPrev; + bool ignoreCurrentGroup; + SSDataBlock *pRes; // result buffer + int64_t capacity; + int64_t threshold; } SSLimitOperatorInfo; typedef struct SFilterOperatorInfo { @@ -481,7 +486,7 @@ typedef struct SFillOperatorInfo { SFillInfo *pFillInfo; SSDataBlock *pRes; int64_t totalInputRows; - + void **p; SSDataBlock *existNewGroupBlock; } SFillOperatorInfo; @@ -544,9 +549,9 @@ typedef struct SMultiwayMergeInfo { bool hasDataBlockForNewGroup; SSDataBlock *pExistBlock; - bool hasPrev; - bool groupMix; SArray *udfInfo; + bool hasPrev; + bool multiGroupResults; } SMultiwayMergeInfo; // todo support the disk-based sort @@ -577,8 +582,8 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, - int32_t numOfRows, void* merger, bool groupMix); -SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo); + int32_t numOfRows, void* merger); +SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 26ae714c27..8bf8261160 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -39,15 +39,12 @@ #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) - #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define MULTI_KEY_DELIM "-" -#define HASH_CAPACITY_LIMIT 10000000 - #define TIME_WINDOW_COPY(_dst, _src) do {\ (_dst).skey = (_src).skey;\ (_dst).ekey = (_src).ekey;\ @@ -968,8 +965,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t break; } } - - return; } static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, @@ -1338,7 +1333,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, } else { pCtx[k].start.ptr = (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes; } - + pCtx[k].end.ptr = (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes; } } @@ -1627,7 +1622,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); preWin = win; - + int32_t prevEndPos = (forwardStep - 1) * step + startPos; startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); if (startPos < 0) { @@ -2271,25 +2266,21 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_MultiwayMergeSort: { - bool groupMix = true; - if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) { - groupMix = false; - } - - pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, - 4096, merger, groupMix); // TODO hack it + pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger); break; } - case OP_GlobalAggregate: { + case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock. + bool groupResultMixedUp = (pQueryAttr->fillType == TSDB_FILL_NONE); pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo); + pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, groupResultMixedUp); break; } case OP_SLimit: { - pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, - pQueryAttr->numOfExpr3, merger); + int32_t num = pRuntimeEnv->proot->numOfOutput; + SExprInfo* pExpr = pRuntimeEnv->proot->pExpr; + pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger); break; } @@ -3648,7 +3639,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; - // re-estabilish output buffer pointer. + // set the correct pointer after the memory buffer reallocated. int32_t functionId = pBInfo->pCtx[i].functionId; if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { if(i>0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; @@ -4213,6 +4204,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti // refactor : extract method SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); + //add condition (pBlock->info.rows >= 1) just to runtime happy if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) { STimeWindow* w = &pBlock->info.window; @@ -4292,15 +4284,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data } } -int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) { - void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES); +int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); - p[i] = pColInfoData->pData; + p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows); } - pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity); - tfree(p); + int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows); + pOutput->info.rows += numOfRows; + return pOutput->info.rows; } @@ -5344,11 +5336,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; taosArrayDestroy(pInfo->orderColumnList); + pInfo->pRes = destroyOutputBuf(pInfo->pRes); tfree(pInfo->prevRow); } SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, - SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) { + SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); pInfo->resultRowFactor = @@ -5356,15 +5349,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx - pInfo->pMerge = param; - pInfo->bufCapacity = 4096; - pInfo->udfInfo = pUdfInfo; - - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); - pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - - pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); - pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); + pInfo->multiGroupResults = groupResultMixedUp; + pInfo->pMerge = param; + pInfo->bufCapacity = 4096; + pInfo->udfInfo = pUdfInfo; + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); + pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); + pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); + pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); // TODO refactor int32_t len = 0; @@ -5417,17 +5409,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, } SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, - int32_t numOfRows, void *merger, bool groupMix) { + int32_t numOfRows, void *merger) { SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); - pInfo->pMerge = merger; - pInfo->groupMix = groupMix; - pInfo->bufCapacity = numOfRows; - + pInfo->pMerge = merger; + pInfo->bufCapacity = numOfRows; pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); + pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); - { + { // todo extract method to create prev compare buffer int32_t len = 0; for(int32_t i = 0; i < numOfOutput; ++i) { len += pExpr[i].base.colBytes; @@ -5435,8 +5425,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfCols; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -5452,7 +5442,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx pOperator->status = OP_IN_EXECUTING; pOperator->info = pInfo; pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; + pOperator->numOfOutput = numOfOutput; + pOperator->pExpr = pExpr; pOperator->exec = doMultiwayMergeSort; pOperator->cleanup = destroyGlobalAggOperatorInfo; return pOperator; @@ -6362,19 +6353,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { return pInfo->binfo.pRes; } -static SSDataBlock* doFill(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*) param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SFillOperatorInfo *pInfo = pOperator->info; - SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - +static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { *newgroup = false; - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); - return pInfo->pRes; + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + return; + } } // handle the cached new group data block @@ -6386,11 +6371,47 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); pInfo->existNewGroupBlock = NULL; *newgroup = true; - return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } +} + +static SSDataBlock* doFill(void* param, bool* newgroup) { + SOperatorInfo* pOperator = (SOperatorInfo*) param; + + SFillOperatorInfo *pInfo = pOperator->info; + pInfo->pRes->info.rows = 0; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; + doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { + return pInfo->pRes; + } +// if (taosFillHasMoreResults(pInfo->pFillInfo)) { +// *newgroup = false; +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); +// return pInfo->pRes; +// } +// +// // handle the cached new group data block +// if (pInfo->existNewGroupBlock) { +// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; +// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; +// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); +// +// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); +// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); +// +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); +// pInfo->existNewGroupBlock = NULL; +// *newgroup = true; +// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; +// } while(1) { publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -6418,28 +6439,60 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); } else { pInfo->totalInputRows += pBlock->info.rows; - - int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey - : */pBlock->info.window.ekey; - - taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey); + taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); } } - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); - if (pInfo->pRes->info.rows > 0) { // current group has no more result to return - return pInfo->pRes; + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); + + // current group has no more result to return + if (pInfo->pRes->info.rows > 0) { + // the result in current group not reach the threshold of output result, continue + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { + return pInfo->pRes; + } + + doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup); + if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) { + return pInfo->pRes; + } + +// if (taosFillHasMoreResults(pInfo->pFillInfo)) { +// *newgroup = false; +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); +// return pInfo->pRes; +// } +// +// // handle the cached new group data block +// if (pInfo->existNewGroupBlock) { +// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; +// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey; +// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); +// +// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); +// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); +// +// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); +// pInfo->existNewGroupBlock = NULL; +// *newgroup = true; +// +// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) { +// return pInfo->pRes; +// } +// +//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; +// } + } else if (pInfo->existNewGroupBlock) { // try next group pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; - int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey - :*/ pInfo->existNewGroupBlock->info.window.ekey; + int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey; taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); - doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); + doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p); pInfo->existNewGroupBlock = NULL; *newgroup = true; @@ -6447,7 +6500,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { } else { return NULL; } - // return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; } } @@ -6548,6 +6600,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pRes = destroyOutputBuf(pInfo->pRes); + tfree(pInfo->p); } static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { @@ -6909,6 +6962,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); + + pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES); } SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -6936,9 +6991,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); pInfo->slimit = pQueryAttr->slimit; pInfo->limit = pQueryAttr->limit; - + pInfo->capacity = pRuntimeEnv->resultInfo.capacity; + pInfo->threshold = pInfo->capacity * 0.8; + pInfo->currentOffset = pQueryAttr->limit.offset; pInfo->currentGroupOffset = pQueryAttr->slimit.offset; - pInfo->currentOffset = pQueryAttr->limit.offset; // TODO refactor int32_t len = 0; @@ -6946,10 +7002,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator len += pExpr[i].base.resBytes; } - int32_t numOfCols = pInfo->orderColumnList != NULL? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; + int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); - int32_t offset = POINTER_BYTES * numOfCols; + int32_t offset = POINTER_BYTES * numOfCols; for(int32_t i = 0; i < numOfCols; ++i) { pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; @@ -6957,6 +7013,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator offset += pExpr[index->colIndex].base.resBytes; } + pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); + SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SLimitOperator"; @@ -7195,8 +7253,8 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { pOperator->status = OP_EXEC_DONE; break; } - - // ensure result output buf + + // ensure result output buf if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { int32_t newSize = pRes->info.rows + pBlock->info.rows; for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { @@ -7219,7 +7277,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist + SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; memcpy(start, val, pDistDataInfo->bytes); @@ -7237,7 +7295,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); - + pInfo->totalBytes = 0; pInfo->buf = NULL; pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold diff --git a/src/query/src/qFill.c b/src/query/src/qFill.c index 1a86bbae36..cdcc164152 100644 --- a/src/query/src/qFill.c +++ b/src/query/src/qFill.c @@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); pFillInfo->pData[i] = pColData->pData; - if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer + if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; assert (pTag->col.colId == pCol->col.colId); memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? -- GitLab