From 0bb1dc9d1ffc2d7be5b6c84b17fbb5d57a274f1d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 14 Sep 2022 11:00:44 +0800 Subject: [PATCH] fix(query): reset the output buffer when results have been produced. --- source/libs/executor/inc/executil.h | 1 + source/libs/executor/src/executil.c | 11 +++++++++++ source/libs/executor/src/executorimpl.c | 5 +++-- source/libs/executor/src/timewindowoperator.c | 10 +++------- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 0722c2b306..3f40c18a28 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -88,6 +88,7 @@ struct SqlFunctionCtx; size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); void closeResultRow(SResultRow* pResultRow); +void resetResultRow(SResultRow* pResultRow, size_t entrySize); struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 410da92013..cfc5fec25f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -33,6 +33,17 @@ void initResultRowInfo(SResultRowInfo* pResultRowInfo) { void closeResultRow(SResultRow* pResultRow) { pResultRow->closed = true; } +void resetResultRow(SResultRow* pResultRow, size_t entrySize) { + pResultRow->numOfRows = 0; + pResultRow->closed = false; + pResultRow->endInterp = false; + pResultRow->startInterp = false; + + if (entrySize > 0) { + memset(pResultRow->pEntryInfo, 0, entrySize); + } +} + // TODO refactor: use macro SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset) { assert(index >= 0 && offset != NULL); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 202d8eadde..cd1177794e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1293,8 +1293,9 @@ static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SR } } -int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, - SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { +// todo refactor. SResultRow has direct pointer in miainfo +int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 6568c85edb..c24e04eab1 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4886,14 +4886,8 @@ static int32_t setSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, STimeWind } } - // set time window for current result ,todo extract method + // set time window for current result (*pResult)->win = (*win); - (*pResult)->numOfRows = 0; - (*pResult)->closed = false; - (*pResult)->endInterp = false; - (*pResult)->startInterp = false; - memset((*pResult)->pEntryInfo, 0, pAggSup->resultRowSize - sizeof(SResultRow)); - setResultRowInitCtx((*pResult), pExprSup->pCtx, pExprSup->numOfExprs, pExprSup->rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } @@ -4916,6 +4910,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR if (miaInfo->curTs != INT64_MIN) { if (ts != miaInfo->curTs) { finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); + resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow)); miaInfo->curTs = ts; } } else { @@ -4944,6 +4939,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR pBlock->info.rows, pSup->numOfExprs); finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); + resetResultRow(miaInfo->pResultRow, iaInfo->aggSup.resultRowSize - sizeof(SResultRow)); miaInfo->curTs = tsCols[currPos]; currWin.skey = miaInfo->curTs; -- GitLab