未验证 提交 49cc5012 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #19327 from taosdata/fix/TD-21657

fix: evac page failed issue cause of disk full
...@@ -115,6 +115,10 @@ struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t i ...@@ -115,6 +115,10 @@ struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t i
static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) { static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) {
SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId); SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
if (NULL == bufPage) {
return NULL;
}
if (forUpdate) { if (forUpdate) {
setBufPageDirty(bufPage, true); setBufPageDirty(bufPage, true);
} }
......
...@@ -1726,8 +1726,10 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI ...@@ -1726,8 +1726,10 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI
return w; return w;
} }
w = getResultRowByPos(pBuf, &pResultRowInfo->cur, false)->win; SResultRow* pRow = getResultRowByPos(pBuf, &pResultRowInfo->cur, false);
if (pRow) {
w = pRow->win;
}
// in case of typical time window, we can calculate time window directly. // in case of typical time window, we can calculate time window directly.
if (w.skey > ts || w.ekey < ts) { if (w.skey > ts || w.ekey < ts) {
w = doCalculateTimeWindow(ts, pInterval); w = doCalculateTimeWindow(ts, pInterval);
......
...@@ -150,6 +150,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i ...@@ -150,6 +150,11 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, i
pData->num = sizeof(SFilePage); pData->num = sizeof(SFilePage);
} else { } else {
pData = getBufPage(pResultBuf, *currentPageId); pData = getBufPage(pResultBuf, *currentPageId);
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return NULL;
}
pageId = *currentPageId; pageId = *currentPageId;
if (pData->num + interBufSize > getBufPageSize(pResultBuf)) { if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
...@@ -200,6 +205,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -200,6 +205,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (isIntervalQuery) { if (isIntervalQuery) {
if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists. if (p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
pResult = getResultRowByPos(pResultBuf, p1, true); pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
} }
} else { } else {
...@@ -208,6 +217,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -208,6 +217,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (p1 != NULL) { if (p1 != NULL) {
// todo // todo
pResult = getResultRowByPos(pResultBuf, p1, true); pResult = getResultRowByPos(pResultBuf, p1, true);
if (NULL == pResult) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
} }
} }
...@@ -216,6 +229,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -216,6 +229,10 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) { if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
SResultRowPosition pos = pResultRowInfo->cur; SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
releaseBufPage(pResultBuf, pPage); releaseBufPage(pResultBuf, pPage);
} }
...@@ -223,6 +240,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -223,6 +240,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
if (pResult == NULL) { if (pResult == NULL) {
ASSERT(pSup->resultRowSize > 0); ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize); pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
// add a new result set for a new group // add a new result set for a new group
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
...@@ -260,6 +280,11 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes ...@@ -260,6 +280,11 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
} else { } else {
SPageInfo* pi = getLastPageInfo(list); SPageInfo* pi = getLastPageInfo(list);
pData = getBufPage(pResultBuf, getPageId(pi)); pData = getBufPage(pResultBuf, getPageId(pi));
if (pData == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
pageId = getPageId(pi); pageId = getPageId(pi);
if (pData->num + size > getBufPageSize(pResultBuf)) { if (pData->num + size > getBufPageSize(pResultBuf)) {
...@@ -912,7 +937,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin ...@@ -912,7 +937,7 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
if (pResultRow->pageId == -1) { if (pResultRow->pageId == -1) {
int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize); int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->aggSup.pResultBuf, pAggInfo->binfo.pRes->info.rowSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return; T_LONG_JMP(pTaskInfo->env, terrno);
} }
} }
...@@ -993,6 +1018,11 @@ static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SR ...@@ -993,6 +1018,11 @@ static void doCopyResultToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SR
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset);
SqlFunctionCtx* pCtx = pSup->pCtx; SqlFunctionCtx* pCtx = pSup->pCtx;
...@@ -1036,6 +1066,10 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS ...@@ -1036,6 +1066,10 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) { for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
......
...@@ -492,13 +492,17 @@ _error: ...@@ -492,13 +492,17 @@ _error:
static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) { static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
SPartitionOperatorInfo* pInfo = pOperator->info; SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
for (int32_t j = 0; j < pBlock->info.rows; ++j) { for (int32_t j = 0; j < pBlock->info.rows; ++j) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
SDataGroupInfo* pGroupInfo = NULL; SDataGroupInfo* pGroupInfo = NULL;
void* pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len); void* pPage = getCurrentDataGroupInfo(pInfo, &pGroupInfo, len);
if (pPage == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
pGroupInfo->numOfRows += 1; pGroupInfo->numOfRows += 1;
...@@ -595,6 +599,10 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf ...@@ -595,6 +599,10 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
} else { } else {
int32_t* curId = taosArrayGetLast(p->pPageList); int32_t* curId = taosArrayGetLast(p->pPageList);
pPage = getBufPage(pInfo->pBuf, *curId); pPage = getBufPage(pInfo->pBuf, *curId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return pPage;
}
int32_t* rows = (int32_t*)pPage; int32_t* rows = (int32_t*)pPage;
if (*rows >= pInfo->rowCapacity) { if (*rows >= pInfo->rowCapacity) {
...@@ -674,7 +682,8 @@ static int compareDataGroupInfo(const void* group1, const void* group2) { ...@@ -674,7 +682,8 @@ static int compareDataGroupInfo(const void* group1, const void* group2) {
static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info; SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SDataGroupInfo* pGroupInfo = SDataGroupInfo* pGroupInfo =
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL; (pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) { if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
...@@ -692,7 +701,11 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { ...@@ -692,7 +701,11 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex); int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
void* page = getBufPage(pInfo->pBuf, *pageId); void* page = getBufPage(pInfo->pBuf, *pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity); blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
......
...@@ -170,6 +170,10 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro ...@@ -170,6 +170,10 @@ static SResultRow* getTableGroupOutputBuf(SOperatorInfo* pOperator, uint64_t gro
} }
*pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId); *pPage = getBufPage(pTableScanInfo->base.pdInfo.pAggSup->pResultBuf, p1->pageId);
if (NULL == *pPage) {
return NULL;
}
return (SResultRow*)((char*)(*pPage) + p1->offset); return (SResultRow*)((char*)(*pPage) + p1->offset);
} }
......
...@@ -636,6 +636,10 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -636,6 +636,10 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
} }
SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false); SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
if (NULL == pr) {
T_LONG_JMP(pTaskInfo->env, terrno);
}
ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId); ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
if (pr->closed) { if (pr->closed) {
...@@ -1315,6 +1319,10 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type ...@@ -1315,6 +1319,10 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type
static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false);
if (NULL == pResult) {
return;
}
SqlFunctionCtx* pCtx = pSup->pCtx; SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset); pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
...@@ -1328,6 +1336,9 @@ static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, ...@@ -1328,6 +1336,9 @@ static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf,
} }
} }
SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId); SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
if (NULL == bufPage) {
return;
}
setBufPageDirty(bufPage, true); setBufPageDirty(bufPage, true);
releaseBufPage(pResultBuf, bufPage); releaseBufPage(pResultBuf, bufPage);
} }
...@@ -4114,6 +4125,9 @@ void destroyMAIOperatorInfo(void* param) { ...@@ -4114,6 +4125,9 @@ void destroyMAIOperatorInfo(void* param) {
static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) { static SResultRow* doSetSingleOutputTupleBuf(SResultRowInfo* pResultRowInfo, SAggSupporter* pSup) {
SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize); SResultRow* pResult = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (NULL == pResult) {
return pResult;
}
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
return pResult; return pResult;
} }
......
...@@ -270,6 +270,10 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32 ...@@ -270,6 +270,10 @@ static int32_t sortComparInit(SMsortComparParam* pParam, SArray* pSources, int32
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, *pPgId); void* pPage = getBufPage(pHandle->pBuf, *pPgId);
if (NULL == pPage) {
return terrno;
}
code = blockDataFromBuf(pSource->src.pBlock, pPage); code = blockDataFromBuf(pSource->src.pBlock, pPage);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -337,6 +341,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT ...@@ -337,6 +341,11 @@ static int32_t adjustMergeTreeForNextTuple(SSortSource* pSource, SMultiwayMergeT
int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex); int32_t* pPgId = taosArrayGet(pSource->pageIdList, pSource->pageIndex);
void* pPage = getBufPage(pHandle->pBuf, *pPgId); void* pPage = getBufPage(pHandle->pBuf, *pPgId);
if (pPage == NULL) {
qError("failed to get buffer, code:%s", tstrerror(terrno));
return terrno;
}
int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage); int32_t code = blockDataFromBuf(pSource->src.pBlock, pPage);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册