提交 003002af 编写于 作者: G Ganlin Zhao

enh(query): handle getBufPage return NULL when no available disk spaces

上级 848b16b8
......@@ -91,6 +91,7 @@ typedef struct STuplePos {
};
STupleKey streamTupleKey;
};
bool isValid;
} STuplePos;
typedef struct SFirstLastRes {
......
......@@ -2014,20 +2014,26 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
ASSERT(pCtx->subsidiaries.rowLen > 0);
}
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
SFirstLastRes* pInfo) {
if (pCtx->subsidiaries.num <= 0) {
return;
return TSDB_CODE_SUCCESS;
}
if (!pInfo->hasResult) {
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock);
if (!pInfo->pos.isValid) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
} else {
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
}
return TSDB_CODE_SUCCESS;
}
static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) {
static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -2037,9 +2043,13 @@ static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t cur
memcpy(pInfo->buf, pData, pInfo->bytes);
pInfo->ts = currentTs;
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pInfo->hasResult = true;
return TSDB_CODE_SUCCESS;
}
// This ordinary first function does not care if current scan is ascending order or descending order scan
......@@ -2063,7 +2073,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true);
// save selectivity value for column consisted of all null values
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return TSDB_CODE_SUCCESS;
}
......@@ -2136,7 +2149,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = pts[i];
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
int32_t code = doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2144,7 +2160,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
if (numOfElems == 0) {
// save selectivity value for column consisted of all null values
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
......@@ -2171,7 +2190,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
ASSERT(pInputCol->hasNull == true);
// save selectivity value for column consisted of all null values
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return TSDB_CODE_SUCCESS;
}
......@@ -2261,7 +2283,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
char* data = colDataGetData(pInputCol, chosen);
doSaveCurrentVal(pCtx, i, cts, type, data);
int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2269,7 +2294,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data);
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2283,7 +2311,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data);
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2294,7 +2325,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
// save selectivity value for column consisted of all null values
if (numOfElems == 0) {
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
// SET_VAL(pResInfo, numOfElems, 1);
......@@ -2322,12 +2356,17 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
return TSDB_CODE_SUCCESS;
}
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
int32_t rowIndex) {
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pOutput->hasResult = true;
}
return TSDB_CODE_SUCCESS;
}
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
......@@ -2343,7 +2382,10 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (!numOfElems) {
numOfElems = pInputInfo->hasResult ? 1 : 0;
}
......@@ -2412,7 +2454,7 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return TSDB_CODE_SUCCESS;
}
static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) {
static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
......@@ -2429,9 +2471,14 @@ static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, i
}
pInfo->ts = cts;
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pInfo->hasResult = true;
return TSDB_CODE_SUCCESS;
}
int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
......@@ -2493,7 +2540,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo);
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2948,18 +2998,27 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
if (pHandle->currentPage == -1) {
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
return p;
}
pPage->num = sizeof(SFilePage);
} else {
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
if (pPage == NULL) {
return p;
}
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
// current page is all used, let's prepare a new buffer page
releaseBufPage(pHandle->pBuf, pPage);
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
return p;
}
pPage->num = sizeof(SFilePage);
}
}
p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num};
p = (STuplePos){.pageId = pHandle->currentPage, .offset = pPage->num, .isValid = true};
memcpy(pPage->data + pPage->num, pBuf, length);
pPage->num += length;
......@@ -5697,7 +5756,10 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo);
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册