提交 a23f5f59 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 370b5136
......@@ -131,6 +131,7 @@ typedef struct SFileBlockDumpInfo {
typedef struct SReaderStatus {
bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks.
SFileBlockDumpInfo fBlockDumpInfo;
......@@ -138,7 +139,6 @@ typedef struct SReaderStatus {
SBlockData fileBlockData;
SFilesetIter fileIter;
SDataBlockIter blockIter;
bool composedDataBlock; // the returned data block is a composed block or not
} SReaderStatus;
struct STsdbReader {
......@@ -166,7 +166,7 @@ struct STsdbReader {
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader);
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger);
......@@ -513,86 +513,6 @@ _end:
return code;
}
// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList,
// int32_t tWinIdx) {
// STsdbReader* pTsdbReadHandle = queryHandle;
// pTsdbReadHandle->order = pCond->order;
// pTsdbReadHandle->window = pCond->twindows[tWinIdx];
// pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
// pTsdbReadHandle->cur.fid = -1;
// pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
// pTsdbReadHandle->checkFiles = true;
// pTsdbReadHandle->activeIndex = 0; // current active table index
// pTsdbReadHandle->locateStart = false;
// pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows;
// if (ASCENDING_TRAVERSE(pCond->order)) {
// assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey);
// } else {
// assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey);
// }
// // allocate buffer in order to load data blocks from file
// memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg));
// memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES);
// tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo);
// tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo);
// SArray* pTable = NULL;
// // STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb);
// // pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo);
// pTsdbReadHandle->pTableCheckInfo = NULL; // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta,
// // &pTable);
// if (pTsdbReadHandle->pTableCheckInfo == NULL) {
// // tsdbReaderClose(pTsdbReadHandle);
// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
// }
// // pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev);
// // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next);
// }
// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) {
// assert(pHandle != NULL);
// STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle;
// size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
// SArray* res = taosArrayInit(size, POINTER_BYTES);
// return res;
// }
// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
// int32_t firstSlot = 0;
// int32_t lastSlot = numOfBlocks - 1;
// int32_t midSlot = firstSlot;
// while (1) {
// numOfBlocks = lastSlot - firstSlot + 1;
// midSlot = (firstSlot + (numOfBlocks >> 1));
// if (numOfBlocks == 1) break;
// if (skey > pBlock[midSlot].maxKey.ts) {
// if (numOfBlocks == 2) break;
// if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break;
// firstSlot = midSlot + 1;
// } else if (skey < pBlock[midSlot].minKey.ts) {
// if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break;
// lastSlot = midSlot - 1;
// } else {
// break; // got the slot
// }
// }
// return midSlot;
// }
static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) {
SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx));
......@@ -861,71 +781,32 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs();
double elapsedTime = 0;
int32_t code = 0;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
ASSERT(pBlockInfo != NULL);
if (pBlockInfo != NULL) {
SBlock* pBlock = getCurrentBlock(pBlockIter);
code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
tstrerror(code), pReader->idStr);
goto _error;
}
elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
SBlock* pBlock = getCurrentBlock(pBlockIter);
int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
} else {
#if 0
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
uint64_t uid = pBlockInfo->uid;
SArray* pBlocks = pLastBlockReader->pBlockL;
pLastBlockReader->currentBlockIndex = -1;
// find the correct SBlockL
for(int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
SBlockL* pBlock = taosArrayGet(pBlocks, i);
if (pBlock->minUid >= uid && pBlock->maxUid <= uid) {
pLastBlockReader->currentBlockIndex = i;
break;
}
}
tstrerror(code), pReader->idStr);
return code;
}
// SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, *index);
code = tsdbReadLastBlock(pReader->pFileReader, pBlockL, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s",
pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow,
pBlockL->minVer, pBlockL->maxVer, tstrerror(code), pReader->idStr);
goto _error;
}
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p load last file block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow,
pBlockL->minVer, pBlockL->maxVer, elapsedTime, pReader->idStr);
#endif
}
tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
pReader->cost.blockLoadTime += elapsedTime;
pDumpInfo->allDumped = false;
return TSDB_CODE_SUCCESS;
_error:
return code;
}
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
......@@ -979,10 +860,10 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
}
static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
if (pFBlock != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pBlockInfo != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock);
}
......@@ -1396,7 +1277,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
return pReader->pMemSchema;
}
static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
......@@ -1512,6 +1393,33 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
return TSDB_CODE_SUCCESS;
}
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
bool mergeBlockData) {
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
// merge with block data if ts == key
if (mergeBlockData) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key,
STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -1549,55 +1457,23 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return TSDB_CODE_SUCCESS;
}
} else { // desc order
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge);
if (ts == key) {
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true);
}
} else { // only last block exists
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false);
}
}
static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SArray* pDelList = pBlockScanInfo->delSkyline;
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
ASSERT(pRow != NULL && piRow != NULL);
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
......@@ -1611,7 +1487,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
int64_t minKey = 0;//INT64_MAX;
int64_t minKey = 0;
if (ASCENDING_TRAVERSE(pReader->order)) {
minKey = INT64_MAX; // let's find the minimum
if (minKey > k.ts) {
......@@ -1748,8 +1624,8 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SArray* pDelList = pBlockScanInfo->delSkyline;
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader);
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
ASSERT(pRow != NULL && piRow != NULL);
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
......@@ -2024,20 +1900,20 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN;
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
return doMergeMultiLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
} else {
// imem + file + last block
if (pBlockScanInfo->iiter.hasVal) {
return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader);
}
// mem + file + last block
if (pBlockScanInfo->iter.hasVal) {
return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader);
}
// files data blocks + last block
......@@ -2270,12 +2146,12 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* p
TSDBKEY key = {.ts = TSKEY_INITIAL_VAL};
initMemDataIterator(pScanInfo, pReader);
TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader);
if (pRow != NULL) {
key = TSDBROW_KEY(pRow);
}
pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader);
if (pRow != NULL) {
TSDBKEY k = TSDBROW_KEY(pRow);
if (key.ts > k.ts) {
......@@ -2861,7 +2737,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32
return false;
}
TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) {
if (!pIter->hasVal) {
return NULL;
}
......@@ -2909,7 +2785,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
}
// data exists but not valid
TSDBROW* pRow = getValidRow(pIter, pDelList, pReader);
TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader);
if (pRow == NULL) {
break;
}
......@@ -3033,7 +2909,6 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
return TSDB_CODE_SUCCESS;
}
// todo check if the rows are dropped or not
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger) {
while(nextRowInLastBlock(pLastBlockReader, pScanInfo)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
......@@ -3061,7 +2936,7 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr
*freeTSRow = false;
return;
} else { // has next point in mem/imem
pNextRow = getValidRow(pIter, pDelList, pReader);
pNextRow = getValidMemRow(pIter, pDelList, pReader);
if (pNextRow == NULL) {
*pTSRow = current.pTSRow;
*freeTSRow = false;
......@@ -3127,8 +3002,8 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
bool* freeTSRow) {
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
SArray* pDelList = pBlockScanInfo->delSkyline;
uint64_t uid = pBlockScanInfo->uid;
......
......@@ -23,6 +23,12 @@
#include "tcommon.h"
#include "tpagedbuf.h"
#define T_LONG_JMP(_obj, _c) \
do { \
ASSERT((_c) != -1); \
longjmp((_obj), (_c)); \
} while (0);
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \
......
......@@ -76,12 +76,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) {
#define realloc u_realloc
#endif
#define T_LONG_JMP(_obj, _c) \
do { \
assert((_c) != -1); \
longjmp((_obj), (_c)); \
} while (0);
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
#define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0)
......@@ -92,9 +86,7 @@ static int32_t getExprFunctionId(SExprInfo* pExprInfo) {
return 0;
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes);
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock);
static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock);
static void releaseQueryBuf(size_t numOfTables);
......@@ -278,9 +270,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// 1. close current opened time window
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) {
#ifdef BUF_PAGE_DEBUG
qDebug("page_1");
#endif
SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
releaseBufPage(pResultBuf, pPage);
......@@ -308,7 +297,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// too many time window in query
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH &&
taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
}
return pResult;
......@@ -434,7 +423,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code;
longjmp(taskInfo->env, code);
T_LONG_JMP(taskInfo->env, code);
}
}
......@@ -1152,7 +1141,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
} else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate
......@@ -1203,7 +1192,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId,
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
}
......@@ -1495,7 +1484,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
if (TAOS_FAILED(code)) {
releaseBufPage(pBuf, page);
qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
......@@ -1507,7 +1496,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
......@@ -1581,7 +1570,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock);
if (TAOS_FAILED(code)) {
qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code));
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
} else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) {
// do nothing, todo refactor
......@@ -1736,7 +1725,7 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
// while (tsdbNextDataBlock(pTsdbReadHandle)) {
// if (isTaskKilled(pRuntimeEnv->qinfo)) {
// longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
// T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
// }
//
// tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo);
......@@ -1755,7 +1744,7 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// }
//
// if (terrno != TSDB_CODE_SUCCESS) {
// longjmp(pRuntimeEnv->env, terrno);
// T_LONG_JMP(pRuntimeEnv->env, terrno);
// }
// }
......@@ -1919,7 +1908,7 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
//
// // check for error
// if (terrno != TSDB_CODE_SUCCESS) {
// longjmp(pRuntimeEnv->env, terrno);
// T_LONG_JMP(pRuntimeEnv->env, terrno);
// }
//
// return true;
......@@ -2771,7 +2760,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) {
int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
pOperator->status = OP_RES_TO_RETURN;
......@@ -2966,7 +2955,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
......@@ -2974,7 +2963,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SExprSupp* pSup1 = &pAggInfo->scalarExprSup;
code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
......@@ -2983,7 +2972,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true);
code = doAggregateImpl(pOperator, pSup->pCtx);
if (code != 0) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
......@@ -4673,27 +4662,6 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
taosMemoryFreeClear(pTaskInfo);
}
static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) {
if (val == NULL) {
setNull(output, type, bytes);
return;
}
if (IS_VAR_DATA_TYPE(type)) {
// Binary data overflows for sort of unknown reasons. Let trim the overflow data
if (varDataTLen(val) > bytes) {
int32_t maxLen = bytes - VARSTR_HEADER_SIZE;
int32_t len = (varDataLen(val) > maxLen) ? maxLen : varDataLen(val);
memcpy(varDataVal(output), varDataVal(val), len);
varDataSetLen(output, len);
} else {
varDataCopy(output, val);
}
} else {
memcpy(output, val, bytes);
}
}
static int64_t getQuerySupportBufSize(size_t numOfTables) {
size_t s1 = sizeof(STableQueryInfo);
// size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb
......
......@@ -247,7 +247,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
if (!pInfo->isInit) {
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
pInfo->isInit = true;
num++;
......@@ -265,7 +265,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
num++;
recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j);
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
continue;
}
......@@ -273,7 +273,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals);
int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
int32_t rowIndex = j - num;
......@@ -291,7 +291,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len,
pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
int32_t rowIndex = pBlock->info.rows - num;
......@@ -350,7 +350,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
int32_t code = getTableScanInfo(pOperator, &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
......@@ -360,7 +360,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
}
......@@ -678,14 +678,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
if (pInfo->scalarSup.pExprInfo != NULL) {
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, pTaskInfo->code);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
}
terrno = TSDB_CODE_SUCCESS;
doHashPartition(pOperator, pBlock);
if (terrno != TSDB_CODE_SUCCESS) { // group by json error
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
}
......
......@@ -268,7 +268,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
......@@ -277,7 +277,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
pProjectInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
status = doIngroupLimitOffset(pLimitInfo, pBlock->info.groupId, pInfo->pRes, pOperator);
......@@ -415,7 +415,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
......@@ -424,7 +424,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
......@@ -434,7 +434,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp
code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
......
......@@ -250,7 +250,7 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
if (!allColumnsHaveAgg) {
......@@ -264,7 +264,7 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
if (pBlock->pBlockAgg == NULL) {
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
if (pBlock->pBlockAgg == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
}
......@@ -374,7 +374,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
......@@ -495,7 +495,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
if (isTaskKilled(pTaskInfo)) {
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// process this data block based on the probabilities
......@@ -523,7 +523,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
......@@ -649,7 +649,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
return NULL;
}
}
......@@ -837,7 +837,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo);
......@@ -1259,7 +1259,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
blockDataFreeRes((SSDataBlock*)pBlock);
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
......@@ -1950,7 +1950,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) {
metaReaderClear(&smr);
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
......@@ -2153,7 +2153,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
metaReaderClear(&mr);
metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL;
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
// number of columns
......@@ -2527,7 +2527,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno),
GET_TASKID(pTaskInfo));
metaReaderClear(&mr);
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
......@@ -2777,7 +2777,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
......@@ -2820,7 +2820,7 @@ static SSDataBlock* getTableDataBlock(void* param) {
STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx);
while (tsdbNextDataBlock(reader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// process this data block based on the probabilities
......@@ -2843,7 +2843,7 @@ static SSDataBlock* getTableDataBlock(void* param) {
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
......@@ -2936,7 +2936,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
return TSDB_CODE_SUCCESS;
......@@ -3006,7 +3006,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
if (!pInfo->hasGroupId) {
......
......@@ -156,7 +156,7 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) {
int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx,
pOperator->exprSupp.numOfExprs, NULL);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
T_LONG_JMP(pOperator->pTaskInfo->env, code);
}
}
}
......@@ -184,7 +184,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
......@@ -204,7 +204,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
SSDataBlock* pBlock = NULL;
......@@ -388,7 +388,7 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
taosMemoryFreeClear(ps);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
return TSDB_CODE_SUCCESS;
......@@ -420,7 +420,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
if (!pInfo->hasGroupId) {
......@@ -575,7 +575,7 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) {
int32_t code = tsortOpen(pInfo->pSortHandle);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, terrno);
T_LONG_JMP(pTaskInfo->env, terrno);
}
pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0;
......@@ -672,7 +672,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) {
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes,
......
......@@ -628,7 +628,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
......@@ -952,7 +952,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
......@@ -975,7 +975,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// window start key interpolation
......@@ -1009,7 +1009,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
......@@ -1185,7 +1185,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
......@@ -1210,7 +1210,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
......@@ -1928,7 +1928,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
// pInfo->numOfRows data belong to the current session window
......@@ -1947,7 +1947,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
......@@ -2335,7 +2335,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
int32_t code = initKeeperInfo(pSliceInfo, pBlock);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
// the pDataBlock are always the same one, no need to call this again
......@@ -2783,7 +2783,7 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
longjmp(pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
}
}
......@@ -2937,7 +2937,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (IS_FINAL_OP(pInfo)) {
......@@ -3189,7 +3189,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
for (int32_t i = 0; i < chIndex + 1 - size; i++) {
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
if (!pChildOp) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info;
pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
......@@ -3732,7 +3732,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
// too many time window in query
int32_t size = taosArrayGetSize(pAggSup->pCurWins);
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && size > MAX_INTERVAL_TIME_WINDOW) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
}
if (pWinInfo->pos.pageId == -1) {
......@@ -3884,7 +3884,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
pStDeleted);
code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t winNum = getNumCompactWindow(pAggSup->pCurWins, winIndex, gap);
......@@ -3896,7 +3896,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SWinRes value = {.ts = pCurWin->win.skey, .groupId = groupId};
code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pCurWin->isOutput = true;
}
......@@ -4205,7 +4205,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SOperatorInfo* pChildOp =
createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0);
if (!pChildOp) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
taosArrayPush(pInfo->pChildren, &pChildOp);
}
......@@ -4668,14 +4668,14 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, numOfOutput, pOperator);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pCurWin->winInfo.isClosed = false;
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
SWinRes value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId};
code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinRes));
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pCurWin->winInfo.isOutput = true;
}
......@@ -4921,7 +4921,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
int32_t currPos = startPos;
......@@ -4948,7 +4948,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx,
numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
miaInfo->curTs = currWin.skey;
......@@ -5223,7 +5223,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
TSKEY ekey = ascScan ? win.ekey : win.skey;
......@@ -5240,7 +5240,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx,
numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
// window start key interpolation
......@@ -5269,7 +5269,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
ekey = ascScan ? nextWin.ekey : nextWin.skey;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册