提交 c5d33d31 编写于 作者: H Haojun Liao

[td-11818] fix bug in paged file.

上级 8b4b14f5
...@@ -5718,6 +5718,10 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) { ...@@ -5718,6 +5718,10 @@ void addToDiskbasedBuf(SOrderOperatorInfo* pInfo, jmp_buf env) {
int32_t pageId = -1; int32_t pageId = -1;
SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId);
if (pPage == NULL) {
assert(0);
longjmp(env, terrno);
}
int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t); int32_t size = blockDataGetSize(p) + sizeof(int32_t) + p->info.numOfCols * sizeof(int32_t);
assert(size <= getBufPageSize(pInfo->pSortInternalBuf)); assert(size <= getBufPageSize(pInfo->pSortInternalBuf));
...@@ -5884,7 +5888,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI ...@@ -5884,7 +5888,7 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprI
return NULL; return NULL;
} }
pInfo->sortBufSize = 1024 * 1024 * 5; // 1MB pInfo->sortBufSize = 1024 * 1024 * 50; // 1MB
pInfo->capacity = 64*1024; pInfo->capacity = 64*1024;
pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity); pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, pInfo->capacity);
pInfo->pSources = taosArrayInit(4, POINTER_BYTES); pInfo->pSources = taosArrayInit(4, POINTER_BYTES);
......
...@@ -262,7 +262,7 @@ TEST(testCase, external_sort_Test) { ...@@ -262,7 +262,7 @@ TEST(testCase, external_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
taosArrayPush(pExprInfo, &exp1); taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(50000), pExprInfo, pOrderVal); SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(500000), pExprInfo, pOrderVal);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
......
...@@ -13,8 +13,8 @@ typedef struct SFreeListItem { ...@@ -13,8 +13,8 @@ typedef struct SFreeListItem {
} SFreeListItem; } SFreeListItem;
typedef struct SPageDiskInfo { typedef struct SPageDiskInfo {
int32_t offset; uint64_t offset;
int32_t length; int32_t length;
} SPageDiskInfo; } SPageDiskInfo;
typedef struct SPageInfo { typedef struct SPageInfo {
...@@ -36,7 +36,7 @@ typedef struct SDiskbasedBufStatis { ...@@ -36,7 +36,7 @@ typedef struct SDiskbasedBufStatis {
typedef struct SDiskbasedBuf { typedef struct SDiskbasedBuf {
int32_t numOfPages; int32_t numOfPages;
int64_t totalBufSize; int64_t totalBufSize;
int64_t fileSize; // disk file size uint64_t fileSize; // disk file size
FILE* file; FILE* file;
int32_t allocateId; // allocated page id int32_t allocateId; // allocated page id
char* path; // file path char* path; // file path
...@@ -49,7 +49,7 @@ typedef struct SDiskbasedBuf { ...@@ -49,7 +49,7 @@ typedef struct SDiskbasedBuf {
void* assistBuf; // assistant buffer for compress/decompress data void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk bool comp; // compressed before flushed to disk
int32_t nextPos; // next page flush position uint64_t nextPos; // next page flush position
uint64_t qId; // for debug purpose uint64_t qId; // for debug purpose
SDiskbasedBufStatis statis; SDiskbasedBufStatis statis;
...@@ -130,7 +130,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba ...@@ -130,7 +130,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba
return data; return data;
} }
static int32_t allocatePositionInFile(SDiskbasedBuf* pResultBuf, size_t size) { static uint64_t allocatePositionInFile(SDiskbasedBuf* pResultBuf, size_t size) {
if (pResultBuf->pFree == NULL) { if (pResultBuf->pFree == NULL) {
return pResultBuf->nextPos; return pResultBuf->nextPos;
} else { } else {
...@@ -165,10 +165,16 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { ...@@ -165,10 +165,16 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
pResultBuf->nextPos += size; pResultBuf->nextPos += size;
int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET);
assert(ret == 0); if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
ret = (int32_t) fwrite(t, 1, size, pResultBuf->file); ret = (int32_t) fwrite(t, 1, size, pResultBuf->file);
assert(ret == size); if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
if (pResultBuf->fileSize < pg->info.offset + pg->info.length) { if (pResultBuf->fileSize < pg->info.offset + pg->info.length) {
pResultBuf->fileSize = pg->info.offset + pg->info.length; pResultBuf->fileSize = pg->info.offset + pg->info.length;
...@@ -186,13 +192,15 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { ...@@ -186,13 +192,15 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
//3. write to disk. //3. write to disk.
int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET);
if (ret != 0) { // todo handle the error case if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
} }
ret = (int32_t)fwrite(t, size, 1, pResultBuf->file); ret = (int32_t)fwrite(t, size, 1, pResultBuf->file);
if (ret != size) { // todo handle the error case if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
} }
if (pResultBuf->fileSize < pg->info.offset + pg->info.length) { if (pResultBuf->fileSize < pg->info.offset + pg->info.length) {
...@@ -207,7 +215,6 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { ...@@ -207,7 +215,6 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
pg->info.length = size; pg->info.length = size;
pResultBuf->statis.flushBytes += pg->info.length; pResultBuf->statis.flushBytes += pg->info.length;
return ret; return ret;
} }
...@@ -346,6 +353,11 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa ...@@ -346,6 +353,11 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa
availablePage = evicOneDataPage(pResultBuf); availablePage = evicOneDataPage(pResultBuf);
} }
// Failed to allocate a new buffer page, and there is an error occurs.
if (availablePage == NULL && terrno != 0) {
return NULL;
}
// register new id in this group // register new id in this group
*pageId = (++pResultBuf->allocateId); *pageId = (++pResultBuf->allocateId);
...@@ -354,7 +366,6 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa ...@@ -354,7 +366,6 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa
// add to LRU list // add to LRU list
assert(listNEles(pResultBuf->lruList) < pResultBuf->inMemPages && pResultBuf->inMemPages > 0); assert(listNEles(pResultBuf->lruList) < pResultBuf->inMemPages && pResultBuf->inMemPages > 0);
lruListPushFront(pResultBuf->lruList, pi); lruListPushFront(pResultBuf->lruList, pi);
// add to hash map // add to hash map
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册