diff --git a/include/util/tpagedfile.h b/include/util/tpagedfile.h index 079b4c0c50ba8472abcc206465d4f4245016367d..9152e8685d9ba27b7901dd5831d95301f36fd08f 100644 --- a/include/util/tpagedfile.h +++ b/include/util/tpagedfile.h @@ -71,28 +71,28 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId); * @param id * @return */ -SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id); +SFilePage* getBufPage(SDiskbasedBuf* pResultBuf, int32_t id); /** * release the referenced buf pages * @param pResultBuf * @param page */ -void releaseResBufPage(SDiskbasedBuf* pResultBuf, void* page); +void releaseBufPage(SDiskbasedBuf* pResultBuf, void* page); /** * * @param pResultBuf * @param pi */ -void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, struct SPageInfo* pi); +void releaseBufPageInfo(SDiskbasedBuf* pResultBuf, struct SPageInfo* pi); /** * get the total buffer size in the format of disk file * @param pResultBuf * @return */ -size_t getResBufSize(const SDiskbasedBuf* pResultBuf); +size_t getTotalBufSize(const SDiskbasedBuf* pResultBuf); /** * get the number of groups in the result buffer @@ -135,6 +135,13 @@ int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf); */ bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf); +/** + * Set the buffer page is dirty, and needs to be flushed to disk when swap out. + * @param pPageInfo + * @param dirty + */ +void setBufPageDirty(SPageInfo* pPageInfo, bool dirty); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 18c4075c4566080ebaa229282552ae1f467e1108..7ef2c03fda59319a615deec6527ae6209a95a6f0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -143,7 +143,7 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_ // the result does not put into the SDiskbasedBuf, ignore it. if (pResultRow->pageId >= 0) { - SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); + SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); int16_t offset = 0; for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1e0f05d4bb5f57cc068461589840ae6e436e7d2f..e452da089a26b565bf52f8092a588c8991093562 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -245,8 +245,8 @@ static int compareRowData(const void *a, const void *b, const void *userData) { SRowCompSupporter *supporter = (SRowCompSupporter *)userData; STaskRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv; - SFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId); - SFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); + SFilePage *page1 = getBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId); + SFilePage *page2 = getBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); int16_t offset = supporter->dataOffset; char *in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); @@ -708,12 +708,12 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes pData = getNewDataBuf(pResultBuf, tid, &pageId); } else { SPageInfo* pi = getLastPageInfo(list); - pData = getResBufPage(pResultBuf, getPageId(pi)); + pData = getBufPage(pResultBuf, getPageId(pi)); pageId = getPageId(pi); if (pData->num + size > getBufPageSize(pResultBuf)) { // release current page first, and prepare the next one - releaseResBufPageInfo(pResultBuf, pi); + releaseBufPageInfo(pResultBuf, pi); pData = getNewDataBuf(pResultBuf, tid, &pageId); if (pData != NULL) { assert(pData->num == 0); // number of elements must be 0 for new allocated buffer @@ -3651,7 +3651,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group - SFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); + SFilePage* bufPage = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); int32_t offset = 0; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -3724,7 +3724,7 @@ void setExecutionContext(STaskRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, in void setResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfCols, int32_t* rowCellInfoOffset) { // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group - SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); + SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); int16_t offset = 0; for (int32_t i = 0; i < numOfCols; ++i) { @@ -3967,7 +3967,7 @@ static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* p pGroupResInfo->index += 1; - SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId); + SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pRow->pageId); int32_t offset = 0; for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { @@ -5634,13 +5634,13 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa } else { SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); - SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); + SFilePage* pPage = getBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); int32_t code = blockDataFromBuf(pSource->pBlock, pPage->data); if (code != TSDB_CODE_SUCCESS) { return code; } - releaseResBufPage(pInfo->pSortInternalBuf, pPage); + releaseBufPage(pInfo->pSortInternalBuf, pPage); } } @@ -5729,7 +5729,7 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); blockDataToBuf(pPage->data, p); - releaseResBufPage(pInfo->pSortInternalBuf, pPage); + releaseBufPage(pInfo->pSortInternalBuf, pPage); blockDataDestroy(p); start = stop + 1; @@ -5751,13 +5751,13 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorI SExternalMemSource* pSource = cmpParam->pSources[i]; SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); - SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); + SFilePage* pPage = getBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data); if (code != TSDB_CODE_SUCCESS) { return code; } - releaseResBufPage(pInfo->pSortInternalBuf, pPage); + releaseBufPage(pInfo->pSortInternalBuf, pPage); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index ab711d1f980e5212c967c5410ac6bfe7319a646e..e9c5518de86cedbc4027458c97e58844f4333196 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -37,7 +37,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) for(int32_t i = 0; i < list->size; ++i) { struct SPageInfo* pgInfo = *(struct SPageInfo**) taosArrayGet(list, i); - SFilePage* pg = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); + SFilePage* pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); @@ -99,7 +99,7 @@ double findOnlyResult(tMemBucket *pMemBucket) { assert(list->size == 1); struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0); - SFilePage* pPage = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); + SFilePage* pPage = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); assert(pPage->num == 1); double v = 0; @@ -343,7 +343,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0); // keep the pointer in memory - releaseResBufPage(pBucket->pBuffer, pSlot->info.data); + releaseBufPage(pBucket->pBuffer, pSlot->info.data); pSlot->info.data = NULL; } @@ -471,10 +471,10 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) for (int32_t f = 0; f < list->size; ++f) { SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); - SFilePage *pg = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); + SFilePage *pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); - releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo); + releaseBufPageInfo(pMemBucket->pBuffer, pgInfo); } return getPercentileImpl(pMemBucket, count - num, fraction); diff --git a/source/util/src/tpagedfile.c b/source/util/src/tpagedfile.c index 8f326745070485057eac3ef7c8e3dea55c9a22d8..4231e997665c601fc1de3b3a6c73c8a702923b7c 100644 --- a/source/util/src/tpagedfile.c +++ b/source/util/src/tpagedfile.c @@ -305,7 +305,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pResultBuf) { return pn; } -static char* evicOneDataPage(SDiskbasedBuf* pResultBuf) { +static char* evacOneDataPage(SDiskbasedBuf* pResultBuf) { char* bufPage = NULL; SListNode* pn = getEldestUnrefedPage(pResultBuf); @@ -355,7 +355,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa char* availablePage = NULL; if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) { - availablePage = evicOneDataPage(pResultBuf); + availablePage = evacOneDataPage(pResultBuf); // Failed to allocate a new buffer page, and there is an error occurs. if (availablePage == NULL) { @@ -391,7 +391,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa return (void *)(GET_DATA_PAYLOAD(pi)); } -SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { +SFilePage* getBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { assert(pResultBuf != NULL && id >= 0); pResultBuf->statis.getPages += 1; @@ -418,7 +418,7 @@ SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { char* availablePage = NULL; if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) { - availablePage = evicOneDataPage(pResultBuf); + availablePage = evacOneDataPage(pResultBuf); if (availablePage == NULL) { return NULL; } @@ -440,15 +440,15 @@ SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { } } -void releaseResBufPage(SDiskbasedBuf* pResultBuf, void* page) { +void releaseBufPage(SDiskbasedBuf* pResultBuf, void* page) { assert(pResultBuf != NULL && page != NULL); char* p = (char*) page - POINTER_BYTES; SPageInfo* ppi = ((SPageInfo**) p)[0]; - releaseResBufPageInfo(pResultBuf, ppi); + releaseBufPageInfo(pResultBuf, ppi); } -void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) { +void releaseBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) { assert(pi->pData != NULL && pi->used); pi->used = false; @@ -457,7 +457,7 @@ void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) { size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); } -size_t getResBufSize(const SDiskbasedBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; } +size_t getTotalBufSize(const SDiskbasedBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; } SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId) { assert(pResultBuf != NULL); @@ -529,3 +529,7 @@ int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf) { bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf) { return pResultBuf->fileSize == 0; } + +void setBufPageDirty(SPageInfo* pPageInfo, bool dirty) { + pPageInfo->dirty = dirty; +}