From d2ed6ff0e8964d3bbb05bf09a1e95053a18f0bde Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 29 Aug 2022 22:42:15 +0800 Subject: [PATCH] fix(query): set correct page buffer id. --- source/libs/executor/inc/executorimpl.h | 12 ++++++---- source/libs/executor/src/executorimpl.c | 24 ++++++++----------- source/libs/executor/src/timewindowoperator.c | 2 +- 3 files changed, 18 insertions(+), 20 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 05bdc39701..594955d469 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -297,10 +297,11 @@ enum { }; typedef struct SAggSupporter { - SHashObj* pResultRowHashTable; // quick locate the window object for each result - char* keyBuf; // window key buffer - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + SHashObj* pResultRowHashTable; // quick locate the window object for each result + char* keyBuf; // window key buffer + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t currentPageId; // current write page id } SAggSupporter; typedef struct { @@ -429,6 +430,7 @@ typedef struct SStreamAggSupporter { char* pKeyBuf; // window key buffer SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + int32_t currentPageId; // buffer page that is active SSDataBlock* pScanBlock; } SStreamAggSupporter; @@ -991,7 +993,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size); -SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); +SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize); SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9b102de5bf..30d7283264 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -179,24 +179,21 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR } #endif -SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) { +SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; // in the first scan, new space needed for results int32_t pageId = -1; - SIDList list = getDataBufPagesIdList(pResultBuf); - - if (taosArrayGetSize(list) == 0) { + if (*currentPageId == -1) { pData = getNewBufPage(pResultBuf, &pageId); pData->num = sizeof(SFilePage); } else { - SPageInfo* pi = getLastPageInfo(list); - pData = getBufPage(pResultBuf, getPageId(pi)); - pageId = getPageId(pi); + pData = getBufPage(pResultBuf, *currentPageId); + pageId = *currentPageId; if (pData->num + interBufSize > getBufPageSize(pResultBuf)) { // release current page first, and prepare the next one - releaseBufPageInfo(pResultBuf, pi); + releaseBufPage(pResultBuf, pData); pData = getNewBufPage(pResultBuf, &pageId); if (pData != NULL) { @@ -215,9 +212,9 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num); pResultRow->pageId = pageId; pResultRow->offset = (int32_t)pData->num; + *currentPageId = pageId; pData->num += interBufSize; - return pResultRow; } @@ -263,11 +260,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // allocate a new buffer page if (pResult == NULL) { -#ifdef BUF_PAGE_DEBUG - qDebug("page_2"); -#endif ASSERT(pSup->resultRowSize > 0); - pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize); + pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize); initResultRow(pResult); @@ -3093,7 +3087,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { offset += sizeof(int32_t); uint64_t tableGroupId = *(uint64_t*)(result + offset); - SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); + SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize); if (!resultRow) { return TSDB_CODE_TSC_INVALID_INPUT; } @@ -3442,6 +3436,7 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n const char* pKey) { _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pAggSup->currentPageId = -1; pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); @@ -4678,6 +4673,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t size) { + pSup->currentPageId = -1; pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d87e235ba5..f4ebf17646 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3790,7 +3790,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes } if (pWinInfo->pos.pageId == -1) { - *pResult = getNewResultRow(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize); + *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize); if (*pResult == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } -- GitLab