提交 3a2d7235 编写于 作者: H Haojun Liao

[td-11818]Fix bug and refactor code.

上级 233079ec
...@@ -71,28 +71,28 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId); ...@@ -71,28 +71,28 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId);
* @param id * @param id
* @return * @return
*/ */
SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id); SFilePage* getBufPage(SDiskbasedBuf* pResultBuf, int32_t id);
/** /**
* release the referenced buf pages * release the referenced buf pages
* @param pResultBuf * @param pResultBuf
* @param page * @param page
*/ */
void releaseResBufPage(SDiskbasedBuf* pResultBuf, void* page); void releaseBufPage(SDiskbasedBuf* pResultBuf, void* page);
/** /**
* *
* @param pResultBuf * @param pResultBuf
* @param pi * @param pi
*/ */
void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, struct SPageInfo* pi); void releaseBufPageInfo(SDiskbasedBuf* pResultBuf, struct SPageInfo* pi);
/** /**
* get the total buffer size in the format of disk file * get the total buffer size in the format of disk file
* @param pResultBuf * @param pResultBuf
* @return * @return
*/ */
size_t getResBufSize(const SDiskbasedBuf* pResultBuf); size_t getTotalBufSize(const SDiskbasedBuf* pResultBuf);
/** /**
* get the number of groups in the result buffer * get the number of groups in the result buffer
...@@ -135,6 +135,13 @@ int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf); ...@@ -135,6 +135,13 @@ int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf);
*/ */
bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf); bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf);
/**
* Set the buffer page is dirty, and needs to be flushed to disk when swap out.
* @param pPageInfo
* @param dirty
*/
void setBufPageDirty(SPageInfo* pPageInfo, bool dirty);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -143,7 +143,7 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_ ...@@ -143,7 +143,7 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_
// the result does not put into the SDiskbasedBuf, ignore it. // the result does not put into the SDiskbasedBuf, ignore it.
if (pResultRow->pageId >= 0) { if (pResultRow->pageId >= 0) {
SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0; int16_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) { for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
......
...@@ -245,8 +245,8 @@ static int compareRowData(const void *a, const void *b, const void *userData) { ...@@ -245,8 +245,8 @@ static int compareRowData(const void *a, const void *b, const void *userData) {
SRowCompSupporter *supporter = (SRowCompSupporter *)userData; SRowCompSupporter *supporter = (SRowCompSupporter *)userData;
STaskRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv; STaskRuntimeEnv* pRuntimeEnv = supporter->pRuntimeEnv;
SFilePage *page1 = getResBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId); SFilePage *page1 = getBufPage(pRuntimeEnv->pResultBuf, pRow1->pageId);
SFilePage *page2 = getResBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); SFilePage *page2 = getBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId);
int16_t offset = supporter->dataOffset; int16_t offset = supporter->dataOffset;
char *in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); char *in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
...@@ -708,12 +708,12 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes ...@@ -708,12 +708,12 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes
pData = getNewDataBuf(pResultBuf, tid, &pageId); pData = getNewDataBuf(pResultBuf, tid, &pageId);
} else { } else {
SPageInfo* pi = getLastPageInfo(list); SPageInfo* pi = getLastPageInfo(list);
pData = getResBufPage(pResultBuf, getPageId(pi)); pData = getBufPage(pResultBuf, getPageId(pi));
pageId = getPageId(pi); pageId = getPageId(pi);
if (pData->num + size > getBufPageSize(pResultBuf)) { if (pData->num + size > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one // release current page first, and prepare the next one
releaseResBufPageInfo(pResultBuf, pi); releaseBufPageInfo(pResultBuf, pi);
pData = getNewDataBuf(pResultBuf, tid, &pageId); pData = getNewDataBuf(pResultBuf, tid, &pageId);
if (pData != NULL) { if (pData != NULL) {
assert(pData->num == 0); // number of elements must be 0 for new allocated buffer assert(pData->num == 0); // number of elements must be 0 for new allocated buffer
...@@ -3651,7 +3651,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { ...@@ -3651,7 +3651,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) {
void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx, void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx,
int32_t numOfOutput, int32_t* rowCellInfoOffset) { int32_t numOfOutput, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage* bufPage = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); SFilePage* bufPage = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int32_t offset = 0; int32_t offset = 0;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
...@@ -3724,7 +3724,7 @@ void setExecutionContext(STaskRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, in ...@@ -3724,7 +3724,7 @@ void setExecutionContext(STaskRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, in
void setResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx, void setResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx,
int32_t numOfCols, int32_t* rowCellInfoOffset) { int32_t numOfCols, int32_t* rowCellInfoOffset) {
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResult->pageId); SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pResult->pageId);
int16_t offset = 0; int16_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
...@@ -3967,7 +3967,7 @@ static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* p ...@@ -3967,7 +3967,7 @@ static int32_t doCopyToSDataBlock(STaskRuntimeEnv* pRuntimeEnv, SGroupResInfo* p
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId); SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pRow->pageId);
int32_t offset = 0; int32_t offset = 0;
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
...@@ -5634,13 +5634,13 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa ...@@ -5634,13 +5634,13 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
} else { } else {
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); SFilePage* pPage = getBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(pSource->pBlock, pPage->data); int32_t code = blockDataFromBuf(pSource->pBlock, pPage->data);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
releaseResBufPage(pInfo->pSortInternalBuf, pPage); releaseBufPage(pInfo->pSortInternalBuf, pPage);
} }
} }
...@@ -5729,7 +5729,7 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { ...@@ -5729,7 +5729,7 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) {
assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); assert(size <= getBufPageSize(pInfo->pSortInternalBuf));
blockDataToBuf(pPage->data, p); blockDataToBuf(pPage->data, p);
releaseResBufPage(pInfo->pSortInternalBuf, pPage); releaseBufPage(pInfo->pSortInternalBuf, pPage);
blockDataDestroy(p); blockDataDestroy(p);
start = stop + 1; start = stop + 1;
...@@ -5751,13 +5751,13 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorI ...@@ -5751,13 +5751,13 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorI
SExternalMemSource* pSource = cmpParam->pSources[i]; SExternalMemSource* pSource = cmpParam->pSources[i];
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); SFilePage* pPage = getBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo));
int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data); int32_t code = blockDataFromBuf(cmpParam->pSources[i]->pBlock, pPage->data);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
releaseResBufPage(pInfo->pSortInternalBuf, pPage); releaseBufPage(pInfo->pSortInternalBuf, pPage);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -37,7 +37,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) ...@@ -37,7 +37,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
for(int32_t i = 0; i < list->size; ++i) { for(int32_t i = 0; i < list->size; ++i) {
struct SPageInfo* pgInfo = *(struct SPageInfo**) taosArrayGet(list, i); struct SPageInfo* pgInfo = *(struct SPageInfo**) taosArrayGet(list, i);
SFilePage* pg = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); SFilePage* pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes); offset += (int32_t)(pg->num * pMemBucket->bytes);
...@@ -99,7 +99,7 @@ double findOnlyResult(tMemBucket *pMemBucket) { ...@@ -99,7 +99,7 @@ double findOnlyResult(tMemBucket *pMemBucket) {
assert(list->size == 1); assert(list->size == 1);
struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0); struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0);
SFilePage* pPage = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); SFilePage* pPage = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
assert(pPage->num == 1); assert(pPage->num == 1);
double v = 0; double v = 0;
...@@ -343,7 +343,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { ...@@ -343,7 +343,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0); assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0);
// keep the pointer in memory // keep the pointer in memory
releaseResBufPage(pBucket->pBuffer, pSlot->info.data); releaseBufPage(pBucket->pBuffer, pSlot->info.data);
pSlot->info.data = NULL; pSlot->info.data = NULL;
} }
...@@ -471,10 +471,10 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) ...@@ -471,10 +471,10 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
for (int32_t f = 0; f < list->size; ++f) { for (int32_t f = 0; f < list->size; ++f) {
SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f);
SFilePage *pg = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); SFilePage *pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo));
tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo); releaseBufPageInfo(pMemBucket->pBuffer, pgInfo);
} }
return getPercentileImpl(pMemBucket, count - num, fraction); return getPercentileImpl(pMemBucket, count - num, fraction);
......
...@@ -305,7 +305,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pResultBuf) { ...@@ -305,7 +305,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pResultBuf) {
return pn; return pn;
} }
static char* evicOneDataPage(SDiskbasedBuf* pResultBuf) { static char* evacOneDataPage(SDiskbasedBuf* pResultBuf) {
char* bufPage = NULL; char* bufPage = NULL;
SListNode* pn = getEldestUnrefedPage(pResultBuf); SListNode* pn = getEldestUnrefedPage(pResultBuf);
...@@ -355,7 +355,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa ...@@ -355,7 +355,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa
char* availablePage = NULL; char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) { if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf); availablePage = evacOneDataPage(pResultBuf);
// Failed to allocate a new buffer page, and there is an error occurs. // Failed to allocate a new buffer page, and there is an error occurs.
if (availablePage == NULL) { if (availablePage == NULL) {
...@@ -391,7 +391,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa ...@@ -391,7 +391,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa
return (void *)(GET_DATA_PAYLOAD(pi)); return (void *)(GET_DATA_PAYLOAD(pi));
} }
SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { SFilePage* getBufPage(SDiskbasedBuf* pResultBuf, int32_t id) {
assert(pResultBuf != NULL && id >= 0); assert(pResultBuf != NULL && id >= 0);
pResultBuf->statis.getPages += 1; pResultBuf->statis.getPages += 1;
...@@ -418,7 +418,7 @@ SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { ...@@ -418,7 +418,7 @@ SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) {
char* availablePage = NULL; char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) { if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf); availablePage = evacOneDataPage(pResultBuf);
if (availablePage == NULL) { if (availablePage == NULL) {
return NULL; return NULL;
} }
...@@ -440,15 +440,15 @@ SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { ...@@ -440,15 +440,15 @@ SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) {
} }
} }
void releaseResBufPage(SDiskbasedBuf* pResultBuf, void* page) { void releaseBufPage(SDiskbasedBuf* pResultBuf, void* page) {
assert(pResultBuf != NULL && page != NULL); assert(pResultBuf != NULL && page != NULL);
char* p = (char*) page - POINTER_BYTES; char* p = (char*) page - POINTER_BYTES;
SPageInfo* ppi = ((SPageInfo**) p)[0]; SPageInfo* ppi = ((SPageInfo**) p)[0];
releaseResBufPageInfo(pResultBuf, ppi); releaseBufPageInfo(pResultBuf, ppi);
} }
void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) { void releaseBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) {
assert(pi->pData != NULL && pi->used); assert(pi->pData != NULL && pi->used);
pi->used = false; pi->used = false;
...@@ -457,7 +457,7 @@ void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) { ...@@ -457,7 +457,7 @@ void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) {
size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); } size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); }
size_t getResBufSize(const SDiskbasedBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; } size_t getTotalBufSize(const SDiskbasedBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; }
SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId) { SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId) {
assert(pResultBuf != NULL); assert(pResultBuf != NULL);
...@@ -529,3 +529,7 @@ int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf) { ...@@ -529,3 +529,7 @@ int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf) {
bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf) { bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf) {
return pResultBuf->fileSize == 0; return pResultBuf->fileSize == 0;
} }
void setBufPageDirty(SPageInfo* pPageInfo, bool dirty) {
pPageInfo->dirty = dirty;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册