From a0c216613a9b16a6a773da7846e3d4c0cbc9886e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Apr 2021 18:28:24 +0800 Subject: [PATCH] [td-225]fix bugs. --- src/client/src/tscLocalMerge.c | 135 +++++++++++++++++++-------------- src/query/inc/qExecutor.h | 3 +- src/query/src/qExecutor.c | 23 ++++-- 3 files changed, 94 insertions(+), 67 deletions(-) diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 4dc274fc5e..198bf797d5 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -1134,7 +1134,8 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc (*hasPrev) = true; } -static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, SSDataBlock* pBlock, bool needInit) { +static void doExecuteFinalMergeRv(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) { + SMultiwayMergeInfo* pInfo = pOperator->info; SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES); @@ -1165,7 +1166,8 @@ static void doExecuteFinalMergeRv(SMultiwayMergeInfo* pInfo, int32_t numOfExpr, aAggs[functionId].xFinalize(&pCtx[j]); } - pInfo->binfo.pRes->info.rows += 1; + int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); + pInfo->binfo.pRes->info.rows += numOfRows; if (i == 0) { for(int32_t j = 0; j < numOfExpr; ++j) { @@ -1935,55 +1937,72 @@ SSDataBlock* doMultiwaySort(void* param) { return (pInfo->binfo.pRes->info.rows > 0)? pInfo->binfo.pRes:NULL; } +static bool isSameGroupRv(SArray* orderColumnList, SSDataBlock* pBlock, char** dataCols) { + int32_t numOfCols = (int32_t) taosArrayGetSize(orderColumnList); + for (int32_t i = 0; i < numOfCols; ++i) { + SColIndex *pIndex = taosArrayGet(orderColumnList, i); + + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex); + assert(pIndex->colId == pColInfo->info.colId); + + char *data = dataCols[i]; + int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes); + if (ret == 0) { + continue; + } else { + return false; + } + } + + return true; +} + SSDataBlock* doGlobalAggregate(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { return NULL; } - SMultiwayMergeInfo* pAggInfo = pOperator->info; + SMultiwayMergeInfo *pAggInfo = pOperator->info; + SOperatorInfo *upstream = pOperator->upstream; -// SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv; - SOperatorInfo *upstream = pOperator->upstream; + bool handleData = false; + pAggInfo->binfo.pRes->info.rows = 0; { if (pAggInfo->hasDataBlockForNewGroup) { + pAggInfo->binfo.pRes->info.rows = 0; + pAggInfo->hasPrev = false; // now we start from a new group data set. + // not belongs to the same group, return the result of current group; setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pAggInfo->pExistBlock, TSDB_ORDER_ASC); updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pAggInfo->pExistBlock->info.rows); - doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pAggInfo->pExistBlock, false); + { // reset output buffer + for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { + aAggs[pAggInfo->binfo.pCtx[j].functionId].init(&pAggInfo->binfo.pCtx[j]); + } + } + + doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pAggInfo->pExistBlock); - savePrevOrderColumns(pAggInfo->groupPrevRow, pAggInfo->groupColumnList, pAggInfo->pExistBlock, 0, - &pAggInfo->hasPrev); + savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pAggInfo->pExistBlock, 0, + &pAggInfo->hasGroupColData); pAggInfo->pExistBlock = NULL; pAggInfo->hasDataBlockForNewGroup = false; + handleData = true; } } + SSDataBlock* pBlock = NULL; while(1) { - SSDataBlock* pBlock = upstream->exec(upstream); + pBlock = upstream->exec(upstream); if (pBlock == NULL) { break; } - if (pAggInfo->hasPrev) { - bool sameGroup = true; - int32_t numOfCols = (int32_t) taosArrayGetSize(pAggInfo->groupColumnList); - for (int32_t i = 0; i < numOfCols; ++i) { - SColIndex *pIndex = taosArrayGet(pAggInfo->groupColumnList, i); - SColumnInfoData *pColInfo = taosArrayGet(pAggInfo->binfo.pRes->pDataBlock, pIndex->colIndex); - - char *data = pAggInfo->groupPrevRow[i]; - int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes); - if (ret == 0) { - continue; - } else { - sameGroup = false; - break; - } - } - + if (pAggInfo->hasGroupColData) { + bool sameGroup = isSameGroupRv(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); if (!sameGroup) { pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->pExistBlock = pBlock; @@ -1992,28 +2011,32 @@ SSDataBlock* doGlobalAggregate(void* param) { } } - // not belongs to the same group, return the result of current group; + // not belongs to the same group, return the result of current group setInputDataBlock(pOperator, pAggInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); + + // handle the output buffer problem updateOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows); - doExecuteFinalMergeRv(pAggInfo, pOperator->numOfOutput, pBlock, false); + doExecuteFinalMergeRv(pOperator, pOperator->numOfOutput, pBlock); + savePrevOrderColumns(pAggInfo->currentGroupColData, pAggInfo->groupColumnList, pBlock, 0, &pAggInfo->hasGroupColData); + handleData = true; } - for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { - int32_t functionId = pAggInfo->binfo.pCtx[j].functionId; - if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { - continue; + if (handleData) { // data in current group is all handled + for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { + int32_t functionId = pAggInfo->binfo.pCtx[j].functionId; + if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { + continue; + } + + aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); } - aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); + int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); + pAggInfo->binfo.pRes->info.rows += numOfRows; } - pAggInfo->binfo.pRes->info.rows += 1; - -// pOperator->status = OP_EXEC_DONE; -// setQueryStatus(pRuntimeEnv, QUERY_COMPLETED); - - return pAggInfo->binfo.pRes; + return (pAggInfo->binfo.pRes->info.rows != 0)? pAggInfo->binfo.pRes:NULL; } SSDataBlock* doSLimit(void* param) { @@ -2033,6 +2056,16 @@ SSDataBlock* doSLimit(void* param) { return NULL; } + if (!pInfo->hasPrev) { + savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); + } else { + bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow); + if (!sameGroup) { // reset info for new group data + pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group + pInfo->rowsTotal = 0; + } + } + if (pInfo->currentGroupOffset == 0) { if (pInfo->currentOffset == 0) { // TODO refactor break; @@ -2056,28 +2089,14 @@ SSDataBlock* doSLimit(void* param) { } else { if (pInfo->hasPrev) { // Check if current data block belongs to current result group or not -// if (needToMergeRv(pBlock, pInfo->pMerger, 0, pInfo->prevRow)) { - bool sameGroup = true; - int32_t numOfCols = (int32_t) taosArrayGetSize(pInfo->orderColumnList); - for (int32_t i = 0; i < numOfCols; ++i) { - SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i); - SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex); - - char *data = pInfo->prevRow[i]; - int32_t ret = columnValueAscendingComparator(data, pColInfo->pData, pColInfo->info.type, pColInfo->info.bytes); - if (ret == 0) { - continue; - } else { - sameGroup = false; - break; - } - } - + bool sameGroup = isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow); if (sameGroup) { continue; // ignore the data block of the same group and try next } else { + //update the group column data by using the current group. savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, 0, &pInfo->hasPrev); - pInfo->currentOffset = pInfo->limit.offset; // set the offset value for a new group + + pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group pInfo->rowsTotal = 0; if ((--pInfo->currentGroupOffset) == 0) { @@ -2108,7 +2127,7 @@ SSDataBlock* doSLimit(void* param) { } } - if (!pInfo->hasPrev || !needToMergeRv(pBlock, pInfo->pMerger, 0, pInfo->prevRow)) { + if (!pInfo->hasPrev || !isSameGroupRv(pInfo->orderColumnList, pBlock, pInfo->prevRow)) { pInfo->groupTotal += 1; if (pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort return NULL; diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 320c00e9d5..6b968d8542 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -464,7 +464,8 @@ typedef struct SMultiwayMergeInfo { char **prevRow; SArray *orderColumnList; - char **groupPrevRow; + bool hasGroupColData; + char **currentGroupColData; SArray *groupColumnList; bool hasDataBlockForNewGroup; SSDataBlock *pExistBlock; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 09fcb303d2..0cb6580f05 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -2902,7 +2902,7 @@ int32_t initResultRow(SResultRow *pResultRow) { * +------------+-------------------------------------------+-------------------------------------------+ * offset[0] offset[1] */ -void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid) { +void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) { SQLFunctionCtx* pCtx = pInfo->pCtx; SSDataBlock* pDataBlock = pInfo->pRes; int32_t* rowCellInfoOffset = pInfo->rowCellInfoOffset; @@ -2921,8 +2921,9 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i SResultRowCellInfo* pCellInfo = getResultCell(pRow, i, rowCellInfoOffset); RESET_RESULT_INFO(pCellInfo); - pCtx[i].resultInfo = pCellInfo; - pCtx[i].pOutput = pData->pData; + pCtx[i].resultInfo = pCellInfo; + pCtx[i].pOutput = pData->pData; + pCtx[i].currentStage = stage; assert(pCtx[i].pOutput != NULL); // set the timestamp output buffer for top/bottom/diff query @@ -4315,6 +4316,8 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) { SArray* pOrderColumns = NULL; if (numOfCols > 0) { pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo); + } else { + pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); } if (pQuery->interval.interval > 0) { @@ -4347,6 +4350,8 @@ SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) { SArray* pOrderColumns = NULL; if (numOfCols > 0) { pOrderColumns = taosArrayDup(pQuery->pGroupbyExpr->columnInfo); + } else { + pOrderColumns = taosArrayInit(4, sizeof(SColIndex)); } for(int32_t i = 0; i < numOfCols; ++i) { @@ -4369,6 +4374,8 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, // int32_t numOfRows = // (int32_t)(GET_ROW_PARAM_FOR_MULTIOUTPUT(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); + pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx + pInfo->pMerge = param; pInfo->bufCapacity = 4096; pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity); @@ -4395,11 +4402,11 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, } numOfCols = (pInfo->groupColumnList != NULL)? taosArrayGetSize(pInfo->groupColumnList):0; - pInfo->groupPrevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); + pInfo->currentGroupColData = calloc(1, (POINTER_BYTES * numOfCols + len)); offset = POINTER_BYTES * numOfOutput; for(int32_t i = 0; i < numOfCols; ++i) { - pInfo->groupPrevRow[i] = (char*)pInfo->groupPrevRow + offset; + pInfo->currentGroupColData[i] = (char*)pInfo->currentGroupColData + offset; SColIndex* index = taosArrayGet(pInfo->groupColumnList, i); offset += pExpr[index->colIndex].base.resBytes; @@ -4408,7 +4415,7 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); pInfo->seed = rand(); - setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed); + setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MERGE_STAGE); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "GlobalAggregate"; @@ -4971,7 +4978,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT); pInfo->seed = rand(); - setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed); + setDefaultOutputBuf(pRuntimeEnv, &pInfo->binfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "TableAggregate"; @@ -5063,7 +5070,7 @@ SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI pBInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pBInfo->rowCellInfoOffset); initResultRowInfo(&pBInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT); - setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed); + setDefaultOutputBuf(pRuntimeEnv, pBInfo, pInfo->seed, MASTER_SCAN); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "ArithmeticOperator"; -- GitLab