提交 233079ec 编写于 作者: H Haojun Liao

[td-11818] fix bug in paged buffer.

上级 c5d33d31
...@@ -30,7 +30,6 @@ typedef struct SPageInfo SPageInfo; ...@@ -30,7 +30,6 @@ typedef struct SPageInfo SPageInfo;
typedef struct SDiskbasedBuf SDiskbasedBuf; typedef struct SDiskbasedBuf SDiskbasedBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define DEFAULT_PAGE_SIZE (16384L) #define DEFAULT_PAGE_SIZE (16384L)
typedef struct SFilePage { typedef struct SFilePage {
......
...@@ -5639,6 +5639,8 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa ...@@ -5639,6 +5639,8 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
releaseResBufPage(pInfo->pSortInternalBuf, pPage);
} }
} }
...@@ -5754,6 +5756,8 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorI ...@@ -5754,6 +5756,8 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, const SOrderOperatorI
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
releaseResBufPage(pInfo->pSortInternalBuf, pPage);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -262,7 +262,7 @@ TEST(testCase, external_sort_Test) { ...@@ -262,7 +262,7 @@ TEST(testCase, external_sort_Test) {
exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1");
taosArrayPush(pExprInfo, &exp1); taosArrayPush(pExprInfo, &exp1);
SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(500000), pExprInfo, pOrderVal); SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(100000), pExprInfo, pOrderVal);
bool newgroup = false; bool newgroup = false;
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
......
...@@ -13,16 +13,18 @@ typedef struct SFreeListItem { ...@@ -13,16 +13,18 @@ typedef struct SFreeListItem {
} SFreeListItem; } SFreeListItem;
typedef struct SPageDiskInfo { typedef struct SPageDiskInfo {
uint64_t offset; int64_t offset;
int32_t length; int32_t length;
} SPageDiskInfo; } SPageDiskInfo;
typedef struct SPageInfo { typedef struct SPageInfo {
SListNode* pn; // point to list node SListNode* pn; // point to list node
void* pData; void* pData;
int64_t offset;
int32_t pageId; int32_t pageId;
SPageDiskInfo info; int32_t length:30;
bool used; // set current page is in used bool used:1; // set current page is in used
bool dirty:1; // set current buffer page is dirty or not
} SPageInfo; } SPageInfo;
typedef struct SDiskbasedBufStatis { typedef struct SDiskbasedBufStatis {
...@@ -160,11 +162,11 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { ...@@ -160,11 +162,11 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
char* t = doCompressData(GET_DATA_PAYLOAD(pg), pResultBuf->pageSize, &size, pResultBuf); char* t = doCompressData(GET_DATA_PAYLOAD(pg), pResultBuf->pageSize, &size, pResultBuf);
// this page is flushed to disk for the first time // this page is flushed to disk for the first time
if (pg->info.offset == -1) { if (pg->offset == -1) {
pg->info.offset = allocatePositionInFile(pResultBuf, size); pg->offset = allocatePositionInFile(pResultBuf, size);
pResultBuf->nextPos += size; pResultBuf->nextPos += size;
int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); int32_t ret = fseek(pResultBuf->file, pg->offset, SEEK_SET);
if (ret != 0) { if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
...@@ -176,35 +178,36 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { ...@@ -176,35 +178,36 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
return NULL; return NULL;
} }
if (pResultBuf->fileSize < pg->info.offset + pg->info.length) { if (pResultBuf->fileSize < pg->offset + size) {
pResultBuf->fileSize = pg->info.offset + pg->info.length; pResultBuf->fileSize = pg->offset + size;
} }
} else { } else {
// length becomes greater, current space is not enough, allocate new place, otherwise, do nothing // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
if (pg->info.length < size) { if (pg->length < size) {
// 1. add current space to free list // 1. add current space to free list
taosArrayPush(pResultBuf->pFree, &pg->info); SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
taosArrayPush(pResultBuf->pFree, &dinfo);
// 2. allocate new position, and update the info // 2. allocate new position, and update the info
pg->info.offset = allocatePositionInFile(pResultBuf, size); pg->offset = allocatePositionInFile(pResultBuf, size);
pResultBuf->nextPos += size; pResultBuf->nextPos += size;
} }
//3. write to disk. //3. write to disk.
int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); int32_t ret = fseek(pResultBuf->file, pg->offset, SEEK_SET);
if (ret != 0) { if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
ret = (int32_t)fwrite(t, size, 1, pResultBuf->file); ret = (int32_t)fwrite(t, 1, size, pResultBuf->file);
if (ret != size) { if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return NULL; return NULL;
} }
if (pResultBuf->fileSize < pg->info.offset + pg->info.length) { if (pResultBuf->fileSize < pg->offset + size) {
pResultBuf->fileSize = pg->info.offset + pg->info.length; pResultBuf->fileSize = pg->offset + size;
} }
} }
...@@ -212,9 +215,9 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { ...@@ -212,9 +215,9 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
memset(ret, 0, pResultBuf->pageSize); memset(ret, 0, pResultBuf->pageSize);
pg->pData = NULL; pg->pData = NULL;
pg->info.length = size; pg->length = size;
pResultBuf->statis.flushBytes += pg->info.length; pResultBuf->statis.flushBytes += pg->length;
return ret; return ret;
} }
...@@ -234,17 +237,17 @@ static char* flushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { ...@@ -234,17 +237,17 @@ static char* flushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
// load file block data in disk // load file block data in disk
static char* loadPageFromDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { static char* loadPageFromDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) {
int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); int32_t ret = fseek(pResultBuf->file, pg->offset, SEEK_SET);
ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->info.length, pResultBuf->file); ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->length, pResultBuf->file);
if (ret != pg->info.length) { if (ret != pg->length) {
terrno = errno; terrno = errno;
return NULL; return NULL;
} }
pResultBuf->statis.loadBytes += pg->info.length; pResultBuf->statis.loadBytes += pg->length;
int32_t fullSize = 0; int32_t fullSize = 0;
doDecompressData(GET_DATA_PAYLOAD(pg), pg->info.length, &fullSize, pResultBuf); doDecompressData(GET_DATA_PAYLOAD(pg), pg->length, &fullSize, pResultBuf);
return (char*)GET_DATA_PAYLOAD(pg); return (char*)GET_DATA_PAYLOAD(pg);
} }
...@@ -275,7 +278,8 @@ static SPageInfo* registerPage(SDiskbasedBuf* pResultBuf, int32_t groupId, int32 ...@@ -275,7 +278,8 @@ static SPageInfo* registerPage(SDiskbasedBuf* pResultBuf, int32_t groupId, int32
ppi->pageId = pageId; ppi->pageId = pageId;
ppi->pData = NULL; ppi->pData = NULL;
ppi->info = PAGE_INFO_INITIALIZER; ppi->offset = -1;
ppi->length = -1;
ppi->used = true; ppi->used = true;
ppi->pn = NULL; ppi->pn = NULL;
...@@ -307,6 +311,7 @@ static char* evicOneDataPage(SDiskbasedBuf* pResultBuf) { ...@@ -307,6 +311,7 @@ static char* evicOneDataPage(SDiskbasedBuf* pResultBuf) {
// all pages are referenced by user, try to allocate new space // all pages are referenced by user, try to allocate new space
if (pn == NULL) { if (pn == NULL) {
assert(0);
int32_t prev = pResultBuf->inMemPages; int32_t prev = pResultBuf->inMemPages;
// increase by 50% of previous mem pages // increase by 50% of previous mem pages
...@@ -351,11 +356,11 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa ...@@ -351,11 +356,11 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pa
char* availablePage = NULL; char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) { if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf); availablePage = evicOneDataPage(pResultBuf);
}
// Failed to allocate a new buffer page, and there is an error occurs. // Failed to allocate a new buffer page, and there is an error occurs.
if (availablePage == NULL && terrno != 0) { if (availablePage == NULL) {
return NULL; return NULL;
}
} }
// register new id in this group // register new id in this group
...@@ -409,11 +414,14 @@ SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { ...@@ -409,11 +414,14 @@ SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) {
return (void *)(GET_DATA_PAYLOAD(*pi)); return (void *)(GET_DATA_PAYLOAD(*pi));
} else { // not in memory } else { // not in memory
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0); assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0);
char* availablePage = NULL; char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) { if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf); availablePage = evicOneDataPage(pResultBuf);
if (availablePage == NULL) {
return NULL;
}
} }
if (availablePage == NULL) { if (availablePage == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册