diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0b027367dabf4ad2afa326dbdf1bf86c0c993876..cd40a9acc21d61a21defd45184d1f955b398d3b5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -131,6 +131,7 @@ typedef struct SFileBlockDumpInfo { typedef struct SReaderStatus { bool loadFromFile; // check file stage + bool composedDataBlock; // the returned data block is a composed block or not SHashObj* pTableMap; // SHash STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. SFileBlockDumpInfo fBlockDumpInfo; @@ -138,7 +139,6 @@ typedef struct SReaderStatus { SBlockData fileBlockData; SFilesetIter fileIter; SDataBlockIter blockIter; - bool composedDataBlock; // the returned data block is a composed block or not } SReaderStatus; struct STsdbReader { @@ -166,7 +166,7 @@ struct STsdbReader { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); -static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); +static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, SRowMerger* pMerger); static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger); @@ -513,86 +513,6 @@ _end: return code; } -// void tsdbResetQueryHandleForNewTable(STsdbReader* queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, -// int32_t tWinIdx) { -// STsdbReader* pTsdbReadHandle = queryHandle; - -// pTsdbReadHandle->order = pCond->order; -// pTsdbReadHandle->window = pCond->twindows[tWinIdx]; -// pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; -// pTsdbReadHandle->cur.fid = -1; -// pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; -// pTsdbReadHandle->checkFiles = true; -// pTsdbReadHandle->activeIndex = 0; // current active table index -// pTsdbReadHandle->locateStart = false; -// pTsdbReadHandle->loadExternalRow = pCond->loadExternalRows; - -// if (ASCENDING_TRAVERSE(pCond->order)) { -// assert(pTsdbReadHandle->window.skey <= pTsdbReadHandle->window.ekey); -// } else { -// assert(pTsdbReadHandle->window.skey >= pTsdbReadHandle->window.ekey); -// } - -// // allocate buffer in order to load data blocks from file -// memset(pTsdbReadHandle->suppInfo.pstatis, 0, sizeof(SColumnDataAgg)); -// memset(pTsdbReadHandle->suppInfo.plist, 0, POINTER_BYTES); - -// tsdbInitDataBlockLoadInfo(&pTsdbReadHandle->dataBlockLoadInfo); -// tsdbInitCompBlockLoadInfo(&pTsdbReadHandle->compBlockLoadInfo); - -// SArray* pTable = NULL; -// // STsdbMeta* pMeta = tsdbGetMeta(pTsdbReadHandle->pTsdb); - -// // pTsdbReadHandle->pTableCheckInfo = destroyTableCheckInfo(pTsdbReadHandle->pTableCheckInfo); - -// pTsdbReadHandle->pTableCheckInfo = NULL; // createDataBlockScanInfo(pTsdbReadHandle, groupList, pMeta, -// // &pTable); -// if (pTsdbReadHandle->pTableCheckInfo == NULL) { -// // tsdbReaderClose(pTsdbReadHandle); -// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; -// } - -// // pTsdbReadHandle->prev = doFreeColumnInfoData(pTsdbReadHandle->prev); -// // pTsdbReadHandle->next = doFreeColumnInfoData(pTsdbReadHandle->next); -// } - -// SArray* tsdbGetQueriedTableList(STsdbReader** pHandle) { -// assert(pHandle != NULL); - -// STsdbReader* pTsdbReadHandle = (STsdbReader*)pHandle; - -// size_t size = taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo); -// SArray* res = taosArrayInit(size, POINTER_BYTES); -// return res; -// } - -// static int32_t binarySearchForBlock(SBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) { -// int32_t firstSlot = 0; -// int32_t lastSlot = numOfBlocks - 1; - -// int32_t midSlot = firstSlot; - -// while (1) { -// numOfBlocks = lastSlot - firstSlot + 1; -// midSlot = (firstSlot + (numOfBlocks >> 1)); - -// if (numOfBlocks == 1) break; - -// if (skey > pBlock[midSlot].maxKey.ts) { -// if (numOfBlocks == 2) break; -// if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].minKey.ts)) break; -// firstSlot = midSlot + 1; -// } else if (skey < pBlock[midSlot].minKey.ts) { -// if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].maxKey.ts)) break; -// lastSlot = midSlot - 1; -// } else { -// break; // got the slot -// } -// } - -// return midSlot; -// } - static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader, SArray* pIndexList) { SArray* aBlockIdx = taosArrayInit(8, sizeof(SBlockIdx)); @@ -861,71 +781,32 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData) { int64_t st = taosGetTimestampUs(); - double elapsedTime = 0; - int32_t code = 0; SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; + ASSERT(pBlockInfo != NULL); - if (pBlockInfo != NULL) { - SBlock* pBlock = getCurrentBlock(pBlockIter); - code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); - if (code != TSDB_CODE_SUCCESS) { - tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, code:%s %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, - tstrerror(code), pReader->idStr); - goto _error; - } - - elapsedTime = (taosGetTimestampUs() - st) / 1000.0; - - tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + SBlock* pBlock = getCurrentBlock(pBlockIter); + int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 + ", rows:%d, code:%s %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, - pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); - } else { -#if 0 - SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - - uint64_t uid = pBlockInfo->uid; - SArray* pBlocks = pLastBlockReader->pBlockL; - - pLastBlockReader->currentBlockIndex = -1; - - // find the correct SBlockL - for(int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) { - SBlockL* pBlock = taosArrayGet(pBlocks, i); - if (pBlock->minUid >= uid && pBlock->maxUid <= uid) { - pLastBlockReader->currentBlockIndex = i; - break; - } - } + tstrerror(code), pReader->idStr); + return code; + } -// SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, *index); - code = tsdbReadLastBlock(pReader->pFileReader, pBlockL, pBlockData); - if (code != TSDB_CODE_SUCCESS) { - tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64 - ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s", - pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow, - pBlockL->minVer, pBlockL->maxVer, tstrerror(code), pReader->idStr); - goto _error; - } + double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; - tsdbDebug("%p load last file block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64 - ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", - pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow, - pBlockL->minVer, pBlockL->maxVer, elapsedTime, pReader->idStr); -#endif - } + tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64 + ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, + pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); pReader->cost.blockLoadTime += elapsedTime; pDumpInfo->allDumped = false; return TSDB_CODE_SUCCESS; - -_error: - return code; } static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) { @@ -979,10 +860,10 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v } static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { - SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); - if (pFBlock != NULL) { - STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); - int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx); + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + if (pBlockInfo != NULL) { + STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); + int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock); } @@ -1396,7 +1277,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* return pReader->pMemSchema; } -static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, +static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow, SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; @@ -1512,6 +1393,33 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf return TSDB_CODE_SUCCESS; } +static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, + STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, + bool mergeBlockData) { + SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + + STSRow* pTSRow = NULL; + SRowMerger merge = {0}; + + TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); + + tRowMergerInit(&merge, &fRow, pReader->pSchema); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); + + // merge with block data if ts == key + if (mergeBlockData) { + doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + } + + tRowMergerGetRow(&merge, &pTSRow); + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); + + taosMemoryFree(pTSRow); + tRowMergerClear(&merge); + return TSDB_CODE_SUCCESS; +} + static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -1549,55 +1457,23 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader return TSDB_CODE_SUCCESS; } } else { // desc order - SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - tRowMergerInit(&merge, &fRow1, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - - if (ts == key) { - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - } - - tRowMergerGetRow(&merge, &pTSRow); - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, pBlockData, true); } } else { // only last block exists - SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); - - STSRow* pTSRow = NULL; - SRowMerger merge = {0}; - - TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex); - - tRowMergerInit(&merge, &fRow, pReader->pSchema); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge); - tRowMergerGetRow(&merge, &pTSRow); - - doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); - - taosMemoryFree(pTSRow); - tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; + return doMergeFileBlockAndLastBlock(pLastBlockReader, pReader, pBlockScanInfo, NULL, false); } } -static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) { +static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, + SLastBlockReader* pLastBlockReader) { SRowMerger merge = {0}; STSRow* pTSRow = NULL; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SArray* pDelList = pBlockScanInfo->delSkyline; - TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader); - TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader); + TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); + TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); ASSERT(pRow != NULL && piRow != NULL); SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; @@ -1611,7 +1487,7 @@ static int32_t doMergeMultiLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - int64_t minKey = 0;//INT64_MAX; + int64_t minKey = 0; if (ASCENDING_TRAVERSE(pReader->order)) { minKey = INT64_MAX; // let's find the minimum if (minKey > k.ts) { @@ -1748,8 +1624,8 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SArray* pDelList = pBlockScanInfo->delSkyline; - TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader); - TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pDelList, pReader); + TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); + TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); ASSERT(pRow != NULL && piRow != NULL); int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; @@ -2024,20 +1900,20 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t key = (pBlockData->nRow > 0)? pBlockData->aTSKEY[pDumpInfo->rowIndex]:INT64_MIN; - TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); - TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { - return doMergeMultiLevelRowsRv(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); + return doMergeMultiLevelRows(pReader, pBlockScanInfo, pBlockData, pLastBlockReader); } else { // imem + file + last block if (pBlockScanInfo->iiter.hasVal) { - return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, piRow, &pBlockScanInfo->iiter, key, pLastBlockReader); } // mem + file + last block if (pBlockScanInfo->iter.hasVal) { - return doMergeBufAndFileRows_Rv(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader); + return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key, pLastBlockReader); } // files data blocks + last block @@ -2270,12 +2146,12 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* p TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}; initMemDataIterator(pScanInfo, pReader); - TSDBROW* pRow = getValidRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader); + TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader); if (pRow != NULL) { key = TSDBROW_KEY(pRow); } - pRow = getValidRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader); + pRow = getValidMemRow(&pScanInfo->iiter, pScanInfo->delSkyline, pReader); if (pRow != NULL) { TSDBKEY k = TSDBROW_KEY(pRow); if (key.ts > k.ts) { @@ -2861,7 +2737,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32 return false; } -TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) { +TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader) { if (!pIter->hasVal) { return NULL; } @@ -2909,7 +2785,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe } // data exists but not valid - TSDBROW* pRow = getValidRow(pIter, pDelList, pReader); + TSDBROW* pRow = getValidMemRow(pIter, pDelList, pReader); if (pRow == NULL) { break; } @@ -3033,7 +2909,6 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc return TSDB_CODE_SUCCESS; } -// todo check if the rows are dropped or not int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger) { while(nextRowInLastBlock(pLastBlockReader, pScanInfo)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); @@ -3061,7 +2936,7 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr *freeTSRow = false; return; } else { // has next point in mem/imem - pNextRow = getValidRow(pIter, pDelList, pReader); + pNextRow = getValidMemRow(pIter, pDelList, pReader); if (pNextRow == NULL) { *pTSRow = current.pTSRow; *freeTSRow = false; @@ -3127,8 +3002,8 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey, bool* freeTSRow) { - TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); - TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); + TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); SArray* pDelList = pBlockScanInfo->delSkyline; uint64_t uid = pBlockScanInfo->uid; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 336596032551d7380664265d36e4421db4b744f9..a25933d15e9367513bc1ad5d5096972fbf65bc35 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -23,6 +23,12 @@ #include "tcommon.h" #include "tpagedbuf.h" +#define T_LONG_JMP(_obj, _c) \ + do { \ + ASSERT((_c) != -1); \ + longjmp((_obj), (_c)); \ + } while (0); + #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ do { \ assert(sizeof(_uid) == sizeof(uint64_t)); \ diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a64a9267101b184abae40709b31fab9e8cc8f427..1b3650ad776a608c91dfb401f08a31d45d70235f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -122,7 +122,7 @@ typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* res typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); -typedef void (*__optr_close_fn_t)(void* param, int32_t num); +typedef void (*__optr_close_fn_t)(void* param); typedef int32_t (*__optr_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); typedef struct STaskIdInfo { diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 94e4384b3025f0d2ecbbaafd9f92ad10aa84b926..b31fa279e57ad3436d19264071959e91fe4709f7 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -24,10 +24,9 @@ #include "tcompare.h" #include "thash.h" #include "ttypes.h" -#include "executorInt.h" static SSDataBlock* doScanLastrow(SOperatorInfo* pOperator); -static void destroyLastrowScanOperator(void* param, int32_t numOfOutput); +static void destroyLastrowScanOperator(void* param); static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) { @@ -211,7 +210,7 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) { } } -void destroyLastrowScanOperator(void* param, int32_t numOfOutput) { +void destroyLastrowScanOperator(void* param) { SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; blockDataDestroy(pInfo->pRes); taosMemoryFreeClear(param); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 63d8563a09a26f4667ca203ab8d7973f3326a6ed..893acf1bbc67764d219bce708be83d48e5a6a49f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -76,12 +76,6 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define realloc u_realloc #endif -#define T_LONG_JMP(_obj, _c) \ - do { \ - assert((_c) != -1); \ - longjmp((_obj), (_c)); \ - } while (0); - #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) @@ -92,19 +86,17 @@ static int32_t getExprFunctionId(SExprInfo* pExprInfo) { return 0; } -static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes); - -static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pSDataBlock); +static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); static void releaseQueryBuf(size_t numOfTables); -static void destroyFillOperatorInfo(void* param, int32_t numOfOutput); -static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput); -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); -static void destroyAggOperatorInfo(void* param, int32_t numOfOutput); +static void destroyFillOperatorInfo(void* param); +static void destroyProjectOperatorInfo(void* param); +static void destroyOrderOperatorInfo(void* param); +static void destroyAggOperatorInfo(void* param); -static void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput); -static void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput); +static void destroyIntervalOperatorInfo(void* param); +static void destroyExchangeOperatorInfo(void* param); static void destroyOperatorInfo(SOperatorInfo* pOperator); @@ -264,9 +256,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // 1. close current opened time window if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId))) { -#ifdef BUF_PAGE_DEBUG - qDebug("page_1"); -#endif SResultRowPosition pos = pResultRowInfo->cur; SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); releaseBufPage(pResultBuf, pPage); @@ -294,7 +283,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR // too many time window in query if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } return pResult; @@ -420,7 +409,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfo if (code != TSDB_CODE_SUCCESS) { qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); taskInfo->code = code; - longjmp(taskInfo->env, code); + T_LONG_JMP(taskInfo->env, code); } } @@ -1138,7 +1127,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } } else if (pQueryAttr->stableQuery && (!pQueryAttr->tsCompQuery) && (!pQueryAttr->diffQuery)) { // stable aggregate, not interval aggregate or normal column aggregate @@ -1189,7 +1178,7 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowEntryInfoOffset) != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } } } @@ -1481,7 +1470,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi if (TAOS_FAILED(code)) { releaseBufPage(pBuf, page); qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } @@ -1493,7 +1482,7 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code)) { qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor @@ -1567,7 +1556,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code)) { qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor @@ -1722,7 +1711,7 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER; // while (tsdbNextDataBlock(pTsdbReadHandle)) { // if (isTaskKilled(pRuntimeEnv->qinfo)) { -// longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); +// T_LONG_JMP(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED); // } // // tsdbRetrieveDataBlockInfo(pTsdbReadHandle, &blockInfo); @@ -1741,7 +1730,7 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // } // // if (terrno != TSDB_CODE_SUCCESS) { -// longjmp(pRuntimeEnv->env, terrno); +// T_LONG_JMP(pRuntimeEnv->env, terrno); // } // } @@ -1905,7 +1894,7 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) { // // // check for error // if (terrno != TSDB_CODE_SUCCESS) { -// longjmp(pRuntimeEnv->env, terrno); +// T_LONG_JMP(pRuntimeEnv->env, terrno); // } // // return true; @@ -2757,7 +2746,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { int32_t code = tsortOpen(pInfo->pSortHandle); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } pOperator->status = OP_RES_TO_RETURN; @@ -2952,7 +2941,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // there is an scalar expression that needs to be calculated before apply the group aggregation. @@ -2960,7 +2949,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SExprSupp* pSup1 = &pAggInfo->scalarExprSup; code = projectApplyFunctions(pSup1->pExprInfo, pBlock, pBlock, pSup1->pCtx, pSup1->numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } @@ -2969,7 +2958,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, true); code = doAggregateImpl(pOperator, pSup->pCtx); if (code != 0) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } @@ -3426,7 +3415,7 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) { } if (pOperator->fpSet.closeFn != NULL) { - pOperator->fpSet.closeFn(pOperator->info, pOperator->exprSupp.numOfExprs); + pOperator->fpSet.closeFn(pOperator->info); } if (pOperator->pDownstream != NULL) { @@ -3618,7 +3607,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* return pOperator; _error: - destroyAggOperatorInfo(pInfo, numOfCols); + destroyAggOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -3643,7 +3632,7 @@ static void freeItem(void* pItem) { } } -void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { +void destroyAggOperatorInfo(void* param) { SAggOperatorInfo* pInfo = (SAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); @@ -3653,7 +3642,7 @@ void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -void destroyFillOperatorInfo(void* param, int32_t numOfOutput) { +void destroyFillOperatorInfo(void* param) { SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param; pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pRes = blockDataDestroy(pInfo->pRes); @@ -3669,7 +3658,7 @@ void destroyFillOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -void destroyExchangeOperatorInfo(void* param, int32_t numOfOutput) { +void destroyExchangeOperatorInfo(void* param) { SExchangeInfo* pExInfo = (SExchangeInfo*)param; taosRemoveRef(exchangeObjRefPool, pExInfo->self); } @@ -4657,27 +4646,6 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { taosMemoryFreeClear(pTaskInfo); } -static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type, int16_t bytes) { - if (val == NULL) { - setNull(output, type, bytes); - return; - } - - if (IS_VAR_DATA_TYPE(type)) { - // Binary data overflows for sort of unknown reasons. Let trim the overflow data - if (varDataTLen(val) > bytes) { - int32_t maxLen = bytes - VARSTR_HEADER_SIZE; - int32_t len = (varDataLen(val) > maxLen) ? maxLen : varDataLen(val); - memcpy(varDataVal(output), varDataVal(val), len); - varDataSetLen(output, len); - } else { - varDataCopy(output, val); - } - } else { - memcpy(output, val, bytes); - } -} - static int64_t getQuerySupportBufSize(size_t numOfTables) { size_t s1 = sizeof(STableQueryInfo); // size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 05dffc658b29bb5eb6675edae62d04bb6442cc48..53709c7dcc78b380b54afd2a25947a67f5381ecd 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -36,8 +36,12 @@ static void freeGroupKey(void* param) { taosMemoryFree(pKey->pData); } -static void destroyGroupOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyGroupOperatorInfo(void* param) { SGroupbyOperatorInfo* pInfo = (SGroupbyOperatorInfo*)param; + if (pInfo == NULL) { + return; + } + cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->keyBuf); taosArrayDestroy(pInfo->pGroupCols); @@ -247,7 +251,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { if (!pInfo->isInit) { recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); if (terrno != TSDB_CODE_SUCCESS) { // group by json error - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } pInfo->isInit = true; num++; @@ -265,7 +269,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { num++; recordNewGroupKeys(pInfo->pGroupCols, pInfo->pGroupColVals, pBlock, j); if (terrno != TSDB_CODE_SUCCESS) { // group by json error - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } continue; } @@ -273,7 +277,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } int32_t rowIndex = j - num; @@ -291,7 +295,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { setGroupResultOutputBuf(pOperator, &(pInfo->binfo), pOperator->exprSupp.numOfExprs, pInfo->keyBuf, len, pBlock->info.groupId, pInfo->aggSup.pResultBuf, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } int32_t rowIndex = pBlock->info.rows - num; @@ -350,7 +354,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { int32_t code = getTableScanInfo(pOperator, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // the pDataBlock are always the same one, no need to call this again @@ -360,7 +364,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { if (pInfo->scalarSup.pExprInfo != NULL) { pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, pTaskInfo->code); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } } @@ -413,7 +417,11 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx } initResultSizeInfo(&pOperator->resultInfo, 4096); - initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); + code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, pInfo->groupKeyLen, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResultBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -426,11 +434,15 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL, destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; _error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFreeClear(pInfo); + destroyGroupOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); return NULL; } @@ -678,14 +690,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { if (pInfo->scalarSup.pExprInfo != NULL) { pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pExprInfo, pBlock, pBlock, pInfo->scalarSup.pCtx, pInfo->scalarSup.numOfExprs, NULL); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, pTaskInfo->code); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } } terrno = TSDB_CODE_SUCCESS; doHashPartition(pOperator, pBlock); if (terrno != TSDB_CODE_SUCCESS) { // group by json error - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } } @@ -710,7 +722,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { return buildPartitionResult(pOperator); } -static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyPartitionOperatorInfo(void* param) { SPartitionOperatorInfo* pInfo = (SPartitionOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosArrayDestroy(pInfo->pGroupCols); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 7d2b84d0f053a7c8c6e3f63db719f67b3d9e99f3..1bc7d458e0ee16decabea988a16713996d2468ce 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -25,7 +25,7 @@ static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); -static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); +static void destroyMergeJoinOperator(void* param); static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode); @@ -128,12 +128,11 @@ void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode) { pColumn->scale = pColumnNode->node.resType.scale; } -void destroyMergeJoinOperator(void* param, int32_t numOfOutput) { +void destroyMergeJoinOperator(void* param) { SJoinOperatorInfo* pJoinOperator = (SJoinOperatorInfo*)param; nodesDestroyNode(pJoinOperator->pCondAfterMerge); pJoinOperator->pRes = blockDataDestroy(pJoinOperator->pRes); - taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 94da3e23e1423d0954721059f5e1d62abdd8e872..0661ccd3902bc0ba653e988cf6a03f91d2c6c68f 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -23,7 +23,7 @@ static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOf static void setFunctionResultOutput(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t stage, int32_t numOfExprs); -static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyProjectOperatorInfo(void* param) { if (NULL == param) { return; } @@ -37,10 +37,13 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -static void destroyIndefinitOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyIndefinitOperatorInfo(void* param) { SIndefOperatorInfo* pInfo = (SIndefOperatorInfo*)param; - cleanupBasicInfo(&pInfo->binfo); + if (pInfo == NULL) { + return; + } + cleanupBasicInfo(&pInfo->binfo); taosArrayDestroy(pInfo->pPseudoColInfo); cleanupAggSup(&pInfo->aggSup); cleanupExprSupp(&pInfo->scalarSup); @@ -112,7 +115,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys return pOperator; _error: - destroyProjectOperatorInfo(pInfo, numOfCols); + destroyProjectOperatorInfo(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = code; return NULL; @@ -268,7 +271,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again int32_t code = getTableScanInfo(downstream, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); @@ -277,7 +280,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, pProjectInfo->pPseudoColInfo); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } status = doIngroupLimitOffset(pLimitInfo, pBlock->info.groupId, pInfo->pRes, pOperator); @@ -371,9 +374,12 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy initResultSizeInfo(&pOperator->resultInfo, numOfRows); - initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); - initBasicInfo(&pInfo->binfo, pResBlock); + int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResBlock); setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr); pInfo->binfo.pRes = pResBlock; @@ -389,7 +395,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, destroyIndefinitOperatorInfo, NULL, NULL, NULL); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -397,7 +403,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy return pOperator; _error: - taosMemoryFree(pInfo); + destroyIndefinitOperatorInfo(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -415,7 +421,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp // the pDataBlock are always the same one, no need to call this again int32_t code = getTableScanInfo(downstream, &order, &scanFlag); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // there is an scalar expression that needs to be calculated before apply the group aggregation. @@ -424,7 +430,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs, pIndefInfo->pPseudoColInfo); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } @@ -434,7 +440,7 @@ static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOp code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, pIndefInfo->pPseudoColInfo); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ea9ed20a96a160827c87a49b02fc49518236baaf..fc36d740a9b331dbcb338684afb9af24592597c3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -250,7 +250,7 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, int32_t code = tsdbRetrieveDatablockSMA(pTableScanInfo->dataReader, &pColAgg, &allColumnsHaveAgg); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } if (!allColumnsHaveAgg) { @@ -264,7 +264,7 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, if (pBlock->pBlockAgg == NULL) { pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); if (pBlock->pBlockAgg == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } } @@ -374,7 +374,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } @@ -495,7 +495,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // process this data block based on the probabilities @@ -523,7 +523,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { - longjmp(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pOperator->pTaskInfo->env, code); } // current block is filter out according to filter condition, continue load the next block @@ -649,7 +649,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); return NULL; } } @@ -689,7 +689,7 @@ static int32_t getTableScannerExecInfo(struct SOperatorInfo* pOptr, void** pOptr return 0; } -static void destroyTableScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyTableScanOperatorInfo(void* param) { STableScanInfo* pTableScanInfo = (STableScanInfo*)param; blockDataDestroy(pTableScanInfo->pResBlock); cleanupQueryTableDataCond(&pTableScanInfo->cond); @@ -837,7 +837,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { int32_t code = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid, &blockDistInfo.rowSize, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); @@ -863,7 +863,7 @@ static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { return pBlock; } -static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyBlockDistScanOperatorInfo(void* param) { SBlockDistInfo* pDistInfo = (SBlockDistInfo*)param; blockDataDestroy(pDistInfo->pResBlock); tsdbReaderClose(pDistInfo->pHandle); @@ -1266,7 +1266,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { blockDataFreeRes((SSDataBlock*)pBlock); - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } @@ -1575,11 +1575,11 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNo return NULL; } -static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyStreamScanOperatorInfo(void* param) { SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; - destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput); + destroyTableScanOperatorInfo(pTableScanInfo); taosMemoryFreeClear(pStreamScan->pTableScanOp); } if (pStreamScan->tqReader) { @@ -1735,7 +1735,7 @@ _error: return NULL; } -static void destroySysScanOperator(void* param, int32_t numOfOutput) { +static void destroySysScanOperator(void* param) { SSysTableScanInfo* pInfo = (SSysTableScanInfo*)param; tsem_destroy(&pInfo->ready); blockDataDestroy(pInfo->pRes); @@ -1993,7 +1993,7 @@ static SSDataBlock* sysTableScanUserTags(SOperatorInfo* pOperator) { metaReaderClear(&smr); metaCloseTbCursor(pInfo->pCur); pInfo->pCur = NULL; - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } char stableName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; @@ -2196,7 +2196,7 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) { metaReaderClear(&mr); metaCloseTbCursor(pInfo->pCur); pInfo->pCur = NULL; - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } // number of columns @@ -2574,7 +2574,7 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", item->uid, tstrerror(terrno), GET_TASKID(pTaskInfo)); metaReaderClear(&mr); - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { @@ -2624,12 +2624,10 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } -static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); - taosArrayDestroy(pInfo->pColMatchInfo); - taosMemoryFreeClear(param); } @@ -2825,7 +2823,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } @@ -2868,7 +2866,7 @@ static SSDataBlock* getTableDataBlock(void* param) { STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pOperator->pTaskInfo)) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // process this data block based on the probabilities @@ -2891,7 +2889,7 @@ static SSDataBlock* getTableDataBlock(void* param) { int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { - longjmp(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pOperator->pTaskInfo->env, code); } // current block is filter out according to filter condition, continue load the next block @@ -2984,7 +2982,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t code = tsortOpen(pInfo->pSortHandle); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } return TSDB_CODE_SUCCESS; @@ -3054,7 +3052,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList); if (!pInfo->hasGroupId) { @@ -3092,7 +3090,7 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { return pBlock; } -void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) { +void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); taosArrayDestroy(pTableScanInfo->sortSourceParams); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 4dd5e4ec15e9521b6c2cdc39562313592242773c..e2014ec97320c863a6857e94c538bd8d8319c2a1 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -20,7 +20,7 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator); static int32_t doOpenSortOperator(SOperatorInfo* pOperator); static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); -static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput); +static void destroyOrderOperatorInfo(void* param); // todo add limit/offset impl SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { @@ -156,7 +156,7 @@ void applyScalarFunction(SSDataBlock* pBlock, void* param) { int32_t code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pBlock, pBlock, pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs, NULL); if (code != TSDB_CODE_SUCCESS) { - longjmp(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pOperator->pTaskInfo->env, code); } } } @@ -184,7 +184,7 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { taosMemoryFreeClear(ps); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; @@ -204,7 +204,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } SSDataBlock* pBlock = NULL; @@ -250,7 +250,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL; } -void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { +void destroyOrderOperatorInfo(void* param) { SSortOperatorInfo* pInfo = (SSortOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); @@ -388,7 +388,7 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { taosMemoryFreeClear(ps); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } return TSDB_CODE_SUCCESS; @@ -420,7 +420,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } if (!pInfo->hasGroupId) { @@ -468,7 +468,7 @@ int32_t getGroupSortExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, u return TSDB_CODE_SUCCESS; } -void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { +void destroyGroupSortOperatorInfo(void* param) { SGroupSortOperatorInfo* pInfo = (SGroupSortOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); @@ -575,7 +575,7 @@ int32_t doOpenMultiwayMergeOperator(SOperatorInfo* pOperator) { int32_t code = tsortOpen(pInfo->pSortHandle); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, terrno); + T_LONG_JMP(pTaskInfo->env, terrno); } pOperator->cost.openCost = (taosGetTimestampUs() - pInfo->startTs) / 1000.0; @@ -672,7 +672,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, @@ -685,7 +685,7 @@ SSDataBlock* doMultiwayMerge(SOperatorInfo* pOperator) { return pBlock; } -void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) { +void destroyMultiwayMergeOperatorInfo(void* param) { SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index bacc5c28559ece721a777e2aa6162c8a94c7bf4e..d56ede49f7656e2bc83ffb044cf4084f36170889 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -624,7 +624,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } ASSERT(!isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); @@ -949,7 +949,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { @@ -972,7 +972,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } // window start key interpolation @@ -1006,7 +1006,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { @@ -1182,7 +1182,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); @@ -1207,7 +1207,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); @@ -1693,7 +1693,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } -static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) { +static void destroyStateWindowOperatorInfo(void* param) { SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); taosMemoryFreeClear(pInfo->stateKey.pData); @@ -1706,7 +1706,7 @@ static void freeItem(void* param) { taosMemoryFree(pKey->pData); } -void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { +void destroyIntervalOperatorInfo(void* param) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); @@ -1723,7 +1723,7 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { taosMemoryFreeClear(param); } -void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { +void destroyStreamFinalIntervalOperatorInfo(void* param) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); @@ -1740,7 +1740,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); - destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput); + destroyStreamFinalIntervalOperatorInfo(pChildOp->info); taosMemoryFree(pChildOp->pDownstream); cleanupExprSupp(&pChildOp->exprSupp); taosMemoryFreeClear(pChildOp); @@ -1865,6 +1865,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResBlock); if (isStream) { @@ -1885,6 +1889,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } } + pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->delIndex = 0; @@ -1914,7 +1919,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* return pOperator; _error: - destroyIntervalOperatorInfo(pInfo, numOfCols); + destroyIntervalOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -1964,7 +1969,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &window, masterScan, &pResult, gid, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } // pInfo->numOfRows data belong to the current session window @@ -1983,7 +1988,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator int32_t ret = setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &pRowSup->win, masterScan, &pResult, gid, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code - longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); @@ -2371,7 +2376,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { int32_t code = initKeeperInfo(pSliceInfo, pBlock); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // the pDataBlock are always the same one, no need to call this again @@ -2599,7 +2604,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { return pResBlock->info.rows == 0 ? NULL : pResBlock; } -void destroyTimeSliceOperatorInfo(void* param, int32_t numOfOutput) { +void destroyTimeSliceOperatorInfo(void* param) { STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; pInfo->pRes = blockDataDestroy(pInfo->pRes); @@ -2707,7 +2712,11 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -2728,18 +2737,27 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; _error: - pTaskInfo->code = TSDB_CODE_SUCCESS; + destroyStateWindowOperatorInfo(pInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; return NULL; } -void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { +void destroySWindowOperatorInfo(void* param) { SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*)param; - cleanupBasicInfo(&pInfo->binfo); + if (pInfo == NULL) { + return; + } + cleanupBasicInfo(&pInfo->binfo); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupAggSup(&pInfo->aggSup); @@ -2793,15 +2811,15 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); pOperator->pTaskInfo = pTaskInfo; - code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; _error: - if (pInfo != NULL) { - destroySWindowOperatorInfo(pInfo, numOfCols); - } - + destroySWindowOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -2819,7 +2837,7 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 if (code != TSDB_CODE_SUCCESS) { qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); pTaskInfo->code = code; - longjmp(pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } } } @@ -2973,7 +2991,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, true, &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } if (IS_FINAL_OP(pInfo)) { @@ -3225,7 +3243,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { for (int32_t i = 0; i < chIndex + 1 - size; i++) { SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0); if (!pChildOp) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info; pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; @@ -3364,15 +3382,17 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs); initBasicInfo(&pInfo->binfo, pResBlock); ASSERT(numOfCols > 0); increaseTs(pOperator->exprSupp.pCtx); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->pChildren = NULL; if (numOfChild > 0) { @@ -3438,7 +3458,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, return pOperator; _error: - destroyStreamFinalIntervalOperatorInfo(pInfo, numOfCols); + destroyStreamFinalIntervalOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -3476,7 +3496,7 @@ void destroyStateStreamAggSupporter(SStreamAggSupporter* pSup) { blockDataDestroy(pSup->pScanBlock); } -void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { +void destroyStreamSessionAggOperatorInfo(void* param) { SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); @@ -3486,7 +3506,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; - destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput); + destroyStreamSessionAggOperatorInfo(pChInfo); taosMemoryFreeClear(pChild); } } @@ -3557,7 +3577,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pSessionNode->window.pExprs, NULL, &numOfScalar); - int32_t code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); + code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3621,7 +3641,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh _error: if (pInfo != NULL) { - destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); + destroyStreamSessionAggOperatorInfo(pInfo); } taosMemoryFreeClear(pOperator); @@ -3770,7 +3790,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes // too many time window in query int32_t size = taosArrayGetSize(pAggSup->pCurWins); if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && size > MAX_INTERVAL_TIME_WINDOW) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } if (pWinInfo->pos.pageId == -1) { @@ -3922,7 +3942,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData pStDeleted); code = doOneWindowAgg(pInfo, pSDataBlock, pCurWin, &pResult, i, winRows, numOfOutput, pOperator); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } int32_t winNum = getNumCompactWindow(pAggSup->pCurWins, winIndex, gap); @@ -3934,7 +3954,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData SWinKey value = {.ts = pCurWin->win.skey, .groupId = groupId}; code = taosHashPut(pStUpdated, &pCurWin->pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } pCurWin->isOutput = true; } @@ -4244,7 +4264,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SOperatorInfo* pChildOp = createStreamFinalSessionAggOperatorInfo(NULL, pInfo->pPhyNode, pOperator->pTaskInfo, 0); if (!pChildOp) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } taosArrayPush(pInfo->pChildren, &pChildOp); } @@ -4450,7 +4470,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream _error: if (pInfo != NULL) { - destroyStreamSessionAggOperatorInfo(pInfo, pOperator->exprSupp.numOfExprs); + destroyStreamSessionAggOperatorInfo(pInfo); } taosMemoryFreeClear(pOperator); @@ -4458,7 +4478,7 @@ _error: return NULL; } -void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) { +void destroyStreamStateOperatorInfo(void* param) { SStreamStateAggOperatorInfo* pInfo = (SStreamStateAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStateStreamAggSupporter(&pInfo->streamAggSup); @@ -4468,7 +4488,7 @@ void destroyStreamStateOperatorInfo(void* param, int32_t numOfOutput) { for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, i); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; - destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput); + destroyStreamSessionAggOperatorInfo(pChInfo); taosMemoryFreeClear(pChild); taosMemoryFreeClear(pChInfo); } @@ -4705,14 +4725,14 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } code = doOneStateWindowAgg(pInfo, pSDataBlock, &pCurWin->winInfo, &pResult, i, winRows, numOfOutput, pOperator); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } pCurWin->winInfo.isClosed = false; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { SWinKey value = {.ts = pCurWin->winInfo.win.skey, .groupId = groupId}; code = taosHashPut(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition), &value, sizeof(SWinKey)); if (code != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } pCurWin->winInfo.isOutput = true; } @@ -4883,16 +4903,15 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys return pOperator; _error: - destroyStreamStateOperatorInfo(pInfo, numOfCols); + destroyStreamStateOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; } -void destroyMergeAlignedIntervalOperatorInfo(void* param, int32_t numOfOutput) { +void destroyMergeAlignedIntervalOperatorInfo(void* param) { SMergeAlignedIntervalAggOperatorInfo* miaInfo = (SMergeAlignedIntervalAggOperatorInfo*)param; - destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo, numOfOutput); - + destroyIntervalOperatorInfo(miaInfo->intervalAggOperatorInfo); taosMemoryFreeClear(param); } @@ -4955,7 +4974,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } int32_t currPos = startPos; @@ -4982,7 +5001,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } miaInfo->curTs = currWin.skey; @@ -5120,8 +5139,11 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); - initBasicInfo(&iaInfo->binfo, pResBlock); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&iaInfo->binfo, pResBlock); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo); @@ -5129,10 +5151,6 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); } - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - initResultRowInfo(&iaInfo->binfo.resultRowInfo); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -5156,7 +5174,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, return pOperator; _error: - destroyMergeAlignedIntervalOperatorInfo(miaInfo, numOfCols); + destroyMergeAlignedIntervalOperatorInfo(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; @@ -5179,10 +5197,10 @@ typedef struct SGroupTimeWindow { STimeWindow window; } SGroupTimeWindow; -void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { +void destroyMergeIntervalOperatorInfo(void* param) { SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; tdListFree(miaInfo->groupIntervals); - destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); + destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo); taosMemoryFreeClear(param); } @@ -5257,7 +5275,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } TSKEY ekey = ascScan ? win.ekey : win.skey; @@ -5274,7 +5292,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } // window start key interpolation @@ -5303,7 +5321,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, numOfOutput, pExprSup->rowEntryInfoOffset, &iaInfo->aggSup, pTaskInfo); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { - longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + T_LONG_JMP(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } ekey = ascScan ? nextWin.ekey : nextWin.skey; @@ -5426,8 +5444,11 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI initResultSizeInfo(&pOperator->resultInfo, 4096); int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); - initBasicInfo(&iaInfo->binfo, pResBlock); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + initBasicInfo(&iaInfo->binfo, pResBlock); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo); @@ -5460,7 +5481,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI return pOperator; _error: - destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); + destroyMergeIntervalOperatorInfo(miaInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index ad97d79f7e7ad28da3cf51aab33010303e11f509..e0752840db07052e056063b5789003cf9b6507e0 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -26,7 +26,7 @@ typedef struct SLHashBucket { int32_t size; // the number of element in this entry } SLHashBucket; -typedef struct SLHashObj { +struct SLHashObj { SDiskbasedBuf *pBuf; _hash_fn_t hashFn; SLHashBucket **pBucket; // entry list @@ -35,7 +35,7 @@ typedef struct SLHashObj { int32_t bits; // the number of bits used in hash int32_t numOfBuckets; // the number of buckets int64_t size; // the number of total items -} SLHashObj; +}; /** * the data struct for each hash node @@ -99,7 +99,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t int32_t newPageId = -1; SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId); if (pNewPage == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } taosArrayPush(pBucket->pPageIdList, &newPageId); @@ -138,7 +138,6 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket } setBufPageDirty(pPage, true); - pBucket->size -= 1; } @@ -229,6 +228,10 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) { int32_t pageId = -1; SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId); + if (p == NULL) { + return terrno; + } + p->num = sizeof(SFilePage); setBufPageDirty(p, true); @@ -252,7 +255,8 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ printf("tHash Init failed since %s", terrstr(terrno)); return NULL; } - int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, tsTempDir); + + int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, "", tsTempDir); if (code != 0) { terrno = code; return NULL; @@ -389,7 +393,9 @@ char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) { } SLHashBucket* pBucket = pHashObj->pBucket[bucketId]; - for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) { + int32_t num = taosArrayGetSize(pBucket->pPageIdList); + + for (int32_t i = 0; i < num; ++i) { int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i); SFilePage* p = getBufPage(pHashObj->pBuf, pageId); diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index bba4b254c5d56f2c72988897273d363a3fec3c0c..1c4216334945c0b682e313a975e558390fbd7049 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -26,7 +26,6 @@ #include "executor.h" #include "executorimpl.h" #include "function.h" -#include "stub.h" #include "taos.h" #include "tdatablock.h" #include "tdef.h" diff --git a/source/libs/executor/test/lhashTests.cpp b/source/libs/executor/test/lhashTests.cpp index 695552faa0f353cc631b87cf03f51003c7b66aed..c9b75395bce345802ff0e563762758601aca0a18 100644 --- a/source/libs/executor/test/lhashTests.cpp +++ b/source/libs/executor/test/lhashTests.cpp @@ -26,40 +26,47 @@ TEST(testCase, linear_hash_Tests) { taosSeedRand(taosGetTimestampSec()); + strcpy(tsTempDir, "/tmp/"); _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); -#if 0 - SLHashObj* pHashObj = tHashInit(256, 4096, fn, 320); - for(int32_t i = 0; i < 5000000; ++i) { + + int64_t st = taosGetTimestampUs(); + + SLHashObj* pHashObj = tHashInit(4098*4*2, 512, fn, 40); + for(int32_t i = 0; i < 1000000; ++i) { int32_t code = tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); assert(code == 0); } // tHashPrint(pHashObj, LINEAR_HASH_STATIS); + int64_t et = taosGetTimestampUs(); -// for(int32_t i = 0; i < 10000; ++i) { -// char* v = tHashGet(pHashObj, &i, sizeof(i)); -// if (v != NULL) { -//// printf("find value: %d, key:%d\n", *(int32_t*) v, i); -// } else { + for(int32_t i = 0; i < 1000000; ++i) { + if (i == 950000) { + printf("kf\n"); + } + char* v = tHashGet(pHashObj, &i, sizeof(i)); + if (v != NULL) { +// printf("find value: %d, key:%d\n", *(int32_t*) v, i); + } else { // printf("failed to found key:%d in hash\n", i); -// } -// } + } + } - tHashPrint(pHashObj, LINEAR_HASH_STATIS); +// tHashPrint(pHashObj, LINEAR_HASH_STATIS); tHashCleanup(pHashObj); -#endif + int64_t et1 = taosGetTimestampUs(); -#if 0 - SHashObj* pHashObj = taosHashInit(1000, fn, false, HASH_NO_LOCK); + SHashObj* pHashObj1 = taosHashInit(1000, fn, false, HASH_NO_LOCK); for(int32_t i = 0; i < 1000000; ++i) { - taosHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); + taosHashPut(pHashObj1, &i, sizeof(i), &i, sizeof(i)); } - for(int32_t i = 0; i < 10000; ++i) { - void* v = taosHashGet(pHashObj, &i, sizeof(i)); + for(int32_t i = 0; i < 1000000; ++i) { + void* v = taosHashGet(pHashObj1, &i, sizeof(i)); } - taosHashCleanup(pHashObj); -#endif + taosHashCleanup(pHashObj1); + int64_t et2 = taosGetTimestampUs(); + printf("linear hash time:%.2f ms, buildHash:%.2f ms, hash:%.2f\n", (et1-st)/1000.0, (et-st)/1000.0, (et2-et1)/1000.0); } \ No newline at end of file diff --git a/source/libs/executor/test/sortTests.cpp b/source/libs/executor/test/sortTests.cpp index 6e244152f20e0d4b914b21fcb871a5bbec871fce..4ac15670ac5dca547572df102f7267de08c0306d 100644 --- a/source/libs/executor/test/sortTests.cpp +++ b/source/libs/executor/test/sortTests.cpp @@ -27,7 +27,6 @@ #include "executorimpl.h" #include "executor.h" -#include "stub.h" #include "taos.h" #include "tdatablock.h" #include "tdef.h" @@ -196,7 +195,7 @@ int32_t docomp(const void* p1, const void* p2, void* param) { } } // namespace -#if 1 +#if 0 TEST(testCase, inMem_sort_Test) { SBlockOrderInfo oi = {0}; oi.order = TSDB_ORDER_ASC; @@ -382,7 +381,7 @@ TEST(testCase, ordered_merge_sort_Test) { } void* v = tsortGetValue(pTupleHandle, 0); - printf("%d: %d\n", row, *(int32_t*) v); +// printf("%d: %d\n", row, *(int32_t*) v); ASSERT_EQ(row++, *(int32_t*) v); } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 0e608d0da22da836f0a357c7bd4f9b194c11fd13..ac2128dd70b70b88dd32617067ff0ce8e0d3172a 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -309,6 +309,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { static char* evacOneDataPage(SDiskbasedBuf* pBuf) { char* bufPage = NULL; SListNode* pn = getEldestUnrefedPage(pBuf); + terrno = 0; // all pages are referenced by user, try to allocate new space if (pn == NULL) { @@ -332,6 +333,7 @@ static char* evacOneDataPage(SDiskbasedBuf* pBuf) { bufPage = flushPageToDisk(pBuf, d); } + ASSERT((bufPage != NULL) || terrno != TSDB_CODE_SUCCESS); return bufPage; }