From 2889b8d9135b972f00300a3c10b5c1661aaf0cb8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 23 Aug 2022 17:28:08 +0800 Subject: [PATCH] refactor(query): do some internal refactor. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 30 +++++++++++++--------- source/libs/executor/src/executorimpl.c | 1 + source/libs/executor/src/projectoperator.c | 22 +++++++++++----- 3 files changed, 34 insertions(+), 19 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a4738781f5..bbef249079 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -178,7 +178,7 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); -static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, +static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow); static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); @@ -1510,6 +1510,7 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf return TSDB_CODE_SUCCESS; } +#if 0 static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; @@ -1536,7 +1537,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* freeTSRow = true; } } else if (k.ts < key) { // k.ts < key - doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); + doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); } else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); @@ -1549,7 +1550,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } else { // descending order scan if (key < k.ts) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); + doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); } else if (k.ts < key) { if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { return TSDB_CODE_SUCCESS; @@ -1583,6 +1584,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return TSDB_CODE_SUCCESS; } +#endif + static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1734,6 +1737,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo return TSDB_CODE_SUCCESS; } +#if 0 static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1779,7 +1783,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [3] ik.ts < key <= k.ts // [4] ik.ts < k.ts <= key if (ik.ts < k.ts) { - doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); + doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid); if (freeTSRow) { taosMemoryFree(pTSRow); @@ -1790,7 +1794,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [5] k.ts < key <= ik.ts // [6] k.ts < ik.ts <= key if (k.ts < ik.ts) { - doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow); + doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid); if (freeTSRow) { taosMemoryFree(pTSRow); @@ -1836,7 +1840,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [3] ik.ts > k.ts >= Key // [4] ik.ts > key >= k.ts if (ik.ts > key) { - doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); + doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, uid); if (freeTSRow) { taosMemoryFree(pTSRow); @@ -1859,7 +1863,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* //[7] key = ik.ts > k.ts if (key == ik.ts) { - doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); + doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); tRowMerge(&merge, &fRow); @@ -1876,6 +1880,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ASSERT(0); return -1; } +#endif static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { @@ -3115,7 +3120,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc return TSDB_CODE_SUCCESS; } -void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, +void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; @@ -3197,6 +3202,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); SArray* pDelList = pBlockScanInfo->delSkyline; + uint64_t uid = pBlockScanInfo->uid; // todo refactor bool asc = ASCENDING_TRAVERSE(pReader->order); @@ -3219,9 +3225,9 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBKEY ik = TSDBROW_KEY(piRow); if (ik.ts < k.ts) { // ik.ts < k.ts - doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); } else if (k.ts < ik.ts) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); } else { // ik.ts == k.ts doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); *freeTSRow = true; @@ -3231,12 +3237,12 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR } if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); return TSDB_CODE_SUCCESS; } if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { - doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 82723eebf2..9e56d63673 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3487,6 +3487,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n qError("Init stream agg supporter failed since %s", terrstr(terrno)); return terrno; } + int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir); if (code != TSDB_CODE_SUCCESS) { qError("Create agg result buf failed since %s", tstrerror(code)); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index f2b79bf703..94da3e23e1 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -50,9 +50,11 @@ static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) { + int32_t code = TSDB_CODE_SUCCESS; SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _error; } @@ -67,12 +69,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.pRes = pResBlock; pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFilterNode = pProjPhyNode->node.pConditions; - pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; - - // todo remove it soon if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { pInfo->mergeDataBlocks = false; + } else { + pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; } int32_t numOfRows = 4096; @@ -83,9 +84,13 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys if (numOfRows * pResBlock->info.rowSize > TWOMB) { numOfRows = TWOMB / pResBlock->info.rowSize; } + initResultSizeInfo(&pOperator->resultInfo, numOfRows); + code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } - initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); initBasicInfo(&pInfo->binfo, pResBlock); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); @@ -99,7 +104,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, NULL, NULL, NULL); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -107,7 +112,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + destroyProjectOperatorInfo(pInfo, numOfCols); + taosMemoryFree(pOperator); + pTaskInfo->code = code; return NULL; } @@ -175,7 +182,8 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pBlock, keepRows); //TODO: optimize it later when partition by + limit - if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { + if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || + (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { doSetOperatorCompleted(pOperator); } } -- GitLab