未验证 提交 c413e941 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #19838 from taosdata/fix/outOfDisk

fix: fix return invalid err
...@@ -5,11 +5,11 @@ ...@@ -5,11 +5,11 @@
#include "thash.h" #include "thash.h"
#include "tlog.h" #include "tlog.h"
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES) #define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL) #define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL)
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL) #define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0) #define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef struct SPageDiskInfo { typedef struct SPageDiskInfo {
int64_t offset; int64_t offset;
...@@ -17,7 +17,7 @@ typedef struct SPageDiskInfo { ...@@ -17,7 +17,7 @@ typedef struct SPageDiskInfo {
} SPageDiskInfo, SFreeListItem; } SPageDiskInfo, SFreeListItem;
struct SPageInfo { struct SPageInfo {
SListNode* pn; // point to list node struct. it is NULL when the page is evicted from the in-memory buffer SListNode* pn; // point to list node struct. it is NULL when the page is evicted from the in-memory buffer
void* pData; void* pData;
int64_t offset; int64_t offset;
int32_t pageId; int32_t pageId;
...@@ -52,10 +52,13 @@ struct SDiskbasedBuf { ...@@ -52,10 +52,13 @@ struct SDiskbasedBuf {
}; };
static int32_t createDiskFile(SDiskbasedBuf* pBuf) { static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
if (pBuf->path == NULL) { // prepare the file name when needed it if (pBuf->path == NULL) { // prepare the file name when needed it
char path[PATH_MAX] = {0}; char path[PATH_MAX] = {0};
taosGetTmpfilePath(pBuf->prefix, "paged-buf", path); taosGetTmpfilePath(pBuf->prefix, "paged-buf", path);
pBuf->path = taosMemoryStrDup(path); pBuf->path = taosMemoryStrDup(path);
if (pBuf->path == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} }
pBuf->pFile = pBuf->pFile =
...@@ -166,6 +169,7 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -166,6 +169,7 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
t = doCompressData(payload, pBuf->pageSize, &size, pBuf); t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
if (size < 0) { if (size < 0) {
uError("failed to compress data when flushing data to disk, %s", pBuf->id); uError("failed to compress data when flushing data to disk, %s", pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
} }
} }
...@@ -233,7 +237,7 @@ static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { ...@@ -233,7 +237,7 @@ static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
// load file block data in disk // load file block data in disk
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->offset < 0 || pg->length <= 0) { if (pg->offset < 0 || pg->length <= 0) {
uError("failed to load buf page from disk, offset:%"PRId64", length:%d, %s", pg->offset, pg->length, pBuf->id); uError("failed to load buf page from disk, offset:%" PRId64 ", length:%d, %s", pg->offset, pg->length, pBuf->id);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
...@@ -300,6 +304,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { ...@@ -300,6 +304,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
static char* evictBufPage(SDiskbasedBuf* pBuf) { static char* evictBufPage(SDiskbasedBuf* pBuf) {
SListNode* pn = getEldestUnrefedPage(pBuf); SListNode* pn = getEldestUnrefedPage(pBuf);
if (pn == NULL) { // no available buffer pages now, return. if (pn == NULL) { // no available buffer pages now, return.
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
...@@ -379,14 +384,14 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem ...@@ -379,14 +384,14 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
goto _error; goto _error;
} }
pPBuf->prefix = (char*) dir; pPBuf->prefix = (char*)dir;
pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
// pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
destroyDiskbasedBuf(pPBuf); destroyDiskbasedBuf(pPBuf);
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -396,11 +401,12 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) { ...@@ -396,11 +401,12 @@ static char* doExtractPage(SDiskbasedBuf* pBuf) {
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evictBufPage(pBuf); availablePage = evictBufPage(pBuf);
if (availablePage == NULL) { if (availablePage == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; uWarn("no available buf pages, current:%d, max:%d, reason: %s, %s", listNEles(pBuf->lruList), pBuf->inMemPages,
uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages) terrstr(), pBuf->id)
} }
} else { } else {
availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. availablePage =
taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
if (availablePage == NULL) { if (availablePage == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
} }
...@@ -548,9 +554,7 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { ...@@ -548,9 +554,7 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; }
SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) { SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) { return pBuf->pIdList; }
return pBuf->pIdList;
}
void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
if (pBuf == NULL) { if (pBuf == NULL) {
...@@ -625,9 +629,7 @@ SPageInfo* getLastPageInfo(SArray* pList) { ...@@ -625,9 +629,7 @@ SPageInfo* getLastPageInfo(SArray* pList) {
return pPgInfo; return pPgInfo;
} }
int32_t getPageId(const SPageInfo* pPgInfo) { int32_t getPageId(const SPageInfo* pPgInfo) { return pPgInfo->pageId; }
return pPgInfo->pageId;
}
int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { return pBuf->pageSize; } int32_t getBufPageSize(const SDiskbasedBuf* pBuf) { return pBuf->pageSize; }
...@@ -683,7 +685,7 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) { ...@@ -683,7 +685,7 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages)); ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
} else { } else {
//printf("no page loaded\n"); // printf("no page loaded\n");
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册