From 21d57a36240c506b47313911375b4e82c906c354 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 22 Jan 2023 01:45:29 +0800 Subject: [PATCH] fix(query): check for failure during add new buf pages. --- source/libs/executor/src/groupoperator.c | 10 ++- source/util/src/tpagedbuf.c | 98 ++++++++++++------------ 2 files changed, 57 insertions(+), 51 deletions(-) diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5676e19cdf..bf4b9a2599 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, &pageId); - taosArrayPush(p->pPageList, &pageId); + if (pPage == NULL) { + return pPage; + } + taosArrayPush(p->pPageList, &pageId); *(int32_t*)pPage = 0; } else { int32_t* curId = taosArrayGetLast(p->pPageList); @@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf // add a new page for current group int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, &pageId); + if (pPage == NULL) { + qError("failed to get new buffer, code:%s", tstrerror(terrno)); + return NULL; + } + taosArrayPush(p->pPageList, &pageId); memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 87b44b2d13..f696a02e6e 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -222,7 +222,6 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { char* p = doFlushPageToDisk(pBuf, pg); setPageNotInBuf(pg); pg->dirty = false; - return p; } @@ -286,32 +285,21 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { } static char* evacOneDataPage(SDiskbasedBuf* pBuf) { - char* bufPage = NULL; SListNode* pn = getEldestUnrefedPage(pBuf); - terrno = 0; - - // all pages are referenced by user, try to allocate new space - if (pn == NULL) { - int32_t prev = pBuf->inMemPages; - - // increase by 50% of previous mem pages - pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f); - - // qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev, - // pBuf->inMemPages, pBuf->pageSize); - } else { - tdListPopNode(pBuf->lruList, pn); + if (pn == NULL) { // no available buffer pages now, return. + return NULL; + } - SPageInfo* d = *(SPageInfo**)pn->data; - ASSERTS(d->pn == pn, "d->pn not equal pn"); + terrno = 0; + tdListPopNode(pBuf->lruList, pn); - d->pn = NULL; - taosMemoryFreeClear(pn); + SPageInfo* d = *(SPageInfo**)pn->data; + ASSERTS(d->pn == pn, "d->pn not equal pn"); - bufPage = flushPageToDisk(pBuf, d); - } + d->pn = NULL; + taosMemoryFreeClear(pn); - return bufPage; + return flushPageToDisk(pBuf, d); } static void lruListPushFront(SList* pList, SPageInfo* pi) { @@ -338,7 +326,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem SDiskbasedBuf* pPBuf = *pBuf; if (pPBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + goto _error; } pPBuf->pageSize = pagesize; @@ -362,24 +350,50 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES); pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES + if (pPBuf->assistBuf == NULL) { + goto _error; + } + pPBuf->all = taosHashInit(10, fn, true, false); - pPBuf->prefix = (char*) dir; + if (pPBuf->all == NULL) { + goto _error; + } + pPBuf->prefix = (char*) dir; pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, - // pPBuf->pageSize, - // pPBuf->inMemPages, pPBuf->path); + // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); return TSDB_CODE_SUCCESS; + _error: + destroyDiskbasedBuf(pPBuf); + return TSDB_CODE_OUT_OF_MEMORY; } -void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { - pBuf->statis.getPages += 1; - +static char* doExtractPage(SDiskbasedBuf* pBuf) { char* availablePage = NULL; if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { availablePage = evacOneDataPage(pBuf); + if (availablePage == NULL) { + uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages) + } + } else { + availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. + if (availablePage == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + } + + return availablePage; +} + +void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { + pBuf->statis.getPages += 1; + + char* availablePage = doExtractPage(pBuf); + if (availablePage == NULL) { + return NULL; } SPageInfo* pi = NULL; @@ -402,16 +416,8 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { } // add to LRU list - ASSERT(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); lruListPushFront(pBuf->lruList, pi); - - // allocate buf - if (availablePage == NULL) { - pi->pData = - taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. - } else { - pi->pData = availablePage; - } + pi->pData = availablePage; ((void**)pi->pData)[0] = pi; #ifdef BUF_PAGE_DEBUG @@ -447,18 +453,9 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); - char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); - if (availablePage == NULL) { - return NULL; - } - } - - if (availablePage == NULL) { - (*pi)->pData = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); - } else { - (*pi)->pData = availablePage; + (*pi)->pData = doExtractPage(pBuf); + if ((*pi)->pData == NULL) { + return NULL; } // set the ptr to the new SPageInfo @@ -471,6 +468,7 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { if ((*pi)->length > 0 && (*pi)->offset >= 0) { int32_t code = loadPageFromDisk(pBuf, *pi); if (code != 0) { + terrno = code; return NULL; } } -- GitLab