未验证 提交 ca9f4be1 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #19370 from taosdata/enh/TD-21690

enh(query): handle getBufPage return NULL when no available disk spaces
......@@ -44,9 +44,10 @@ typedef struct SMinmaxResInfo {
bool nullTupleSaved;
int16_t type;
} SMinmaxResInfo;
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc);
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock);
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems);
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
......
......@@ -754,24 +754,33 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
}
int32_t minFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 1);
int32_t numOfElems = 0;
int32_t code = doMinMaxHelper(pCtx, 1, &numOfElems);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
int32_t maxFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = doMinMaxHelper(pCtx, 0);
int32_t numOfElems = 0;
int32_t code = doMinMaxHelper(pCtx, 0, &numOfElems);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
static int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
static int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos,
int32_t rowIndex);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
......@@ -788,17 +797,17 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
}
if (pEntryInfo->numOfRes > 0) {
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
code = setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
} else {
setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
code = setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
}
return pEntryInfo->numOfRes;
return code;
}
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
int32_t setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
if (pCtx->subsidiaries.num <= 0) {
return;
return TSDB_CODE_SUCCESS;
}
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
......@@ -808,17 +817,23 @@ void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colDataAppendNULL(pDstCol, rowIndex);
}
return TSDB_CODE_SUCCESS;
}
void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) {
int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) {
if (pCtx->subsidiaries.num <= 0) {
return;
return TSDB_CODE_SUCCESS;
}
if ((pCtx->saveHandle.pBuf != NULL && pTuplePos->pageId != -1) ||
(pCtx->saveHandle.pState && pTuplePos->streamTupleKey.ts > 0)) {
int32_t numOfCols = pCtx->subsidiaries.num;
const char* p = loadTupleData(pCtx, pTuplePos);
if (p == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
bool* nullList = (bool*)p;
char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
......@@ -841,6 +856,8 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
tdbFree((void*)p);
}
}
return TSDB_CODE_SUCCESS;
}
void releaseSource(STuplePos* pPos) {
......@@ -1567,7 +1584,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
// all data are null, set it completed
if (pInfo->numOfElems == 0) {
pResInfo->complete = true;
return 0;
return TSDB_CODE_SUCCESS;
} else {
pInfo->pMemBucket = tMemBucketCreate(pCol->info.bytes, type, pInfo->minval, pInfo->maxval);
}
......@@ -1630,7 +1647,11 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pCol, i);
numOfElems += 1;
tMemBucketPut(pInfo->pMemBucket, data, 1);
int32_t code = tMemBucketPut(pInfo->pMemBucket, data, 1);
if (code != TSDB_CODE_SUCCESS) {
tMemBucketDestroy(pInfo->pMemBucket);
return code;
}
}
SET_VAL(pResInfo, numOfElems, 1);
......@@ -1653,6 +1674,11 @@ int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
}
tMemBucketDestroy(pMemBucket);
if (ppInfo->result < 0) {
return TSDB_CODE_NO_AVAIL_DISK;
}
return functionFinalize(pCtx, pBlock);
}
......@@ -2013,20 +2039,24 @@ static void prepareBuf(SqlFunctionCtx* pCtx) {
}
}
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
static int32_t firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx,
SFirstLastRes* pInfo) {
int32_t code = TSDB_CODE_SUCCESS;
if (pCtx->subsidiaries.num <= 0) {
return;
return TSDB_CODE_SUCCESS;
}
if (!pInfo->hasResult) {
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock);
code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
} else {
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
code = updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
}
return code;
}
static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) {
static int32_t doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
......@@ -2036,9 +2066,13 @@ static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t cur
memcpy(pInfo->buf, pData, pInfo->bytes);
pInfo->ts = currentTs;
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pInfo->hasResult = true;
return TSDB_CODE_SUCCESS;
}
// This ordinary first function does not care if current scan is ascending order or descending order scan
......@@ -2061,7 +2095,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
// All null data column, return directly.
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
// save selectivity value for column consisted of all null values
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return TSDB_CODE_SUCCESS;
}
......@@ -2134,7 +2171,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i);
TSKEY cts = pts[i];
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
int32_t code = doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2142,7 +2182,10 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
if (numOfElems == 0) {
// save selectivity value for column consisted of all null values
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS;
......@@ -2168,7 +2211,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
// All null data column, return directly.
if (pInput->colDataSMAIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows) && pInputCol->hasNull == true) {
// save selectivity value for column consisted of all null values
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
return TSDB_CODE_SUCCESS;
}
......@@ -2258,7 +2304,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
char* data = colDataGetData(pInputCol, chosen);
doSaveCurrentVal(pCtx, i, cts, type, data);
int32_t code = doSaveCurrentVal(pCtx, i, cts, type, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2266,7 +2315,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
for (int32_t i = pInput->startRowIndex + round * 4; i < pInput->startRowIndex + pInput->numOfRows; ++i) {
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data);
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2280,7 +2332,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
if (pResInfo->numOfRes == 0 || pInfo->ts < pts[i]) {
char* data = colDataGetData(pInputCol, i);
doSaveCurrentVal(pCtx, i, pts[i], type, data);
int32_t code = doSaveCurrentVal(pCtx, i, pts[i], type, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2291,7 +2346,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
// save selectivity value for column consisted of all null values
if (numOfElems == 0) {
firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, pInput->startRowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
// SET_VAL(pResInfo, numOfElems, 1);
......@@ -2319,12 +2377,17 @@ static int32_t firstLastTransferInfoImpl(SFirstLastRes* pInput, SFirstLastRes* p
return TSDB_CODE_SUCCESS;
}
static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
static int32_t firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst,
int32_t rowIndex) {
if (TSDB_CODE_SUCCESS == firstLastTransferInfoImpl(pInput, pOutput, isFirst)) {
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pOutput);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pOutput->hasResult = true;
}
return TSDB_CODE_SUCCESS;
}
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
......@@ -2342,7 +2405,10 @@ static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuer
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SFirstLastRes* pInputInfo = (SFirstLastRes*)varDataVal(data);
firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
int32_t code = firstLastTransferInfo(pCtx, pInputInfo, pInfo, isFirstQuery, i);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (!numOfElems) {
numOfElems = pInputInfo->hasResult ? 1 : 0;
}
......@@ -2357,6 +2423,7 @@ int32_t firstFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMerge
int32_t lastFunctionMerge(SqlFunctionCtx* pCtx) { return firstLastFunctionMergeImpl(pCtx, false); }
int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
......@@ -2367,12 +2434,14 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes);
// handle selectivity
setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
return pResInfo->numOfRes;
return code;
}
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
......@@ -2388,10 +2457,10 @@ int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false);
setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
code = setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
taosMemoryFree(res);
return 1;
return code;
}
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
......@@ -2411,7 +2480,7 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return TSDB_CODE_SUCCESS;
}
static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) {
static int32_t doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
......@@ -2428,9 +2497,14 @@ static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, i
}
pInfo->ts = cts;
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
int32_t code = firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pInfo->hasResult = true;
return TSDB_CODE_SUCCESS;
}
int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
......@@ -2492,7 +2566,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo);
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......@@ -2769,7 +2846,7 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) {
return pRes;
}
static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
static int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery);
static void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery);
......@@ -2792,11 +2869,17 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, true);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pRes->nullTupleSaved = true;
}
return TSDB_CODE_SUCCESS;
......@@ -2820,11 +2903,17 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
int32_t code = doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pRes->nullTupleSaved = true;
}
......@@ -2864,7 +2953,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par
return (val1->v.d > val2->v.d) ? 1 : -1;
}
void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
int32_t doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) {
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
......@@ -2881,7 +2970,10 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple
if (pCtx->subsidiaries.num > 0) {
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock);
int32_t code = saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
#ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
......@@ -2907,7 +2999,10 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple by over writing the old data
if (pCtx->subsidiaries.num > 0) {
updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
int32_t code = updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
#ifdef BUF_PAGE_DEBUG
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
......@@ -2916,6 +3011,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
topBotResComparFn, NULL, !isTopQuery);
}
}
return TSDB_CODE_SUCCESS;
}
/*
......@@ -2955,20 +3052,33 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
return buf;
}
static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key) {
static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key,
STuplePos* pPos) {
STuplePos p = {0};
if (pHandle->pBuf != NULL) {
SFilePage* pPage = NULL;
if (pHandle->currentPage == -1) {
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pPage->num = sizeof(SFilePage);
} else {
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
// current page is all used, let's prepare a new buffer page
releaseBufPage(pHandle->pBuf, pPage);
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pPage->num = sizeof(SFilePage);
}
}
......@@ -2986,10 +3096,11 @@ static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf
}
}
return p;
*pPos = p;
return TSDB_CODE_SUCCESS;
}
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) {
int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
prepareBuf(pCtx);
STupleKey key;
......@@ -3005,12 +3116,16 @@ STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc
}
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key);
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key, pPos);
}
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
memcpy(pPage->data + pPos->offset, pBuf, length);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
......@@ -3025,13 +3140,15 @@ int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBloc
prepareBuf(pCtx);
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos);
return TSDB_CODE_SUCCESS;
return doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos);
}
static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) {
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) {
return NULL;
}
char* p = pPage->data + pPos->offset;
releaseBufPage(pHandle->pBuf, pPage);
return p;
......@@ -3048,6 +3165,8 @@ const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) {
}
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = getTopBotOutputInfo(pCtx);
......@@ -3060,8 +3179,8 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t currentRow = pBlock->info.rows;
if (pEntryInfo->numOfRes <= 0) {
colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
return pEntryInfo->numOfRes;
code = setSelectivityValue(pCtx, pBlock, &pRes->nullTuplePos, currentRow);
return code;
}
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STopBotResItem* pItem = &pRes->pItems[i];
......@@ -3070,11 +3189,11 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
qDebug("page_finalize i:%d,item:%p,pageId:%d, offset:%d\n", i, pItem, pItem->tuplePos.pageId,
pItem->tuplePos.offset);
#endif
setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
code = setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow);
currentRow += 1;
}
return pEntryInfo->numOfRes;
return code;
}
void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) {
......@@ -4452,12 +4571,15 @@ static void sampleAssignResult(SSampleInfo* pInfo, char* data, int32_t index) {
assignVal(pInfo->data + index * pInfo->colBytes, data, pInfo->colBytes, pInfo->colType);
}
static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) {
static int32_t doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* data, int32_t index) {
pInfo->totalPoints++;
if (pInfo->numSampled < pInfo->samples) {
sampleAssignResult(pInfo, data, pInfo->numSampled);
if (pCtx->subsidiaries.num > 0) {
pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pInfo->numSampled++;
} else {
......@@ -4465,10 +4587,15 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (j < pInfo->samples) {
sampleAssignResult(pInfo, data, j);
if (pCtx->subsidiaries.num > 0) {
updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
int32_t code = updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t sampleFunction(SqlFunctionCtx* pCtx) {
......@@ -4484,11 +4611,17 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
}
char* data = colDataGetData(pInputCol, i);
doReservoirSample(pCtx, pInfo, data, i);
int32_t code = doReservoirSample(pCtx, pInfo, data, i);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pInfo->nullTupleSaved = true;
}
......@@ -4497,6 +4630,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
}
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SSampleInfo* pInfo = getSampleOutputInfo(pCtx);
......@@ -4508,15 +4642,15 @@ int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t currentRow = pBlock->info.rows;
if (pInfo->numSampled == 0) {
colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow);
return pInfo->numSampled;
code = setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow);
return code;
}
for (int32_t i = 0; i < pInfo->numSampled; ++i) {
colDataAppend(pCol, currentRow + i, pInfo->data + i * pInfo->colBytes, false);
setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i);
code = setSelectivityValue(pCtx, pBlock, &pInfo->tuplePos[i], currentRow + i);
}
return pInfo->numSampled;
return code;
}
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
......@@ -4782,7 +4916,7 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
return true;
}
static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) {
static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) {
int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes;
SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes);
if (pHashItem == NULL) {
......@@ -4792,7 +4926,10 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx,
pItem->count += 1;
if (pCtx->subsidiaries.num > 0) {
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pItem->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*));
......@@ -4800,9 +4937,14 @@ static void doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx,
} else {
(*pHashItem)->count += 1;
if (pCtx->subsidiaries.num > 0) {
updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos));
int32_t code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
return TSDB_CODE_SUCCESS;
}
int32_t modeFunction(SqlFunctionCtx* pCtx) {
......@@ -4823,7 +4965,10 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
numOfElems++;
char* data = colDataGetData(pInputCol, i);
doModeAdd(pInfo, i, pCtx, data);
int32_t code = doModeAdd(pInfo, i, pCtx, data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) {
taosHashCleanup(pInfo->pHash);
......@@ -4832,7 +4977,10 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
}
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pInfo->nullTupleSaved = true;
}
......@@ -4842,6 +4990,7 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) {
}
int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t code = TSDB_CODE_SUCCESS;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
......@@ -4861,15 +5010,15 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if (maxCount != 0) {
SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes));
colDataAppend(pCol, currentRow, pResItem->data, false);
setSelectivityValue(pCtx, pBlock, &pResItem->tuplePos, currentRow);
code = setSelectivityValue(pCtx, pBlock, &pResItem->tuplePos, currentRow);
} else {
colDataAppendNULL(pCol, currentRow);
setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow);
code = setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow);
}
taosHashCleanup(pInfo->pHash);
return pResInfo->numOfRes;
return code;
}
bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
......@@ -5713,7 +5862,10 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
doSaveLastrow(pCtx, data, i, cts, pInfo);
int32_t code = doSaveLastrow(pCtx, data, i, cts, pInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pResInfo->numOfRes = 1;
}
}
......
......@@ -700,7 +700,7 @@ static void doExtractVal(SColumnInfoData* pCol, int32_t i, int32_t end, SqlFunct
}
}
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc, int32_t* nElems) {
int32_t numOfElems = 0;
SInputColumnInfoData* pInput = &pCtx->input;
......@@ -745,7 +745,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
} else {
......@@ -759,7 +762,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
......@@ -773,7 +779,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
......@@ -787,7 +796,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
......@@ -803,14 +815,17 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
if (index >= 0) {
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
}
}
pBuf->assign = true;
return numOfElems;
return TSDB_CODE_SUCCESS;
}
int32_t start = pInput->startRowIndex;
......@@ -824,7 +839,10 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
memcpy(&pBuf->v, pCol->pData + (pCol->info.bytes * i), pCol->info.bytes);
if (pCtx->subsidiaries.num > 0) {
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pBuf->assign = true;
numOfElems = 1;
......@@ -887,9 +905,13 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
_over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) {
pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pBuf->nullTupleSaved = true;
}
return numOfElems;
*nElems = numOfElems;
return TSDB_CODE_SUCCESS;
}
......@@ -40,6 +40,9 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
int32_t *pageId = taosArrayGet(pIdList, i);
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) {
return NULL;
}
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes);
......@@ -104,6 +107,9 @@ double findOnlyResult(tMemBucket *pMemBucket) {
int32_t *pageId = taosArrayGet(list, 0);
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
if (pPage == NULL) {
return -1;
}
ASSERT(pPage->num == 1);
double v = 0;
......@@ -380,6 +386,9 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
}
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
if (pSlot->info.data == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
}
pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId);
}
......@@ -391,7 +400,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
}
pBucket->total += count;
return 0;
return TSDB_CODE_SUCCESS;
}
////////////////////////////////////////////////////////////////////////////////////////////
......@@ -471,6 +480,9 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
if (pSlot->info.size <= pMemBucket->maxCapacity) {
// data in buffer and file are merged together to be processed.
SFilePage *buffer = loadDataFromFilePage(pMemBucket, i);
if (buffer == NULL) {
return -1;
}
int32_t currentIdx = count - num;
char *thisVal = buffer->data + pMemBucket->bytes * currentIdx;
......@@ -505,8 +517,14 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction)
for (int32_t f = 0; f < list->size; ++f) {
int32_t *pageId = taosArrayGet(list, f);
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) {
return -1;
}
tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);
if (code != TSDB_CODE_SUCCESS) {
return -1;
}
setBufPageDirty(pg, true);
releaseBufPage(pMemBucket->pBuffer, pg);
}
......@@ -528,7 +546,9 @@ double getPercentile(tMemBucket *pMemBucket, double percent) {
// if only one elements exists, return it
if (pMemBucket->total == 1) {
return findOnlyResult(pMemBucket);
if (findOnlyResult(pMemBucket) < 0) {
return -1;
}
}
percent = fabs(percent);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册