diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 6b4c8fb574a0a21f989fc3222489bea717a9911e..f2318c9c6a4b55a6df364f0ac8cf00fb375634c3 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5718,6 +5718,10 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { int32_t pageId = -1; SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); + if (pPage == NULL) { + assert(0); + longjmp(env, terrno); + } int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t); assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); @@ -5884,7 +5888,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI return NULL; } - pInfo->sortBufSize = 1024 * 1024 * 5; // 1MB + pInfo->sortBufSize = 1024 * 1024 * 50; // 1MB pInfo->capacity = 64*1024; pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity); pInfo->pSources = taosArrayInit(4, POINTER_BYTES); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 2dd26ab534cc0c906ae85a5b99f357c5c88b2d03..3df9910ffbf828531077509183ae06cef4a694a8 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -262,7 +262,7 @@ TEST(testCase, external_sort_Test) { exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); taosArrayPush(pExprInfo, &exp1); - SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(50000), pExprInfo, pOrderVal); + SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(500000), pExprInfo, pOrderVal); bool newgroup = false; SSDataBlock* pRes = NULL; diff --git a/source/util/src/tpagedfile.c b/source/util/src/tpagedfile.c index 2d8fdb1fc317bce659e3e729f397ad7acedfb5fa..8e9ee1e0886721f19e7ff24caa32840376b256b5 100644 --- a/source/util/src/tpagedfile.c +++ b/source/util/src/tpagedfile.c @@ -13,8 +13,8 @@ typedef struct SFreeListItem { } SFreeListItem; typedef struct SPageDiskInfo { - int32_t offset; - int32_t length; + uint64_t offset; + int32_t length; } SPageDiskInfo; typedef struct SPageInfo { @@ -36,7 +36,7 @@ typedef struct SDiskbasedBufStatis { typedef struct SDiskbasedBuf { int32_t numOfPages; int64_t totalBufSize; - int64_t fileSize; // disk file size + uint64_t fileSize; // disk file size FILE* file; int32_t allocateId; // allocated page id char* path; // file path @@ -49,7 +49,7 @@ typedef struct SDiskbasedBuf { void* assistBuf; // assistant buffer for compress/decompress data SArray* pFree; // free area in file bool comp; // compressed before flushed to disk - int32_t nextPos; // next page flush position + uint64_t nextPos; // next page flush position uint64_t qId; // for debug purpose SDiskbasedBufStatis statis; @@ -130,7 +130,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba return data; } -static int32_t allocatePositionInFile(SDiskbasedBuf* pResultBuf, size_t size) { +static uint64_t allocatePositionInFile(SDiskbasedBuf* pResultBuf, size_t size) { if (pResultBuf->pFree == NULL) { return pResultBuf->nextPos; } else { @@ -165,10 +165,16 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { pResultBuf->nextPos += size; int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); - assert(ret == 0); + if (ret != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } ret = (int32_t) fwrite(t, 1, size, pResultBuf->file); - assert(ret == size); + if (ret != size) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } if (pResultBuf->fileSize < pg->info.offset + pg->info.length) { pResultBuf->fileSize = pg->info.offset + pg->info.length; @@ -186,13 +192,15 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { //3. write to disk. int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); - if (ret != 0) { // todo handle the error case - + if (ret != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; } ret = (int32_t)fwrite(t, size, 1, pResultBuf->file); - if (ret != size) { // todo handle the error case - + if (ret != size) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; } if (pResultBuf->fileSize < pg->info.offset + pg->info.length) { @@ -207,7 +215,6 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { pg->info.length = size; pResultBuf->statis.flushBytes += pg->info.length; - return ret; } @@ -346,6 +353,11 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa availablePage = evicOneDataPage(pResultBuf); } + // Failed to allocate a new buffer page, and there is an error occurs. + if (availablePage == NULL && terrno != 0) { + return NULL; + } + // register new id in this group *pageId = (++pResultBuf->allocateId); @@ -354,7 +366,6 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa // add to LRU list assert(listNEles(pResultBuf->lruList) < pResultBuf->inMemPages && pResultBuf->inMemPages > 0); - lruListPushFront(pResultBuf->lruList, pi); // add to hash map