diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index c8743b00e0d33e4e4d7d71cfa336d2da8b9a68e0..9007fe0b4970ac1cb6725fd42d39a41cd4cbf97d 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -39,73 +39,73 @@ typedef struct SFilePage { /** * create disk-based result buffer - * @param pResultBuf + * @param pBuf * @param rowSize * @param pagesize * @param inMemPages * @param handle * @return */ -int32_t createDiskbasedBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); +int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); /** * - * @param pResultBuf + * @param pBuf * @param groupId * @param pageId * @return */ -SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pageId); +SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); /** * - * @param pResultBuf + * @param pBuf * @param groupId * @return */ -SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId); +SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId); /** * get the specified buffer page by id - * @param pResultBuf + * @param pBuf * @param id * @return */ -SFilePage* getBufPage(SDiskbasedBuf* pResultBuf, int32_t id); +SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id); /** * release the referenced buf pages - * @param pResultBuf + * @param pBuf * @param page */ -void releaseBufPage(SDiskbasedBuf* pResultBuf, void* page); +void releaseBufPage(SDiskbasedBuf* pBuf, void* page); /** * - * @param pResultBuf + * @param pBuf * @param pi */ -void releaseBufPageInfo(SDiskbasedBuf* pResultBuf, struct SPageInfo* pi); +void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi); /** * get the total buffer size in the format of disk file - * @param pResultBuf + * @param pBuf * @return */ -size_t getTotalBufSize(const SDiskbasedBuf* pResultBuf); +size_t getTotalBufSize(const SDiskbasedBuf* pBuf); /** * get the number of groups in the result buffer - * @param pResultBuf + * @param pBuf * @return */ -size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf); +size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf); /** * destroy result buffer - * @param pResultBuf + * @param pBuf */ -void destroyResultBuf(SDiskbasedBuf* pResultBuf); +void destroyResultBuf(SDiskbasedBuf* pBuf); /** * @@ -123,24 +123,30 @@ int32_t getPageId(const SPageInfo* pPgInfo); /** * Return the buffer page size. - * @param pResultBuf + * @param pBuf * @return */ -int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf); +int32_t getBufPageSize(const SDiskbasedBuf* pBuf); /** * - * @param pResultBuf + * @param pBuf * @return */ -bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf); +bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf); /** * Set the buffer page is dirty, and needs to be flushed to disk when swap out. * @param pPageInfo * @param dirty */ -void setBufPageDirty(SPageInfo* pPageInfo, bool dirty); +void setBufPageDirty(SFilePage* pPageInfo, bool dirty); + +/** + * Print the statistics when closing this buffer + * @param pBuf + */ +void printStatisBeforeClose(SDiskbasedBuf* pBuf); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e452da089a26b565bf52f8092a588c8991093562..c1d2207d606e3d91f65e4e9a488fe70d7cc7b0d7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5729,6 +5729,8 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); blockDataToBuf(pPage->data, p); + + setBufPageDirty(pPage, true); releaseBufPage(pInfo->pSortInternalBuf, pPage); blockDataDestroy(p); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 12a3fd6c31604557fb1f587d925b63c229a35315..6e3e6a8ffcaf655c059baf82d2e330af97f9c8ed 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -293,6 +293,8 @@ TEST(testCase, external_sort_Test) { // } } + printStatisBeforeClose(((SOrderOperatorInfo*) pOperator->info)->pSortInternalBuf); + int64_t s2 = taosGetTimestampUs(); printf("total:%ld\n", s2 - s1); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 096c185d48d1513e643277e5e1ad9e5f71676311..2766814b57dd9ca6bea24327010ece1391858d04 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -1,4 +1,6 @@ #include "tpagedbuf.h" +#include +#include #include "stddef.h" #include "taoserror.h" #include "tcompression.h" @@ -28,8 +30,9 @@ typedef struct SPageInfo { } SPageInfo; typedef struct SDiskbasedBufStatis { - int32_t flushBytes; - int32_t loadBytes; + int64_t flushBytes; + int64_t loadBytes; + int32_t loadPages; int32_t getPages; int32_t releasePages; int32_t flushPages; @@ -55,12 +58,15 @@ typedef struct SDiskbasedBuf { uint64_t qId; // for debug purpose SDiskbasedBufStatis statis; + bool printStatis; // Print statistics info when closing this buffer. } SDiskbasedBuf; -int32_t createDiskbasedBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { - *pResultBuf = calloc(1, sizeof(SDiskbasedBuf)); +static void printStatisData(const SDiskbasedBuf* pBuf); - SDiskbasedBuf* pResBuf = *pResultBuf; + int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { + *pBuf = calloc(1, sizeof(SDiskbasedBuf)); + + SDiskbasedBuf* pResBuf = *pBuf; if (pResBuf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -97,50 +103,50 @@ int32_t createDiskbasedBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int3 return TSDB_CODE_SUCCESS; } -static int32_t createDiskFile(SDiskbasedBuf* pResultBuf) { - pResultBuf->file = fopen(pResultBuf->path, "wb+"); - if (pResultBuf->file == NULL) { -// qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno)); +static int32_t createDiskFile(SDiskbasedBuf* pBuf) { + pBuf->file = fopen(pBuf->path, "wb+"); + if (pBuf->file == NULL) { +// qError("failed to create tmp file: %s on disk. %s", pBuf->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } return TSDB_CODE_SUCCESS; } -static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pResultBuf) { // do nothing - if (!pResultBuf->comp) { +static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pBuf) { // do nothing + if (!pBuf->comp) { *dst = srcSize; return data; } - *dst = tsCompressString(data, srcSize, 1, pResultBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0); + *dst = tsCompressString(data, srcSize, 1, pBuf->assistBuf, srcSize, ONE_STAGE_COMP, NULL, 0); - memcpy(data, pResultBuf->assistBuf, *dst); + memcpy(data, pBuf->assistBuf, *dst); return data; } -static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pResultBuf) { // do nothing - if (!pResultBuf->comp) { +static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pBuf) { // do nothing + if (!pBuf->comp) { *dst = srcSize; return data; } - *dst = tsDecompressString(data, srcSize, 1, pResultBuf->assistBuf, pResultBuf->pageSize, ONE_STAGE_COMP, NULL, 0); + *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0); if (*dst > 0) { - memcpy(data, pResultBuf->assistBuf, *dst); + memcpy(data, pBuf->assistBuf, *dst); } return data; } -static uint64_t allocatePositionInFile(SDiskbasedBuf* pResultBuf, size_t size) { - if (pResultBuf->pFree == NULL) { - return pResultBuf->nextPos; +static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { + if (pBuf->pFree == NULL) { + return pBuf->nextPos; } else { int32_t offset = -1; - size_t num = taosArrayGetSize(pResultBuf->pFree); + size_t num = taosArrayGetSize(pBuf->pFree); for(int32_t i = 0; i < num; ++i) { - SFreeListItem* pi = taosArrayGet(pResultBuf->pFree, i); + SFreeListItem* pi = taosArrayGet(pBuf->pFree, i); if (pi->len >= size) { offset = pi->offset; pi->offset += (int32_t)size; @@ -151,128 +157,141 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pResultBuf, size_t size) { } // no available recycle space, allocate new area in file - return pResultBuf->nextPos; + return pBuf->nextPos; } } -static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { +static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { assert(!pg->used && pg->pData != NULL); int32_t size = -1; - char* t = doCompressData(GET_DATA_PAYLOAD(pg), pResultBuf->pageSize, &size, pResultBuf); + char* t = doCompressData(GET_DATA_PAYLOAD(pg), pBuf->pageSize, &size, pBuf); // this page is flushed to disk for the first time if (pg->offset == -1) { - pg->offset = allocatePositionInFile(pResultBuf, size); - pResultBuf->nextPos += size; + assert(pg->dirty == true); + + pg->offset = allocatePositionInFile(pBuf, size); + pBuf->nextPos += size; - int32_t ret = fseek(pResultBuf->file, pg->offset, SEEK_SET); + int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET); if (ret != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - ret = (int32_t) fwrite(t, 1, size, pResultBuf->file); + ret = (int32_t) fwrite(t, 1, size, pBuf->file); if (ret != size) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - if (pResultBuf->fileSize < pg->offset + size) { - pResultBuf->fileSize = pg->offset + size; + if (pBuf->fileSize < pg->offset + size) { + pBuf->fileSize = pg->offset + size; } - } else { + + pBuf->statis.flushBytes += size; + pBuf->statis.flushPages += 1; + } else if (pg->dirty) { // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing if (pg->length < size) { // 1. add current space to free list SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset}; - taosArrayPush(pResultBuf->pFree, &dinfo); + taosArrayPush(pBuf->pFree, &dinfo); // 2. allocate new position, and update the info - pg->offset = allocatePositionInFile(pResultBuf, size); - pResultBuf->nextPos += size; + pg->offset = allocatePositionInFile(pBuf, size); + pBuf->nextPos += size; } - //3. write to disk. - int32_t ret = fseek(pResultBuf->file, pg->offset, SEEK_SET); + // 3. write to disk. + int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET); if (ret != 0) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - ret = (int32_t)fwrite(t, 1, size, pResultBuf->file); + ret = (int32_t)fwrite(t, 1, size, pBuf->file); if (ret != size) { terrno = TAOS_SYSTEM_ERROR(errno); return NULL; } - if (pResultBuf->fileSize < pg->offset + size) { - pResultBuf->fileSize = pg->offset + size; + if (pBuf->fileSize < pg->offset + size) { + pBuf->fileSize = pg->offset + size; } + + pBuf->statis.flushBytes += size; + pBuf->statis.flushPages += 1; } - char* ret = pg->pData; - memset(ret, 0, pResultBuf->pageSize); + char* pDataBuf = pg->pData; + memset(pDataBuf, 0, pBuf->pageSize); - pg->pData = NULL; + pg->pData = NULL; // this means the data is not in buffer pg->length = size; + pg->dirty = false; - pResultBuf->statis.flushBytes += pg->length; - return ret; + return pDataBuf; } -static char* flushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { +static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { int32_t ret = TSDB_CODE_SUCCESS; - assert(((int64_t) pResultBuf->numOfPages * pResultBuf->pageSize) == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages); + assert(((int64_t) pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages); - if (pResultBuf->file == NULL) { - if ((ret = createDiskFile(pResultBuf)) != TSDB_CODE_SUCCESS) { + if (pBuf->file == NULL) { + if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) { terrno = ret; return NULL; } } - return doFlushPageToDisk(pResultBuf, pg); + return doFlushPageToDisk(pBuf, pg); } // load file block data in disk -static char* loadPageFromDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { - int32_t ret = fseek(pResultBuf->file, pg->offset, SEEK_SET); - ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->length, pResultBuf->file); +static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { + int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET); + if (ret != 0) { + ret = TAOS_SYSTEM_ERROR(errno); + return ret; + } + + ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->length, pBuf->file); if (ret != pg->length) { - terrno = errno; - return NULL; + ret = TAOS_SYSTEM_ERROR(errno); + return ret; } - pResultBuf->statis.loadBytes += pg->length; + pBuf->statis.loadBytes += pg->length; + pBuf->statis.loadPages += 1; int32_t fullSize = 0; - doDecompressData(GET_DATA_PAYLOAD(pg), pg->length, &fullSize, pResultBuf); - - return (char*)GET_DATA_PAYLOAD(pg); + doDecompressData(GET_DATA_PAYLOAD(pg), pg->length, &fullSize, pBuf); + return 0; } -static SIDList addNewGroup(SDiskbasedBuf* pResultBuf, int32_t groupId) { - assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL); +static SIDList addNewGroup(SDiskbasedBuf* pBuf, int32_t groupId) { + assert(taosHashGet(pBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL); SArray* pa = taosArrayInit(1, POINTER_BYTES); - int32_t ret = taosHashPut(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); + int32_t ret = taosHashPut(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t), &pa, POINTER_BYTES); assert(ret == 0); return pa; } -static SPageInfo* registerPage(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t pageId) { +static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pageId) { SIDList list = NULL; - char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); + char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); if (p == NULL) { // it is a new group id - list = addNewGroup(pResultBuf, groupId); + list = addNewGroup(pBuf, groupId); } else { list = (SIDList) (*p); } - pResultBuf->numOfPages += 1; + pBuf->numOfPages += 1; SPageInfo* ppi = malloc(sizeof(SPageInfo));//{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL}; @@ -286,9 +305,9 @@ static SPageInfo* registerPage(SDiskbasedBuf* pResultBuf, int32_t groupId, int32 return *(SPageInfo**) taosArrayPush(list, &ppi); } -static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pResultBuf) { +static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { SListIter iter = {0}; - tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD); + tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD); SListNode* pn = NULL; while((pn = tdListNext(&iter)) != NULL) { @@ -305,23 +324,22 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pResultBuf) { return pn; } -static char* evacOneDataPage(SDiskbasedBuf* pResultBuf) { +static char* evacOneDataPage(SDiskbasedBuf* pBuf) { char* bufPage = NULL; - SListNode* pn = getEldestUnrefedPage(pResultBuf); + SListNode* pn = getEldestUnrefedPage(pBuf); // all pages are referenced by user, try to allocate new space if (pn == NULL) { assert(0); - int32_t prev = pResultBuf->inMemPages; + int32_t prev = pBuf->inMemPages; // increase by 50% of previous mem pages - pResultBuf->inMemPages = (int32_t)(pResultBuf->inMemPages * 1.5f); + pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f); -// qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev, -// pResultBuf->inMemPages, pResultBuf->pageSize); +// qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev, +// pBuf->inMemPages, pBuf->pageSize); } else { - pResultBuf->statis.flushPages += 1; - tdListPopNode(pResultBuf->lruList, pn); + tdListPopNode(pBuf->lruList, pn); SPageInfo* d = *(SPageInfo**) pn->data; assert(d->pn == pn); @@ -329,7 +347,7 @@ static char* evacOneDataPage(SDiskbasedBuf* pResultBuf) { d->pn = NULL; tfree(pn); - bufPage = flushPageToDisk(pResultBuf, d); + bufPage = flushPageToDisk(pBuf, d); } return bufPage; @@ -350,12 +368,12 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + 2 + sizeof(SFilePage); } -SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pageId) { - pResultBuf->statis.getPages += 1; +SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { + pBuf->statis.getPages += 1; char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) { - availablePage = evacOneDataPage(pResultBuf); + if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { + availablePage = evacOneDataPage(pBuf); // Failed to allocate a new buffer page, and there is an error occurs. if (availablePage == NULL) { @@ -364,26 +382,26 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa } // register new id in this group - *pageId = (++pResultBuf->allocateId); + *pageId = (++pBuf->allocateId); // register page id info - SPageInfo* pi = registerPage(pResultBuf, groupId, *pageId); + SPageInfo* pi = registerPage(pBuf, groupId, *pageId); // add to LRU list - assert(listNEles(pResultBuf->lruList) < pResultBuf->inMemPages && pResultBuf->inMemPages > 0); - lruListPushFront(pResultBuf->lruList, pi); + assert(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); + lruListPushFront(pBuf->lruList, pi); // add to hash map - taosHashPut(pResultBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); + taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); // allocate buf if (availablePage == NULL) { - pi->pData = calloc(1, getAllocPageSize(pResultBuf->pageSize)); // add extract bytes in case of zipped buffer increased. + pi->pData = calloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. } else { pi->pData = availablePage; } - pResultBuf->totalBufSize += pResultBuf->pageSize; + pBuf->totalBufSize += pBuf->pageSize; ((void**)pi->pData)[0] = pi; pi->used = true; @@ -391,16 +409,16 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa return (void *)(GET_DATA_PAYLOAD(pi)); } -SFilePage* getBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { - assert(pResultBuf != NULL && id >= 0); - pResultBuf->statis.getPages += 1; +SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { + assert(pBuf != NULL && id >= 0); + pBuf->statis.getPages += 1; - SPageInfo** pi = taosHashGet(pResultBuf->all, &id, sizeof(int32_t)); + SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); assert(pi != NULL && *pi != NULL); if ((*pi)->pData != NULL) { // it is in memory // no need to update the LRU list if only one page exists - if (pResultBuf->numOfPages == 1) { + if (pBuf->numOfPages == 1) { (*pi)->used = true; return (void *)(GET_DATA_PAYLOAD(*pi)); } @@ -408,7 +426,7 @@ SFilePage* getBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { SPageInfo** pInfo = (SPageInfo**) ((*pi)->pn->data); assert(*pInfo == *pi); - lruListMoveToFront(pResultBuf->lruList, (*pi)); + lruListMoveToFront(pBuf->lruList, (*pi)); (*pi)->used = true; return (void *)(GET_DATA_PAYLOAD(*pi)); @@ -417,79 +435,93 @@ SFilePage* getBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0); char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) { - availablePage = evacOneDataPage(pResultBuf); + if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { + availablePage = evacOneDataPage(pBuf); if (availablePage == NULL) { return NULL; } } if (availablePage == NULL) { - (*pi)->pData = calloc(1, getAllocPageSize(pResultBuf->pageSize)); + (*pi)->pData = calloc(1, getAllocPageSize(pBuf->pageSize)); } else { (*pi)->pData = availablePage; } ((void**)((*pi)->pData))[0] = (*pi); - lruListPushFront(pResultBuf->lruList, *pi); + lruListPushFront(pBuf->lruList, *pi); (*pi)->used = true; - loadPageFromDisk(pResultBuf, *pi); + int32_t code = loadPageFromDisk(pBuf, *pi); + if (code != 0) { + return NULL; + } + return (void *)(GET_DATA_PAYLOAD(*pi)); } } -void releaseBufPage(SDiskbasedBuf* pResultBuf, void* page) { - assert(pResultBuf != NULL && page != NULL); - char* p = (char*) page - POINTER_BYTES; +void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { + assert(pBuf != NULL && page != NULL); + int32_t offset = offsetof(SPageInfo, pData); + char* p = page - offset; SPageInfo* ppi = ((SPageInfo**) p)[0]; - releaseBufPageInfo(pResultBuf, ppi); + releaseBufPageInfo(pBuf, ppi); } -void releaseBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) { +void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { assert(pi->pData != NULL && pi->used); pi->used = false; - pResultBuf->statis.releasePages += 1; + pBuf->statis.releasePages += 1; } -size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); } +size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf) { return taosHashGetSize(pBuf->groupSet); } -size_t getTotalBufSize(const SDiskbasedBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; } +size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } -SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId) { - assert(pResultBuf != NULL); +SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId) { + assert(pBuf != NULL); - char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); + char** p = taosHashGet(pBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); if (p == NULL) { // it is a new group id - return pResultBuf->emptyDummyIdList; + return pBuf->emptyDummyIdList; } else { return (SArray*) (*p); } } -void destroyResultBuf(SDiskbasedBuf* pResultBuf) { - if (pResultBuf == NULL) { +void destroyResultBuf(SDiskbasedBuf* pBuf) { + if (pBuf == NULL) { return; } - if (pResultBuf->file != NULL) { -// qDebug("QInfo:0x%"PRIx64" res output buffer closed, total:%.2f Kb, inmem size:%.2f Kb, file size:%.2f Kb", -// pResultBuf->qId, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize / 1024.0, -// pResultBuf->fileSize/1024.0); + printStatisData(pBuf); + + if (pBuf->file != NULL) { + uDebug("Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f Kb, %"PRIx64"\n", + pBuf->totalBufSize/1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, + listNEles(pBuf->lruList), pBuf->fileSize/1024.0, pBuf->pageSize/1024.0f, pBuf->qId); - fclose(pResultBuf->file); + fclose(pBuf->file); } else { -// qDebug("QInfo:0x%"PRIx64" res output buffer closed, total:%.2f Kb, no file created", pResultBuf->qId, -// pResultBuf->totalBufSize/1024.0); + uDebug("Paged buffer closed, total:%.2f Kb, no file created, %"PRIx64, pBuf->totalBufSize/1024.0, pBuf->qId); } - remove(pResultBuf->path); - tfree(pResultBuf->path); + // print the statistics information + { + SDiskbasedBufStatis *ps = &pBuf->statis; + uDebug("Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n" + , ps->getPages, ps->releasePages, ps->flushBytes/1024.0f, ps->flushPages, ps->loadBytes/1024.0f, ps->loadPages + , ps->loadBytes/(1024.0 * ps->loadPages)); + } - SArray** p = taosHashIterate(pResultBuf->groupSet, NULL); + remove(pBuf->path); + tfree(pBuf->path); + + SArray** p = taosHashIterate(pBuf->groupSet, NULL); while(p) { size_t n = taosArrayGetSize(*p); for(int32_t i = 0; i < n; ++i) { @@ -499,16 +531,16 @@ void destroyResultBuf(SDiskbasedBuf* pResultBuf) { } taosArrayDestroy(*p); - p = taosHashIterate(pResultBuf->groupSet, p); + p = taosHashIterate(pBuf->groupSet, p); } - tdListFree(pResultBuf->lruList); - taosArrayDestroy(pResultBuf->emptyDummyIdList); - taosHashCleanup(pResultBuf->groupSet); - taosHashCleanup(pResultBuf->all); + tdListFree(pBuf->lruList); + taosArrayDestroy(pBuf->emptyDummyIdList); + taosHashCleanup(pBuf->groupSet); + taosHashCleanup(pBuf->all); - tfree(pResultBuf->assistBuf); - tfree(pResultBuf); + tfree(pBuf->assistBuf); + tfree(pBuf); } SPageInfo* getLastPageInfo(SIDList pList) { @@ -522,14 +554,41 @@ int32_t getPageId(const SPageInfo* pPgInfo) { return pPgInfo->pageId; } -int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf) { - return pResultBuf->pageSize; +int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { + return pBuf->pageSize; +} + +bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { + return pBuf->fileSize == 0; +} + +void setBufPageDirty(SFilePage* pPage, bool dirty) { + int32_t offset = offsetof(SPageInfo, pData); // todo extract method + char* p = (char*)pPage - offset; + + SPageInfo* ppi = ((SPageInfo**) p)[0]; + ppi->dirty = dirty; } -bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf) { - return pResultBuf->fileSize == 0; +void printStatisBeforeClose(SDiskbasedBuf* pBuf) { + pBuf->printStatis = true; } -void setBufPageDirty(SPageInfo* pPageInfo, bool dirty) { - pPageInfo->dirty = dirty; +void printStatisData(const SDiskbasedBuf* pBuf) { + if (!pBuf->printStatis) { + return; + } + + const SDiskbasedBufStatis* ps = &pBuf->statis; + + printf( + "Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f " + "Kb, %" PRIx64 "\n", + pBuf->totalBufSize / 1024.0, pBuf->numOfPages, listNEles(pBuf->lruList) * pBuf->pageSize / 1024.0, + listNEles(pBuf->lruList), pBuf->fileSize / 1024.0, pBuf->pageSize / 1024.0f, pBuf->qId); + + printf( + "Get/Release pages:%d/%d, flushToDisk:%.2f Kb (%d Pages), loadFromDisk:%.2f Kb (%d Pages), avgPageSize:%.2f Kb\n", + ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages, + ps->loadBytes / (1024.0 * ps->loadPages)); }