From 632a26d0fe8612e7f72d5cc2347d68836dff2279 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 13 Sep 2022 17:39:43 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- source/libs/executor/inc/executorimpl.h | 42 +-- source/libs/executor/src/executorimpl.c | 350 ++---------------- source/libs/executor/src/timewindowoperator.c | 166 ++++----- 3 files changed, 115 insertions(+), 443 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a9826e0018..61c6f9878e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -584,11 +584,12 @@ typedef struct SIntervalAggOperatorInfo { typedef struct SMergeAlignedIntervalAggOperatorInfo { SIntervalAggOperatorInfo* intervalAggOperatorInfo; - bool hasGroupId; +// bool hasGroupId; uint64_t groupId; // current groupId int64_t curTs; // current ts SSDataBlock* prefetchedBlock; SNode* pCondition; + SResultRow* pResultRow; } SMergeAlignedIntervalAggOperatorInfo; typedef struct SStreamIntervalOperatorInfo { @@ -648,7 +649,6 @@ typedef struct SAggOperatorInfo { } SAggOperatorInfo; typedef struct SProjectOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; SNode* pFilterNode; // filter info, which is push down by optimizer @@ -690,7 +690,6 @@ typedef struct SFillOperatorInfo { } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -737,7 +736,6 @@ typedef struct SWindowRowsSup { } SWindowRowsSup; typedef struct SSessionAggOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -825,7 +823,6 @@ typedef struct SStateWindowOperatorInfo { SStateKeys stateKey; int32_t tsSlotId; // primary timestamp column slot id STimeWindowAggSupp twAggSup; - // bool reptScan; const SNode* pCondition; } SStateWindowOperatorInfo; @@ -846,24 +843,6 @@ typedef struct SStreamStateAggOperatorInfo { bool ignoreExpiredData; } SStreamStateAggOperatorInfo; -typedef struct SSortedMergeOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; - - SArray* pSortInfo; - int32_t numOfSources; - SSortHandle* pSortHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - int32_t resultRowFactor; - bool hasGroupVal; - SDiskbasedBuf* pTupleStore; // keep the final results - int32_t numOfResPerPage; - char** groupVal; - SArray* groupInfo; -} SSortedMergeOperatorInfo; - typedef struct SSortOperatorInfo { SOptrBasicInfo binfo; uint32_t sortBufSize; // max buffer size for in-memory sort @@ -871,11 +850,10 @@ typedef struct SSortOperatorInfo { SSortHandle* pSortHandle; SArray* pColMatchInfo; // for index map from table scan output int32_t bufPageSize; - - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - SLimitInfo limitInfo; - SNode* pCondition; + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + SLimitInfo limitInfo; + SNode* pCondition; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -907,7 +885,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_decode_fn_t decode, __optr_explain_fn_t explain); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); -void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); @@ -942,7 +919,6 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int SSDataBlock* pBlock, const char* idStr); void cleanupAggSup(SAggSupporter* pAggSup); -void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId); @@ -1089,10 +1065,8 @@ void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEn void printDataBlock(SSDataBlock* pBlock, const char* flag); uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId); -int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, - SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, - const int32_t* rowCellOffset, SSDataBlock* pBlock, - SExecTaskInfo* pTaskInfo); +int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle, STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 205bcd58df..e5f0d4d177 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -132,8 +132,6 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, return fpSet; } -void operatorDummyCloseFn(void* param, int32_t numOfCols) {} - static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo); @@ -1269,33 +1267,12 @@ static void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t nu } } -// todo extract method with copytoSSDataBlock -int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, - SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, - const int32_t* rowCellOffset, SSDataBlock* pBlock, - SExecTaskInfo* pTaskInfo) { - SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); - SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); - - doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowCellOffset); - if (pRow->numOfRows == 0) { - releaseBufPage(pBuf, page); - return 0; - } - - while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25); - if (TAOS_FAILED(code)) { - releaseBufPage(pBuf, page); - qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); - } - } - +static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, + SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo) { for (int32_t j = 0; j < numOfExprs; ++j) { int32_t slotId = pExprInfo[j].base.resSchema.slotId; - pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset); + pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); if (pCtx[j].fpSet.finalize) { int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code)) { @@ -1303,7 +1280,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi T_LONG_JMP(pTaskInfo->env, code); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { - // do nothing, todo refactor + // do nothing } else { // expand the result into multiple rows. E.g., _wstart, top(k, 20) // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. @@ -1314,10 +1291,39 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi } } } +} + +int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { + SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); + SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); + + SqlFunctionCtx* pCtx = pSup->pCtx; + SExprInfo* pExprInfo = pSup->pExprInfo; + const int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + + doUpdateNumOfRows(pCtx, pRow, pSup->numOfExprs, rowEntryOffset); + if (pRow->numOfRows == 0) { + releaseBufPage(pBuf, page); + return 0; + } + + int32_t size = pBlock->info.capacity; + while (pBlock->info.rows + pRow->numOfRows > size) { + size = size * 1.25; + } + + int32_t code = blockDataEnsureCapacity(pBlock, size); + if (TAOS_FAILED(code)) { + releaseBufPage(pBuf, page); + qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + T_LONG_JMP(pTaskInfo->env, code); + } + + doCopyResultToDataBlock(pExprInfo, pSup->numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; - return 0; } @@ -1362,32 +1368,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS } pGroupResInfo->index += 1; - - for (int32_t j = 0; j < numOfExprs; ++j) { - int32_t slotId = pExprInfo[j].base.resSchema.slotId; - - pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset); - if (pCtx[j].fpSet.finalize) { -#ifdef BUF_PAGE_DEBUG - qDebug("\npage_finalize %d", numOfExprs); -#endif - int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); - if (TAOS_FAILED(code)) { - qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); - T_LONG_JMP(pTaskInfo->env, code); - } - } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { - // do nothing, todo refactor - } else { - // expand the result into multiple rows. E.g., _wstart, top(k, 20) - // the _wstart needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); - char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); - for (int32_t k = 0; k < pRow->numOfRows; ++k) { - colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); - } - } - } + doCopyResultToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; @@ -2307,21 +2288,6 @@ _error: static int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, const char* pKey); -static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { - SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*)param; - taosArrayDestroy(pInfo->pSortInfo); - taosArrayDestroy(pInfo->groupInfo); - - if (pInfo->pSortHandle != NULL) { - tsortDestroySortHandle(pInfo->pSortHandle); - } - - blockDataDestroy(pInfo->binfo.pRes); - cleanupAggSup(&pInfo->aggSup); - - taosMemoryFreeClear(param); -} - static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { size_t size = taosArrayGetSize(groupInfo); if (size == 0) { @@ -2357,41 +2323,6 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3 return 0; } -static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr, - int32_t rowIndex) { - for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index - // pCtx[j].startRow = rowIndex; - } - - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - // pCtx[j].fpSet->addInput(&pCtx[j]); - - // if (functionId < 0) { - // SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - // doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE); - // } else { - // assert(!TSDB_FUNC_IS_SCALAR(functionId)); - // aAggs[functionId].mergeFunc(&pCtx[j]); - // } - } -} - -static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) { - for (int32_t j = 0; j < numOfExpr; ++j) { - int32_t functionId = pCtx[j].functionId; - // if (functionId == FUNC_TAG_DUMMY || functionId == FUNC_TS_DUMMY) { - // continue; - // } - - // if (functionId < 0) { - // SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1); - // doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); - // } else { - // pCtx[j].fpSet.finalize(&pCtx[j]); - } -} - static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock* pBlock, int32_t rowIndex) { int32_t size = (int32_t)taosArrayGetSize(pColumnList); @@ -2406,210 +2337,6 @@ static bool saveCurrentTuple(char** rowColData, SArray* pColumnList, SSDataBlock return true; } -static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock* pBlock) { - SSortedMergeOperatorInfo* pInfo = pOperator->info; - - SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx; - - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - if (!pInfo->hasGroupVal) { - ASSERT(i == 0); - doMergeResultImpl(pInfo, pCtx, numOfExpr, i); - pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i); - } else { - if (needToMerge(pBlock, pInfo->groupInfo, pInfo->groupVal, i)) { - doMergeResultImpl(pInfo, pCtx, numOfExpr, i); - } else { - doFinalizeResultImpl(pCtx, numOfExpr); - int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows); - - // TODO check for available buffer; - - // next group info data - pInfo->binfo.pRes->info.rows += numOfRows; - for (int32_t j = 0; j < numOfExpr; ++j) { - if (pCtx[j].functionId < 0) { - continue; - } - - pCtx[j].fpSet.process(&pCtx[j]); - } - - doMergeResultImpl(pInfo, pCtx, numOfExpr, i); - pInfo->hasGroupVal = saveCurrentTuple(pInfo->groupVal, pInfo->groupInfo, pBlock, i); - } - } - } -} - -static SSDataBlock* doMerge(SOperatorInfo* pOperator) { - SSortedMergeOperatorInfo* pInfo = pOperator->info; - SSortHandle* pHandle = pInfo->pSortHandle; - - SSDataBlock* pDataBlock = createOneDataBlock(pInfo->binfo.pRes, false); - blockDataEnsureCapacity(pDataBlock, pOperator->resultInfo.capacity); - - while (1) { - blockDataCleanup(pDataBlock); - while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; - } - - // build datablock for merge for one group - appendOneRowToDataBlock(pDataBlock, pTupleHandle); - if (pDataBlock->info.rows >= pOperator->resultInfo.capacity) { - break; - } - } - - if (pDataBlock->info.rows == 0) { - break; - } - - setInputDataBlock(pOperator, pOperator->exprSupp.pCtx, pDataBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - // updateOutputBuf(&pInfo->binfo, &pAggInfo->bufCapacity, pBlock->info.rows * pAggInfo->resultRowFactor, - // pOperator->pRuntimeEnv, true); - doMergeImpl(pOperator, pOperator->exprSupp.numOfExprs, pDataBlock); - // flush to tuple store, and after all data have been handled, return to upstream node or sink node - } - - doFinalizeResultImpl(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs); - int32_t numOfRows = getNumOfResult(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); - // setTagValueForMultipleRows(pCtx, pOperator->exprSupp.numOfExprs, numOfRows); - - // TODO check for available buffer; - - // next group info data - pInfo->binfo.pRes->info.rows += numOfRows; - return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL; -} - -SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo, SSortedMergeOperatorInfo* pInfo) { - blockDataCleanup(pDataBlock); - - SSDataBlock* p = tsortGetSortedDataBlock(pHandle); - if (p == NULL) { - return NULL; - } - - blockDataEnsureCapacity(p, capacity); - - while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); - if (pTupleHandle == NULL) { - break; - } - - appendOneRowToDataBlock(p, pTupleHandle); - if (p->info.rows >= capacity) { - break; - } - } - - if (p->info.rows > 0) { - int32_t numOfCols = taosArrayGetSize(pColMatchInfo); - for (int32_t i = 0; i < numOfCols; ++i) { - SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); - ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); - - SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); - SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); - colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); - } - - pDataBlock->info.rows = p->info.rows; - pDataBlock->info.capacity = p->info.rows; - } - - blockDataDestroy(p); - return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; -} - -static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SSortedMergeOperatorInfo* pInfo = pOperator->info; - if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo); - } - - int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); - - tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); - - for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - ps->param = pOperator->pDownstream[i]; - tsortAddSource(pInfo->pSortHandle, ps); - } - - int32_t code = tsortOpen(pInfo->pSortHandle); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, terrno); - } - - pOperator->status = OP_RES_TO_RETURN; - return doMerge(pOperator); -} - -static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo, - SSortedMergeOperatorInfo* pInfo) { - if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) { - return 0; - } - - int32_t len = 0; - SArray* plist = taosArrayInit(3, sizeof(SColumn)); - pInfo->groupInfo = taosArrayInit(3, sizeof(int32_t)); - - if (plist == NULL || pInfo->groupInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - size_t numOfGroupCol = taosArrayGetSize(pInfo->groupInfo); - for (int32_t i = 0; i < numOfGroupCol; ++i) { - SColumn* pCol = taosArrayGet(pGroupInfo, i); - for (int32_t j = 0; j < numOfCols; ++j) { - SExprInfo* pe = &pExprInfo[j]; - if (pe->base.resSchema.slotId == pCol->colId) { - taosArrayPush(plist, pCol); - taosArrayPush(pInfo->groupInfo, &j); - len += pCol->bytes; - break; - } - } - } - - ASSERT(taosArrayGetSize(pGroupInfo) == taosArrayGetSize(plist)); - - pInfo->groupVal = taosMemoryCalloc(1, (POINTER_BYTES * numOfGroupCol + len)); - if (pInfo->groupVal == NULL) { - taosArrayDestroy(plist); - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t offset = 0; - char* start = (char*)(pInfo->groupVal + (POINTER_BYTES * numOfGroupCol)); - for (int32_t i = 0; i < numOfGroupCol; ++i) { - pInfo->groupVal[i] = start + offset; - SColumn* pCol = taosArrayGet(plist, i); - offset += pCol->bytes; - } - - taosArrayDestroy(plist); - - return TSDB_CODE_SUCCESS; -} - int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag) { // todo add more information about exchange operation int32_t type = pOperator->operatorType; @@ -3342,13 +3069,6 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo) { pInfo->pRes = blockDataDestroy(pInfo->pRes); } -void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) { - SOptrBasicInfo* pInfo = (SOptrBasicInfo*)param; - cleanupBasicInfo(pInfo); - - taosMemoryFreeClear(param); -} - static void freeItem(void* pItem) { void** p = pItem; if (*p != NULL) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d773f8a629..81a5d9953c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -46,19 +46,6 @@ static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, co uint64_t groupId); static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult); -///* -// * There are two cases to handle: -// * -// * 1. Query range is not set yet (queryRangeSet = 0). we need to set the query range info, including -// * pQueryAttr->lastKey, pQueryAttr->window.skey, and pQueryAttr->eKey. -// * 2. Query range is set and query is in progress. There may be another result with the same query ranges to be -// * merged during merge stage. In this case, we need the pTableQueryInfo->lastResRows to decide if there -// * is a previous result generated or not. -// */ -// static void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWindow* pQRange) { -// // do nothing -//} - static TSKEY getStartTsKey(STimeWindow* win, const TSKEY* tsCols) { return tsCols == NULL ? win->skey : tsCols[0]; } static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, bool masterscan, @@ -3004,9 +2991,9 @@ static void addRetriveWindow(SArray* wins, SStreamFinalIntervalOperatorInfo* pIn SPullWindowInfo pull = {.window = nextWin, .groupId = winKey->groupId}; // add pull data request savePullWindow(&pull, pInfo->pPullWins); - int32_t size = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, winKey, size); - qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size); + int32_t size1 = taosArrayGetSize(pInfo->pChildren); + addPullWindow(pInfo->pPullDataMap, winKey, size1); + qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); } } } @@ -4884,66 +4871,64 @@ void destroyMergeAlignedIntervalOperatorInfo(void* param) { taosMemoryFreeClear(param); } -static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, - SSDataBlock* pResultBlock, TSKEY wstartTs) { - SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; - - SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - SExprSupp* pSup = &pOperatorInfo->exprSupp; +static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) { + SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize); + pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + return pResult; +} - SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); - SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( - iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - ASSERT(p1 != NULL); +static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult, + SExprSupp* pExprSup, SAggSupporter* pAggSup) { + if (*pResult == NULL) { + *pResult = doSetSingleOutputTupleBuf(pResultRowInfo, pAggSup); + if (*pResult == NULL) { + return terrno; + } + } - finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pSup->pCtx, pSup->pExprInfo, pSup->numOfExprs, - pSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); - tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); + // set time window for current result ,todo extract method + (*pResult)->win = (*win); + (*pResult)->numOfRows = 0; + (*pResult)->closed = false; + (*pResult)->endInterp = false; + (*pResult)->startInterp = false; + memset((*pResult)->pEntryInfo, 0, pAggSup->resultRowSize - sizeof(SResultRow)); + setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, - SSDataBlock* pBlock, int32_t scanFlag, SSDataBlock* pResultBlock) { + SSDataBlock* pBlock, SSDataBlock* pResultBlock) { SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExprSupp* pSup = &pOperatorInfo->exprSupp; + SInterval* pInterval = &iaInfo->interval; - int32_t startPos = 0; - int32_t numOfOutput = pSup->numOfExprs; - int64_t* tsCols = extractTsCol(pBlock, iaInfo); - uint64_t tableGroupId = pBlock->info.groupId; - SResultRow* pResult = NULL; + int32_t startPos = 0; + int64_t* tsCols = extractTsCol(pBlock, iaInfo); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); // there is an result exists if (miaInfo->curTs != INT64_MIN) { - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); - if (ts != miaInfo->curTs) { - outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); + finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); miaInfo->curTs = ts; } } else { miaInfo->curTs = ts; - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 0); } STimeWindow win = {0}; win.skey = miaInfo->curTs; - win.ekey = - taosTimeAdd(win.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, iaInfo->interval.precision) - 1; + win.ekey = taosTimeAdd(win.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; - // TODO: remove the hash table (groupid + winkey => result row position) - int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, - pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + int32_t ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); + if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { + T_LONG_JMP(pTaskInfo->env, ret); } int32_t currPos = startPos; @@ -4956,21 +4941,18 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - pBlock->info.rows, numOfOutput); + pBlock->info.rows, pSup->numOfExprs); - outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs); + finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); miaInfo->curTs = tsCols[currPos]; currWin.skey = miaInfo->curTs; - currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, - iaInfo->interval.precision) - - 1; + currWin.ekey = taosTimeAdd(currWin.skey, pInterval->interval, pInterval->intervalUnit, pInterval->precision) - 1; startPos = currPos; - ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); - if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + ret = setSingleOutputTupleBuf(pResultRowInfo, &win, &miaInfo->pResultRow, pSup, &iaInfo->aggSup); + if (ret != TSDB_CODE_SUCCESS || miaInfo->pResultRow == NULL) { + T_LONG_JMP(pTaskInfo->env, ret); } miaInfo->curTs = currWin.skey; @@ -4978,68 +4960,66 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - pBlock->info.rows, numOfOutput); + pBlock->info.rows, pSup->numOfExprs); } static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SMergeAlignedIntervalAggOperatorInfo* miaInfo = pOperator->info; - SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; - - SExprSupp* pSup = &pOperator->exprSupp; - SSDataBlock* pRes = iaInfo->binfo.pRes; + SMergeAlignedIntervalAggOperatorInfo* pMiaInfo = pOperator->info; + SIntervalAggOperatorInfo* pIaInfo = pMiaInfo->intervalAggOperatorInfo; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - int32_t scanFlag = MAIN_SCAN; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pRes = pIaInfo->binfo.pRes; + SResultRowInfo* pResultRowInfo = &pIaInfo->binfo.resultRowInfo; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t scanFlag = MAIN_SCAN; while (1) { SSDataBlock* pBlock = NULL; - if (miaInfo->prefetchedBlock == NULL) { + if (pMiaInfo->prefetchedBlock == NULL) { pBlock = downstream->fpSet.getNextFn(downstream); } else { - pBlock = miaInfo->prefetchedBlock; - miaInfo->prefetchedBlock = NULL; + pBlock = pMiaInfo->prefetchedBlock; + pMiaInfo->prefetchedBlock = NULL; - miaInfo->groupId = pBlock->info.groupId; + pMiaInfo->groupId = pBlock->info.groupId; } + // no data exists, all query processing is done if (pBlock == NULL) { - // close last unfinalized time window - if (miaInfo->curTs != INT64_MIN) { - ASSERT(tSimpleHashGetSize(iaInfo->aggSup.pResultRowHashTable) == 1); - outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); - miaInfo->curTs = INT64_MIN; + // close last unclosed time window + if (pMiaInfo->curTs != INT64_MIN) { + finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); + pMiaInfo->curTs = INT64_MIN; } doSetOperatorCompleted(pOperator); break; } - if (!miaInfo->hasGroupId) { - miaInfo->hasGroupId = true; - miaInfo->groupId = pBlock->info.groupId; - } else if (miaInfo->groupId != pBlock->info.groupId) { + if (pMiaInfo->groupId != pBlock->info.groupId && pMiaInfo->groupId != 0) { // if there are unclosed time window, close it firstly. - ASSERT(miaInfo->curTs != INT64_MIN); - outputMergeAlignedIntervalResult(pOperator, miaInfo->groupId, pRes, miaInfo->curTs); - miaInfo->prefetchedBlock = pBlock; - miaInfo->curTs = INT64_MIN; + ASSERT(pMiaInfo->curTs != INT64_MIN); + finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); + + pMiaInfo->prefetchedBlock = pBlock; + pMiaInfo->curTs = INT64_MIN; + pMiaInfo->groupId = 0; break; } - getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); - setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true); - doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); + getTableScanInfo(pOperator, &pIaInfo->inputOrder, &scanFlag); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true); + doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); - doFilter(miaInfo->pCondition, pRes, NULL); + doFilter(pMiaInfo->pCondition, pRes, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { break; } } - pRes->info.groupId = miaInfo->groupId; - miaInfo->hasGroupId = false; + pRes->info.groupId = pMiaInfo->groupId; } static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { @@ -5191,8 +5171,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table SResultRowPosition* p1 = (SResultRowPosition*)tSimpleHashGet( iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); ASSERT(p1 != NULL); - finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, pExprSup->pCtx, pExprSup->pExprInfo, - pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset, pResultBlock, pTaskInfo); +// finalizeResultRows(iaInfo->aggSup.pResultBuf, p1, pResultBlock, pTaskInfo); tSimpleHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); return TSDB_CODE_SUCCESS; } @@ -5201,9 +5180,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t STimeWindow* newWin) { SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; - SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); - SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin}; tdListAppend(miaInfo->groupIntervals, &groupTimeWindow); @@ -5216,9 +5193,10 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t if (prevGrpWin->groupId != tableGroupId) { continue; } + STimeWindow* prevWin = &prevGrpWin->window; if ((ascScan && newWin->skey > prevWin->ekey) || ((!ascScan) && newWin->skey < prevWin->ekey)) { - finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); +// finalizeWindowResult(pOperatorInfo, tableGroupId, prevWin, pResultBlock); tdListPopNode(miaInfo->groupIntervals, listNode); } } @@ -5378,7 +5356,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { if (listNode != NULL) { SGroupTimeWindow* grpWin = (SGroupTimeWindow*)(listNode->data); - finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); +// finalizeWindowResult(pOperator, grpWin->groupId, &grpWin->window, pRes); pRes->info.groupId = grpWin->groupId; } } -- GitLab