From 14844c7b4bdb721778e123bfe92a7b1f895f7bdd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 8 Jul 2023 21:25:11 +0800 Subject: [PATCH] refactor(tsdb): opt tsdb read perf --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbMergeTree.c | 17 +- source/dnode/vnode/src/tsdb/tsdbRead2.c | 1204 +++---------------- 3 files changed, 186 insertions(+), 1037 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 0c39caa29d..7bd9cc4457 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -798,7 +798,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr, bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema, - int16_t* pCols, int32_t numOfCols); + int16_t* pCols, int32_t numOfCols, void* pReader); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 45a20315b1..6c94cb5cec 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -15,6 +15,7 @@ #include "tsdb.h" #include "tsdbFSet2.h" +#include "tsdbReadUtil.h" #include "tsdbSttFileRW.h" static void tLDataIterClose2(SLDataIter *pIter); @@ -392,9 +393,9 @@ static int32_t loadSttTombBlockData(SSttFileReader* pSttFileReader, uint64_t sui return TSDB_CODE_SUCCESS; } -int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, +int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pSttFileReader, int32_t iStt, int8_t backward, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange, SSttBlockLoadInfo *pBlockLoadInfo, - const char *idStr, bool strictTimeRange) { + const char *idStr, bool strictTimeRange, void* pReader1) { int32_t code = TSDB_CODE_SUCCESS; pIter->uid = uid; @@ -404,7 +405,7 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32 pIter->verRange.maxVer = pRange->maxVer; pIter->timeWindow.skey = pTimeWindow->skey; pIter->timeWindow.ekey = pTimeWindow->ekey; - pIter->pReader = pReader; + pIter->pReader = pSttFileReader; pIter->pBlockLoadInfo = pBlockLoadInfo; if (!pBlockLoadInfo->sttBlockLoaded) { @@ -413,16 +414,18 @@ int32_t tLDataIterOpen2(struct SLDataIter *pIter, SSttFileReader *pReader, int32 code = tsdbSttFileReadSttBlk(pIter->pReader, (const TSttBlkArray **)&pBlockLoadInfo->pBlockArray); if (code != TSDB_CODE_SUCCESS) { + tsdbError("load stt blk, code:%s, %s", tstrerror(code), idStr); return code; } code = loadSttBlockInfo(pIter, pBlockLoadInfo, suid); - - code = loadSttTombBlockData(pReader, suid, pBlockLoadInfo); if (code != TSDB_CODE_SUCCESS) { + tsdbError("load stt block info failed, code:%s, %s", tstrerror(code), idStr); return code; } + code = loadSttTombDataForAll(pReader1, pIter->pReader, pBlockLoadInfo); + double el = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("load the last file info completed, elapsed time:%.2fms, %s", el, idStr); } @@ -721,7 +724,7 @@ _end: int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, const char *idStr, bool strictTimeRange, SArray *pSttFileBlockIterArray, void *pCurrentFileSet, STSchema* pSchema, - int16_t* pCols, int32_t numOfCols) { + int16_t* pCols, int32_t numOfCols, void* pReader) { int32_t code = TSDB_CODE_SUCCESS; pMTree->backward = backward; @@ -785,7 +788,7 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, int8_t backward, STsdb *pTsdb, uint6 memset(pIter, 0, sizeof(SLDataIter)); code = tLDataIterOpen2(pIter, pSttFileReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange, - pLoadInfo, pMTree->idStr, strictTimeRange); + pLoadInfo, pMTree->idStr, strictTimeRange, pReader); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index fcb9d0b4f4..0961a7d126 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -20,208 +20,13 @@ #include "tsdbMerge.h" #include "tsdbUtil2.h" #include "tsimplehash.h" +#include "tsdbReadUtil.h" -#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define getCurrentKeyInLastBlock(_r) ((_r)->currentKey) -typedef enum { - READER_STATUS_SUSPEND = 0x1, - READER_STATUS_NORMAL = 0x2, -} EReaderStatus; - -typedef enum { - EXTERNAL_ROWS_PREV = 0x1, - EXTERNAL_ROWS_MAIN = 0x2, - EXTERNAL_ROWS_NEXT = 0x3, -} EContentData; - -typedef enum { - READ_MODE_COUNT_ONLY = 0x1, - READ_MODE_ALL, -} EReadMode; - -typedef struct { - STbDataIter* iter; - int32_t index; - bool hasVal; -} SIterInfo; - -typedef struct { - int32_t numOfBlocks; - int32_t numOfLastFiles; -} SBlockNumber; - -typedef struct SBlockIndex { - int32_t ordinalIndex; - int64_t inFileOffset; - STimeWindow window; // todo replace it with overlap flag. -} SBlockIndex; - -typedef struct STableBlockScanInfo { - uint64_t uid; - TSKEY lastKey; - TSKEY lastKeyInStt; // last accessed key in stt - SArray* pBlockList; // block data index list, SArray - SArray* pMemDelData; // SArray - SArray* pfileDelData; // SArray from each file set - SIterInfo iter; // mem buffer skip list iterator - SIterInfo iiter; // imem buffer skip list iterator - SArray* delSkyline; // delete info for this table - int32_t fileDelIndex; // file block delete index - int32_t lastBlockDelIndex; // delete index for last block - bool iterInit; // whether to initialize the in-memory skip list iterator or not -} STableBlockScanInfo; - -typedef struct SBlockOrderWrapper { - int64_t uid; - int64_t offset; - STableBlockScanInfo *pInfo; -} SBlockOrderWrapper; - -typedef struct SBlockOrderSupporter { - SBlockOrderWrapper** pDataBlockInfo; - int32_t* indexPerTable; - int32_t* numOfBlocksPerTable; - int32_t numOfTables; -} SBlockOrderSupporter; - -typedef struct SIOCostSummary { - int64_t numOfBlocks; - double blockLoadTime; - double buildmemBlock; - int64_t headFileLoad; - double headFileLoadTime; - int64_t smaDataLoad; - double smaLoadTime; - int64_t lastBlockLoad; - double lastBlockLoadTime; - int64_t composedBlocks; - double buildComposedBlockTime; - double createScanInfoList; - double createSkylineIterTime; - double initLastBlockReader; -} SCostSummary; - -typedef struct SBlockLoadSuppInfo { - TColumnDataAggArray colAggArray; - SColumnDataAgg tsColAgg; - int16_t* colId; - int16_t* slotId; - int32_t numOfCols; - char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. - bool smaValid; // the sma on all queried columns are activated -} SBlockLoadSuppInfo; - -typedef struct SLastBlockReader { - STimeWindow window; - SVersionRange verRange; - int32_t order; - uint64_t uid; - SMergeTree mergeTree; - SSttBlockLoadInfo* pInfo; - int64_t currentKey; -} SLastBlockReader; - -typedef struct SFilesetIter { - int32_t numOfFiles; // number of total files - int32_t index; // current accessed index in the list - TFileSetArray* pFilesetList;// data file set list - int32_t order; - SLastBlockReader* pLastBlockReader; // last file block reader -} SFilesetIter; - -typedef struct SFileDataBlockInfo { - // index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it - uint64_t uid; - int32_t tbBlockIdx; - SBrinRecord record; -} SFileDataBlockInfo; - -typedef struct SDataBlockIter { - int32_t numOfBlocks; - int32_t index; - SArray* blockList; // SArray - int32_t order; - SDataBlk block; // current SDataBlk data - SSHashObj* pTableMap; -} SDataBlockIter; - -typedef struct SFileBlockDumpInfo { - int32_t totalRows; - int32_t rowIndex; - int64_t lastKey; - bool allDumped; -} SFileBlockDumpInfo; - -typedef struct STableUidList { - uint64_t* tableUidList; // access table uid list in uid ascending order list - int32_t currentIndex; // index in table uid list -} STableUidList; - -typedef struct SReaderStatus { - bool loadFromFile; // check file stage - bool composedDataBlock; // the returned data block is a composed block or not - SSHashObj* pTableMap; // SHash - STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks. - STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT. - SFileBlockDumpInfo fBlockDumpInfo; - STFileSet* pCurrentFileset;// current opened file set - SBlockData fileBlockData; - SFilesetIter fileIter; - SDataBlockIter blockIter; - SArray* pLDataIterArray; - SRowMerger merger; - SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data - TFileSetArray* pfSetArray; -} SReaderStatus; - -typedef struct SBlockInfoBuf { - int32_t currentIndex; - SArray* pData; - int32_t numPerBucket; - int32_t numOfTables; -} SBlockInfoBuf; - -typedef struct STsdbReaderInfo { - uint64_t suid; - STSchema* pSchema; - EReadMode readMode; - uint64_t rowsNum; - STimeWindow window; - SVersionRange verRange; - int16_t order; -} STsdbReaderInfo; - -typedef struct SResultBlockInfo { - SSDataBlock* pResBlock; - bool freeBlock; - int64_t capacity; -} SResultBlockInfo; - -struct STsdbReader { - STsdb* pTsdb; - STsdbReaderInfo info; - TdThreadMutex readerMutex; - EReaderStatus flag; - int32_t code; - uint64_t rowsNum; - SResultBlockInfo resBlockInfo; - SReaderStatus status; - char* idStr; // query info handle, for debug purpose - int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows - SBlockLoadSuppInfo suppInfo; - STsdbReadSnap* pReadSnap; - SCostSummary cost; - SHashObj** pIgnoreTables; - SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema - SDataFileReader* pFileReader; // the file reader - SBlockInfoBuf blockInfoBuf; - EContentData step; - STsdbReader* innerReader[2]; -}; - static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); -static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, +static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); @@ -243,9 +48,9 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); -static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, - int8_t* pLevel); +static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, + int8_t* pLevel); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader); static int32_t doBuildDataBlock(STsdbReader* pReader); @@ -254,8 +59,6 @@ static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFil static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); -static STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id); - static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pCols, const int32_t* pSlotIdList, @@ -297,8 +100,7 @@ static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInf i += 1; j += 1; - } else if (pTCol->colId < pSupInfo->colId[j]) { - // do nothing + } else if (pTCol->colId < pSupInfo->colId[j]) { // do nothing i += 1; } else { return TSDB_CODE_INVALID_PARA; @@ -308,207 +110,6 @@ static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInf return TSDB_CODE_SUCCESS; } -static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { - int32_t num = numOfTables / pBuf->numPerBucket; - int32_t remainder = numOfTables % pBuf->numPerBucket; - if (pBuf->pData == NULL) { - pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); - } - - for (int32_t i = 0; i < num; ++i) { - char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo)); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush(pBuf->pData, &p); - } - - if (remainder > 0) { - char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo)); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - taosArrayPush(pBuf->pData, &p); - } - - pBuf->numOfTables = numOfTables; - - return TSDB_CODE_SUCCESS; -} - -static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { - if (numOfTables <= pBuf->numOfTables) { - return TSDB_CODE_SUCCESS; - } - - if (pBuf->numOfTables > 0) { - STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData); - taosMemoryFree(*p); - pBuf->numOfTables /= pBuf->numPerBucket; - } - - int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket; - int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket; - if (pBuf->pData == NULL) { - pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES); - } - - for (int32_t i = 0; i < num; ++i) { - char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo)); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - taosArrayPush(pBuf->pData, &p); - } - - if (remainder > 0) { - char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo)); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - taosArrayPush(pBuf->pData, &p); - } - - pBuf->numOfTables = numOfTables; - - return TSDB_CODE_SUCCESS; -} - -static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) { - size_t num = taosArrayGetSize(pBuf->pData); - for (int32_t i = 0; i < num; ++i) { - char** p = taosArrayGet(pBuf->pData, i); - taosMemoryFree(*p); - } - - taosArrayDestroy(pBuf->pData); -} - -static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) { - int32_t bucketIndex = index / pBuf->numPerBucket; - char** pBucket = taosArrayGet(pBuf->pData, bucketIndex); - return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo); -} - -static int32_t uidComparFunc(const void* p1, const void* p2) { - uint64_t pu1 = *(uint64_t*)p1; - uint64_t pu2 = *(uint64_t*)p2; - if (pu1 == pu2) { - return 0; - } else { - return (pu1 < pu2) ? -1 : 1; - } -} - -// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model -static SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, - STableUidList* pUidList, int32_t numOfTables) { - // allocate buffer in order to load data blocks from file - // todo use simple hash instead, optimize the memory consumption - SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - if (pTableMap == NULL) { - return NULL; - } - - int64_t st = taosGetTimestampUs(); - initBlockScanInfoBuf(pBuf, numOfTables); - - pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t)); - if (pUidList->tableUidList == NULL) { - tSimpleHashCleanup(pTableMap); - return NULL; - } - - pUidList->currentIndex = 0; - - for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j); - - pScanInfo->uid = idList[j].uid; - pUidList->tableUidList[j] = idList[j].uid; - - if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) { - int64_t skey = pTsdbReader->info.window.skey; - pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; - pScanInfo->lastKeyInStt = skey; - } else { - int64_t ekey = pTsdbReader->info.window.ekey; - pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; - pScanInfo->lastKeyInStt = ekey; - } - - tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); - tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, - pScanInfo->lastKey, pTsdbReader->idStr); - } - - taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); - - pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0; - tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables, - (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList, - pTsdbReader->idStr); - - return pTableMap; -} - -static void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) { - void *p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) { - STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; - - pInfo->iterInit = false; - pInfo->iter.hasVal = false; - pInfo->iiter.hasVal = false; - - if (pInfo->iter.iter != NULL) { - pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter); - } - - if (pInfo->iiter.iter != NULL) { - pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter); - } - - pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline); - pInfo->lastKey = ts; - pInfo->lastKeyInStt = ts + step; - } -} - -static void clearBlockScanInfo(STableBlockScanInfo* p) { - p->iterInit = false; - p->iter.hasVal = false; - p->iiter.hasVal = false; - - if (p->iter.iter != NULL) { - p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter); - } - - if (p->iiter.iter != NULL) { - p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter); - } - - p->delSkyline = taosArrayDestroy(p->delSkyline); - p->pBlockList = taosArrayDestroy(p->pBlockList); - p->pMemDelData = taosArrayDestroy(p->pMemDelData); - p->pfileDelData = taosArrayDestroy(p->pfileDelData); -} - -static void destroyAllBlockScanInfo(SSHashObj* pTableMap) { - void* p = NULL; - int32_t iter = 0; - - while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) { - clearBlockScanInfo(*(STableBlockScanInfo**)p); - } - - tSimpleHashCleanup(pTableMap); -} - static bool isEmptyQueryTimeWindow(STimeWindow* pWindow) { return pWindow->skey > pWindow->ekey; } // Update the query time window according to the data time to live(TTL) information, in order to avoid to return @@ -572,7 +173,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo pIter->pLastBlockReader->uid = 0; tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); - pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime); + pReader->status.pLDataIterArray = + destroySttBlockReader(pReader->status.pLDataIterArray, &pSum->lastBlockLoad, &pSum->lastBlockLoadTime); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); // check file the time range of coverage @@ -645,7 +247,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo return TSDB_CODE_SUCCESS; } - _err: +_err: *hasNext = false; return code; } @@ -752,7 +354,8 @@ void tsdbReleaseDataBlock2(STsdbReader* pReader) { } } -static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, SQueryTableDataCond* pCond) { +static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, + SQueryTableDataCond* pCond) { pResBlockInfo->capacity = capacity; pResBlockInfo->pResBlock = pResBlock; terrno = 0; @@ -835,7 +438,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void *ppReader = pReader; return code; - _end: +_end: tsdbReaderClose(pReader); *ppReader = NULL; return code; @@ -898,15 +501,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead break; } -// if (taosArrayGetSize(pIndexList) == 0) { - taosArrayPush(pIndexList, pBrinBlk); -// } else { -// if (newBlk) { -// taosArrayPush(pIndexList, pBrinBlk); -// } -// newBlk = false; -// } - + taosArrayPush(pIndexList, pBrinBlk); i += 1; } @@ -922,76 +517,13 @@ _end: return code; } -static void doCleanupTableScanInfo(STableBlockScanInfo* pScanInfo) { - // reset the index in last block when handing a new file - taosArrayClear(pScanInfo->pBlockList); - taosArrayClear(pScanInfo->pfileDelData); // del data from each file set -} - -static void cleanupTableScanInfo(SReaderStatus* pStatus) { - SSHashObj* pTableMap = pStatus->pTableMap; - STableBlockScanInfo** px = NULL; - int32_t iter = 0; - - while (1) { - px = tSimpleHashIterate(pTableMap, px, &iter); - if (px == NULL) { - break; - } - - doCleanupTableScanInfo(*px); - } -} - -typedef struct SBrinRecordIter { - SArray* pBrinBlockList; - SBrinBlk* pCurrentBlk; - int32_t blockIndex; - int32_t recordIndex; - SDataFileReader* pReader; - SBrinBlock block; - SBrinRecord record; -} SBrinRecordIter; - -void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) { - memset(&pIter->block, 0, sizeof(SBrinBlock)); - memset(&pIter->record, 0, sizeof(SBrinRecord)); - pIter->blockIndex = -1; - pIter->recordIndex = -1; - - pIter->pReader = pReader; - pIter->pBrinBlockList = pList; -} - -SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) { - if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= TARRAY2_SIZE(pIter->block.numRow)) { - pIter->blockIndex += 1; - if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) { - return NULL; - } - - pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex); - - tBrinBlockClear(&pIter->block); - tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block); - pIter->recordIndex = -1; - } - - pIter->recordIndex += 1; - tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record); - return &pIter->record; -} - -void clearBrinBlockIter(SBrinRecordIter* pIter) { - tBrinBlockDestroy(&pIter->block); -} - -static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, SArray* pTableScanInfoList) { +static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockNumber* pBlockNum, + SArray* pTableScanInfoList) { size_t sizeInDisk = 0; int64_t st = taosGetTimestampUs(); // clear info for the new file - cleanupTableScanInfo(&pReader->status); + cleanupInfoFoxNextFileset(pReader->status.pTableMap); int32_t k = 0; int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); @@ -1196,18 +728,18 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData endPos = doBinarySearchKey(pBlockData->aTSKEY, pRecord->numRow, pos, key, pReader->info.order); } - if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer)|| + if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer) || (pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.minVer > pRecord->minVer)) { int32_t i = endPos; if (asc) { - for(; i >= 0; --i) { + for (; i >= 0; --i) { if (pBlockData->aVersion[i] <= pReader->info.verRange.maxVer) { break; } } } else { - for(; i < pRecord->numRow; ++i) { + for (; i < pRecord->numRow; ++i) { if (pBlockData->aVersion[i] >= pReader->info.verRange.minVer) { break; } @@ -1361,7 +893,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) { if (asc && pReader->info.window.skey <= pRecord->firstKey && pReader->info.verRange.minVer <= pRecord->minVer) { // pDumpInfo->rowIndex = 0; - } else if (!asc && pReader->info.window.ekey >= pRecord->lastKey && pReader->info.verRange.maxVer >= pRecord->maxVer) { + } else if (!asc && pReader->info.window.ekey >= pRecord->lastKey && + pReader->info.verRange.maxVer >= pRecord->maxVer) { // pDumpInfo->rowIndex = pRecord->numRow - 1; } else { // find the appropriate the start position in current block, and set it to be the current rowIndex int32_t pos = asc ? pRecord->numRow - 1 : 0; @@ -1381,17 +914,17 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { ASSERT(pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.maxVer >= pRecord->minVer); // find the appropriate start position that satisfies the version requirement. - if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer)|| + if ((pReader->info.verRange.maxVer >= pRecord->minVer && pReader->info.verRange.maxVer < pRecord->maxVer) || (pReader->info.verRange.minVer <= pRecord->maxVer && pReader->info.verRange.minVer > pRecord->minVer)) { int32_t i = pDumpInfo->rowIndex; if (asc) { - for(; i < pRecord->numRow; ++i) { + for (; i < pRecord->numRow; ++i) { if (pBlockData->aVersion[i] >= pReader->info.verRange.minVer) { break; } } } else { - for(; i >= 0; --i) { + for (; i >= 0; --i) { if (pBlockData->aVersion[i] <= pReader->info.verRange.maxVer) { break; } @@ -1478,7 +1011,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { // check if current block are all handled if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - if (outOfTimeWindow(ts, &pReader->info.window)) { // the remain data has out of query time window, ignore current block + if (outOfTimeWindow(ts, + &pReader->info.window)) { // the remain data has out of query time window, ignore current block setBlockAllDumped(pDumpInfo, ts, pReader->info.order); } } else { @@ -1491,7 +1025,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { int32_t unDumpedRows = asc ? pRecord->numRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 - ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s", + ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s", pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, dumpedRows, unDumpedRows, pRecord->minVer, pRecord->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr); @@ -1562,200 +1096,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI return TSDB_CODE_SUCCESS; } -static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) { - taosMemoryFreeClear(pSup->numOfBlocksPerTable); - taosMemoryFreeClear(pSup->indexPerTable); - - for (int32_t i = 0; i < pSup->numOfTables; ++i) { - SBlockOrderWrapper* pBlockInfo = pSup->pDataBlockInfo[i]; - taosMemoryFreeClear(pBlockInfo); - } - - taosMemoryFreeClear(pSup->pDataBlockInfo); -} - -static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) { - pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables); - pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables); - pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables); - - if (pSup->numOfBlocksPerTable == NULL || pSup->indexPerTable == NULL || pSup->pDataBlockInfo == NULL) { - cleanupBlockOrderSupporter(pSup); - return TSDB_CODE_OUT_OF_MEMORY; - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) { - int32_t leftIndex = *(int32_t*)pLeft; - int32_t rightIndex = *(int32_t*)pRight; - - SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param; - - int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex]; - int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex]; - - if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) { - /* left block is empty */ - return 1; - } else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) { - /* right block is empty */ - return -1; - } - - SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex]; - SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex]; - - return pLeftBlock->offset > pRightBlock->offset ? 1 : -1; -} - -static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter, const char* idStr) { -#if 0 - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); - if (pBlockInfo != NULL) { - STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pBlockIter->pTableMap, pBlockInfo->uid, idStr); - if (pScanInfo == NULL) { - return terrno; - } - - SBlockIndex* pIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx); - tMapDataGetItemByIdx(&pScanInfo->mapData, pIndex->ordinalIndex, &pBlockIter->block, tGetDataBlk); - } -#endif - -#if 0 - qDebug("check file block, table uid:%"PRIu64" index:%d offset:%"PRId64", ", pScanInfo->uid, *mapDataIndex, pBlockIter->block.aSubBlock[0].offset); -#endif - - return TSDB_CODE_SUCCESS; -} - -static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) { - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - - SBlockOrderSupporter sup = {0}; - pBlockIter->numOfBlocks = numOfBlocks; - taosArrayClear(pBlockIter->blockList); - - pBlockIter->pTableMap = pReader->status.pTableMap; - - // access data blocks according to the offset of each block in asc/desc order. - int32_t numOfTables = taosArrayGetSize(pTableList); - - int64_t st = taosGetTimestampUs(); - int32_t code = initBlockOrderSupporter(&sup, numOfTables); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - int32_t cnt = 0; - - for (int32_t i = 0; i < numOfTables; ++i) { - STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i); -// ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0); - - size_t num = taosArrayGetSize(pTableScanInfo->pBlockList); - sup.numOfBlocksPerTable[sup.numOfTables] = num; - - char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num); - if (buf == NULL) { - cleanupBlockOrderSupporter(&sup); - return TSDB_CODE_OUT_OF_MEMORY; - } - - sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf; - - for (int32_t k = 0; k < num; ++k) { - SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k); - sup.pDataBlockInfo[sup.numOfTables][k] = - (SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo}; - cnt++; - } - - sup.numOfTables += 1; - } - - if (numOfBlocks != cnt && sup.numOfTables != numOfTables) { - cleanupBlockOrderSupporter(&sup); - return TSDB_CODE_INVALID_PARA; - } - - // since there is only one table qualified, blocks are not sorted - if (sup.numOfTables == 1) { - for (int32_t i = 0; i < numOfBlocks; ++i) { - SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i}; - blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i); - - taosArrayPush(pBlockIter->blockList, &blockInfo); - } - - int64_t et = taosGetTimestampUs(); - tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", - pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr); - - pBlockIter->index = asc ? 0 : (numOfBlocks - 1); - cleanupBlockOrderSupporter(&sup); - doSetCurrentBlock(pBlockIter, pReader->idStr); - return TSDB_CODE_SUCCESS; - } - - tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, - pReader->idStr); - - SMultiwayMergeTreeInfo* pTree = NULL; - - uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar); - if (ret != TSDB_CODE_SUCCESS) { - cleanupBlockOrderSupporter(&sup); - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t numOfTotal = 0; - while (numOfTotal < cnt) { - int32_t pos = tMergeTreeGetChosenIndex(pTree); - int32_t index = sup.indexPerTable[pos]++; - - SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index}; - blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index); - - taosArrayPush(pBlockIter->blockList, &blockInfo); - - // set data block index overflow, in order to disable the offset comparator - if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) { - sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1; - } - - numOfTotal += 1; - tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree)); - } - - int64_t et = taosGetTimestampUs(); - tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, - (et - st) / 1000.0, pReader->idStr); - cleanupBlockOrderSupporter(&sup); - taosMemoryFree(pTree); - - pBlockIter->index = asc ? 0 : (numOfBlocks - 1); - doSetCurrentBlock(pBlockIter, pReader->idStr); - - return TSDB_CODE_SUCCESS; -} - -static bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) { - bool asc = ASCENDING_TRAVERSE(pBlockIter->order); - - int32_t step = asc ? 1 : -1; - if ((pBlockIter->index >= pBlockIter->numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) { - return false; - } - - pBlockIter->index += step; - doSetCurrentBlock(pBlockIter, idStr); - - return true; -} - /** * This is an two rectangles overlap cases. */ @@ -1778,8 +1118,8 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl } int32_t step = asc ? 1 : -1; -// *nextIndex = pBlockInfo->tbBlockIdx + step; -// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); + // *nextIndex = pBlockInfo->tbBlockIdx + step; + // *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex); SBrinRecord* p = taosArrayGet(pTableBlockScanInfo->pBlockList, pBlockInfo->tbBlockIdx + step); memcpy(pRecord, p, sizeof(SBrinRecord)); @@ -1819,20 +1159,10 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t ASSERT(pBlockInfo->uid == fblock.uid && pBlockInfo->tbBlockIdx == fblock.tbBlockIdx); } - doSetCurrentBlock(pBlockIter, ""); return TSDB_CODE_SUCCESS; } // todo: this attribute could be acquired during extractin the global ordered block list. -static bool overlapWithNeighborBlock(SDataBlk* pBlock, SBlockIndex* pNeighborBlockIndex, int32_t order) { - // it is the last block in current file, no chance to overlap with neighbor blocks. - if (ASCENDING_TRAVERSE(order)) { - return pBlock->maxKey.ts == pNeighborBlockIndex->window.skey; - } else { - return pBlock->minKey.ts == pNeighborBlockIndex->window.ekey; - } -} - static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order) { // it is the last block in current file, no chance to overlap with neighbor blocks. if (ASCENDING_TRAVERSE(order)) { @@ -1981,8 +1311,8 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pBlock return loadDataBlock; } -static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, - STableBlockScanInfo* pScanInfo, TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { +static bool isCleanFileDataBlock(STsdbReader* pReader, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, + TSDBKEY keyInBuf, SLastBlockReader* pLastBlockReader) { SDataBlockToLoadInfo info = {0}; getBlockToLoadInfo(&info, pBlockInfo, pScanInfo, keyInBuf, pLastBlockReader, pReader); bool isCleanFileBlock = !(info.overlapWithNeighborBlock || info.hasDupTs || info.overlapWithKeyInBuf || @@ -2007,7 +1337,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64 - " - %" PRId64 ", uid:%" PRIu64 ", %s", + " - %" PRId64 ", uid:%" PRIu64 ", %s", pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pBlockScanInfo->uid, pReader->idStr); @@ -2053,13 +1383,14 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc } TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); - int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow]; - int64_t ver = pRow->pBlockData->aVersion[pRow->iRow]; + int64_t key = pRow->pBlockData->aTSKEY[pRow->iRow]; + int64_t ver = pRow->pBlockData->aVersion[pRow->iRow]; pLastBlockReader->currentKey = key; pScanInfo->lastKeyInStt = key; - if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, pVerRange)) { + if (!hasBeenDropped(pScanInfo->delSkyline, &pScanInfo->lastBlockDelIndex, key, ver, pLastBlockReader->order, + pVerRange)) { return true; } } @@ -2116,7 +1447,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* } STSchema* ptr = NULL; - int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &ptr); + int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->info.suid, uid, sversion, &ptr); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL; @@ -2209,7 +1540,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, + pReader->idStr); } if (minKey == k.ts) { @@ -2239,7 +1571,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return terrno; } - int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); + int32_t code = tsdbRowMergerAdd(pMerger, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2261,7 +1593,8 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, + pReader->idStr); } if (minKey == key) { @@ -2294,7 +1627,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, bool mergeBlockData) { - SRowMerger* pMerger = &pReader->status.merger; + SRowMerger* pMerger = &pReader->status.merger; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); @@ -2304,9 +1637,10 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, TSDBROW* pRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree); // create local variable to hold the row value - TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; + TSDBROW fRow = {.iRow = pRow->iRow, .type = TSDBROW_COL_FMT, .pBlockData = pRow->pBlockData}; - tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid, pReader->idStr); + tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", pRow->pBlockData, pRow->iRow, pLastBlockReader->uid, + pReader->idStr); // only last block exists if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) { @@ -2326,7 +1660,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, TSDBROW* pRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, + pReader->idStr); code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -2348,7 +1683,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, return code; } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, + pReader->idStr); // merge with block data if ts == key if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { @@ -2376,7 +1712,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader* pLastBlockReader, int64_t key, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - SRowMerger* pMerger = &pReader->status.merger; + SRowMerger* pMerger = &pReader->status.merger; // merge is not initialized yet, due to the fact that the pReader->info.pSchema is not initialized if (pMerger->pArray == NULL) { @@ -2401,7 +1737,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); } else if (key == ts) { - SRow* pTSRow = NULL; + SRow* pTSRow = NULL; int32_t code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2539,7 +1875,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, + pReader->idStr); } if (minKey == ik.ts) { @@ -2615,7 +1952,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } } - doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, + pReader->idStr); } if (minKey == key) { @@ -2675,34 +2013,6 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan return code; } -static void doLoadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { - if (pScanInfo->pMemDelData == NULL) { - pScanInfo->pMemDelData = taosArrayInit(4, sizeof(SDelData)); - } - - SDelData* p = NULL; - if (pMemTbData != NULL) { - p = pMemTbData->pHead; - while (p) { - if (p->version <= ver) { - taosArrayPush(pScanInfo->pMemDelData, p); - } - - p = p->pNext; - } - } - - if (piMemTbData != NULL) { - p = piMemTbData->pHead; - while (p) { - if (p->version <= ver) { - taosArrayPush(pScanInfo->pMemDelData, p); - } - p = p->pNext; - } - } -} - static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { if (pBlockScanInfo->iterInit) { return TSDB_CODE_SUCCESS; @@ -2716,20 +2026,20 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea startKey = (TSDBKEY){.ts = pBlockScanInfo->lastKey - 1, .version = pReader->info.verRange.maxVer}; } - int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, - &pBlockScanInfo->iter, "mem"); + int32_t code = + doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, &pBlockScanInfo->iter, "mem"); if (code != TSDB_CODE_SUCCESS) { return code; } STbData* di = NULL; - code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem, - &pBlockScanInfo->iiter, "imem"); + code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem, &pBlockScanInfo->iiter, + "imem"); if (code != TSDB_CODE_SUCCESS) { return code; } - doLoadMemTombData(pBlockScanInfo, d, di, pReader->info.verRange.maxVer); + loadMemTombData(pBlockScanInfo, d, di, pReader->info.verRange.maxVer); pBlockScanInfo->iterInit = true; return TSDB_CODE_SUCCESS; @@ -2764,159 +2074,6 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum return true; } -static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer) { - STombRecord record = {0}; - for (int32_t j = 0; j < pBlock->suid->size; ++j) { - int32_t code = tTombBlockGet(pBlock, j, &record); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (record.suid < suid) { - continue; - } - - if (record.suid > suid || (record.suid == suid && record.uid > uid)) { - break; - } - - if (record.uid < uid) { - continue; - } - - if (record.version <= maxVer) { - SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; - taosArrayPush(pData, &delData); - } - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t loadTombRecordsFromSttFiles(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pBlockScanInfo, - uint64_t maxVer) { - int32_t size = taosArrayGetSize(pLDataIterList); - if (size <= 0) { - return TSDB_CODE_SUCCESS; - } - - uint64_t uid = pBlockScanInfo->uid; - if (pBlockScanInfo->pfileDelData == NULL) { - pBlockScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); - } - - for(int32_t i = 0; i < size; ++i) { - SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i); - - int32_t numOfIter = taosArrayGetSize(pLeveledLDataIter); - if (numOfIter == 0) { - continue; - } - - for (int32_t f = 0; f < numOfIter; ++f) { - SLDataIter* pIter = taosArrayGetP(pLeveledLDataIter, f); - - SArray* pTombBlockArray = pIter->pBlockLoadInfo->pTombBlockArray; - int32_t numOfBlocks = taosArrayGetSize(pTombBlockArray); - for (int32_t k = 0; k < numOfBlocks; ++k) { - STombBlock* pBlock = taosArrayGetP(pTombBlockArray, k); - - int32_t code = checkTombBlockRecords(pBlockScanInfo->pfileDelData, pBlock, suid, uid, maxVer); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } - } - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t loadTombRecordsFromDataFiles(STsdbReader* pReader, int32_t numOfTables) { - if (pReader->status.pCurrentFileset == NULL || pReader->status.pCurrentFileset->farr[3] == NULL) { - return TSDB_CODE_SUCCESS; - } - - const TTombBlkArray* pBlkArray = NULL; - - int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // todo find the correct start position. - int32_t i = 0, j = 0; - while (i < pBlkArray->size && j < numOfTables) { - STombBlock block = {0}; - code = tsdbDataFileReadTombBlock(pReader->pFileReader, &pBlkArray->data[i], &block); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - uint64_t uid = pReader->status.uidList.tableUidList[j]; - - STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - if (pScanInfo->pfileDelData == NULL) { - pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); - } - - STombRecord record = {0}; - for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) { - code = tTombBlockGet(&block, k, &record); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (record.suid < pReader->info.suid) { - continue; - } - - if (record.suid > pReader->info.suid) { - tTombBlockDestroy(&block); - return TSDB_CODE_SUCCESS; - } - - bool newTable = false; - if (uid < record.uid) { - while (pReader->status.uidList.tableUidList[j] < record.uid && j < numOfTables) { - j += 1; - newTable = true; - } - - if (j >= numOfTables) { - tTombBlockDestroy(&block); - break; - } - - uid = pReader->status.uidList.tableUidList[j]; - } - - if (record.uid < uid) { - continue; - } - - ASSERT(record.suid == pReader->info.suid && uid == record.uid); - - if (newTable) { - pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); - if (pScanInfo->pfileDelData == NULL) { - pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData)); - } - } - - if (record.version <= pReader->info.verRange.maxVer) { - SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey}; - taosArrayPush(pScanInfo->pfileDelData, &delData); - } - } - - i += 1; - tTombBlockDestroy(&block); - } - - return TSDB_CODE_SUCCESS; -} - static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { // the last block reader has been initialized for this table. if (pLBlockReader->uid == pScanInfo->uid) { @@ -2939,16 +2096,11 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan int64_t st = taosGetTimestampUs(); tsdbDebug("init last block reader, window:%" PRId64 "-%" PRId64 ", uid:%" PRIu64 ", %s", w.skey, w.ekey, pScanInfo->uid, pReader->idStr); - - int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb, - pReader->info.suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, false, - pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, pReader->info.pSchema, - pReader->suppInfo.colId, pReader->suppInfo.numOfCols); - if (code != TSDB_CODE_SUCCESS) { - return false; - } - code = loadTombRecordsFromSttFiles(pReader->status.pLDataIterArray, pReader->info.suid, pScanInfo, pReader->info.verRange.maxVer); + int32_t code = tMergeTreeOpen2(&pLBlockReader->mergeTree, (pLBlockReader->order == TSDB_ORDER_DESC), pReader->pTsdb, + pReader->info.suid, pScanInfo->uid, &w, &pLBlockReader->verRange, pReader->idStr, + false, pReader->status.pLDataIterArray, pReader->status.pCurrentFileset, + pReader->info.pSchema, pReader->suppInfo.colId, pReader->suppInfo.numOfCols, pReader); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -2961,7 +2113,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan int64_t el = taosGetTimestampUs() - st; pReader->cost.initLastBlockReader += (el / 1000.0); - tsdbDebug("init last block reader completed, elapsed time:%"PRId64"us %s", el, pReader->idStr); + tsdbDebug("init last block reader completed, elapsed time:%" PRId64 "us %s", el, pReader->idStr); return code; } @@ -3000,7 +2152,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc } else { TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - SRow* pTSRow = NULL; + SRow* pTSRow = NULL; code = tsdbRowMergerAdd(pMerger, &fRow, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; @@ -3055,9 +2207,9 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, bool* loadNeighbor) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; - int32_t nextIndex = -1; + int32_t code = TSDB_CODE_SUCCESS; + int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1; + int32_t nextIndex = -1; *loadNeighbor = false; @@ -3113,10 +2265,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader; - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - int64_t st = taosGetTimestampUs(); - int32_t step = asc ? 1 : -1; - double el = 0; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int64_t st = taosGetTimestampUs(); + int32_t step = asc ? 1 : -1; + double el = 0; SBrinRecord* pRecord = &pBlockInfo->record; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; @@ -3134,7 +2286,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } pRecord = &pBlockInfo->record; - TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); + TSDBKEY keyInBuf = getCurrentKeyInBuf(pBlockScanInfo, pReader); // it is a clean block, load it directly if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf, pLastBlockReader) && @@ -3153,8 +2305,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } else { // file blocks not exist ASSERT(0); pBlockScanInfo = *pReader->status.pTableIter; - if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) { -// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order); + if (pReader->pIgnoreTables && + taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) { + // setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order); return code; } } @@ -3200,7 +2353,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { // currently loaded file data block is consumed if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) { -// pBlock = getCurrentBlock(&pReader->status.blockIter); + // pBlock = getCurrentBlock(&pReader->status.blockIter); setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order); break; } @@ -3210,13 +2363,13 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } } - _end: +_end: el = (taosGetTimestampUs() - st) / 1000.0; updateComposedBlockInfo(pReader, el, pBlockScanInfo); if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64 - ", elapsed time:%.2f ms %s", + ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -3243,7 +2396,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde } int64_t st = taosGetTimestampUs(); - + if (pBlockScanInfo->delSkyline != NULL) { taosArrayClear(pBlockScanInfo->delSkyline); } else { @@ -3258,7 +2411,7 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde } code = tsdbBuildDeleteSkyline(pSource, 0, taosArrayGetSize(pSource) - 1, pBlockScanInfo->delSkyline); - + taosArrayClear(pBlockScanInfo->pfileDelData); int32_t index = getInitialDelIndex(pBlockScanInfo->delSkyline, order); @@ -3269,12 +2422,12 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde double el = taosGetTimestampUs() - st; pCost->createSkylineIterTime = el / 1000.0; - + return code; } TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { - bool asc = ASCENDING_TRAVERSE(pReader->info.order); + bool asc = ASCENDING_TRAVERSE(pReader->info.order); TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}, ikey = {.ts = TSKEY_INITIAL_VAL}; bool hasKey = false, hasIKey = false; @@ -3356,7 +2509,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr } taosArrayDestroy(pIndexList); - return loadTombRecordsFromDataFiles(pReader, numOfTables); + return loadDataFileTombDataForAll(pReader); } static void resetTableListIndex(SReaderStatus* pStatus) { @@ -3401,7 +2554,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pScanInfo->uid, sizeof(pScanInfo->uid))) { // reset the index in last block when handing a new file -// doCleanupTableScanInfo(pScanInfo); + // doCleanupTableScanInfo(pScanInfo); bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -3411,7 +2564,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } // reset the index in last block when handing a new file -// doCleanupTableScanInfo(pScanInfo); + // doCleanupTableScanInfo(pScanInfo); bool hasDataInLastFile = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasDataInLastFile) { @@ -3447,7 +2600,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64 - ", elapsed time:%.2f ms %s", + ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); return TSDB_CODE_SUCCESS; @@ -3462,7 +2615,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } static int32_t doBuildDataBlock(STsdbReader* pReader) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; @@ -3498,7 +2651,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { } else if (bufferDataInFileBlockGap(pReader->info.order, keyInBuf, pBlockInfo)) { // data in memory that are earlier than current file block // rows in buffer should be less than the file block in asc, greater than file block in desc - int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; + int64_t endKey = + (ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey; code = buildDataBlockFromBuf(pReader, pScanInfo, endKey); } else { if (hasDataInLastBlock(pLastBlockReader) && !ASCENDING_TRAVERSE(pReader->info.order)) { @@ -3537,7 +2691,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { if (pResBlock->info.rows > 0) { tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 " rows:%" PRId64 - ", elapsed time:%.2f ms %s", + ", elapsed time:%.2f ms %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, el, pReader->idStr); } @@ -3552,14 +2706,15 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // update the last key for the corresponding table pScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? pInfo->window.ekey : pInfo->window.skey; - tsdbDebug("%p uid:%" PRIu64 " clean file block retrieved from file, global index:%d, " + tsdbDebug("%p uid:%" PRIu64 + " clean file block retrieved from file, global index:%d, " "table index:%d, rows:%d, brange:%" PRId64 "-%" PRId64 ", %s", pReader, pScanInfo->uid, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlockInfo->record.numRow, pBlockInfo->record.firstKey, pBlockInfo->record.lastKey, pReader->idStr); } } - return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code; + return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; } static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) { @@ -3592,14 +2747,14 @@ static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReade } STableBlockScanInfo* pScanInfo = *p; - SDataBlk block = {0}; -// for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { -// tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); -// pReader->rowsNum += block.nRow; -// } + SDataBlk block = {0}; + // for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { + // tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block); + // pReader->rowsNum += block.nRow; + // } } - _end: +_end: tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle); return code; } @@ -3669,7 +2824,7 @@ static int32_t readRowsCountFromFiles(STsdbReader* pReader) { break; } -// code = doSumFileBlockRows(pReader, pReader->pFileReader); + // code = doSumFileBlockRows(pReader, pReader->pFileReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3712,7 +2867,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { } STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; - if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) { + if (pReader->pIgnoreTables && + taosHashGet(*pReader->pIgnoreTables, &(*pBlockScanInfo)->uid, sizeof((*pBlockScanInfo)->uid))) { bool hasNexTable = moveToNextTable(pUidList, pStatus); if (!hasNexTable) { return TSDB_CODE_SUCCESS; @@ -3767,7 +2923,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { SBlockNumber num = {0}; - SArray* pTableList = taosArrayInit(40, POINTER_BYTES); + SArray* pTableList = taosArrayInit(40, POINTER_BYTES); int32_t code = moveToNextFile(pReader, &num, pTableList); if (code != TSDB_CODE_SUCCESS) { @@ -3812,7 +2968,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SDataBlockIter* pBlockIter = &pReader->status.blockIter; - while(1) { + while (1) { terrno = 0; code = doLoadLastBlockSequentially(pReader); @@ -3835,7 +2991,7 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { return TSDB_READ_RETURN; } - if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed. + if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed. return TSDB_READ_CONTINUE; } else { // all blocks in data file are checked, let's check the data in last files resetTableListIndex(&pReader->status); @@ -3848,7 +3004,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { bool asc = ASCENDING_TRAVERSE(pReader->info.order); SDataBlockIter* pBlockIter = &pReader->status.blockIter; - SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; if (pBlockIter->numOfBlocks == 0) { // let's try to extract data from stt files. @@ -3910,8 +3066,8 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret int8_t precision = pVnode->config.tsdbCfg.precision; int64_t now = taosGetTimestamp(precision); int64_t offset = tsQueryRsmaTolerance * ((precision == TSDB_TIME_PRECISION_MILLI) ? 1L - : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L - : 1000000L); + : (precision == TSDB_TIME_PRECISION_MICRO) ? 1000L + : 1000000L); for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { SRetention* pRetention = retentions + level; @@ -3961,7 +3117,8 @@ SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_ return (SVersionRange){.minVer = startVer, .maxVer = endVer}; } -bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, SVersionRange* pVerRange) { +bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t ver, int32_t order, + SVersionRange* pVerRange) { if (pDelList == NULL || (taosArrayGetSize(pDelList) == 0)) { return false; } @@ -3979,8 +3136,7 @@ bool hasBeenDropped(const SArray* pDelList, int32_t* index, int64_t key, int64_t return false; } else if (key == last->ts) { TSDBKEY* prev = taosArrayGet(pDelList, num - 2); - return (prev->version >= ver && prev->version <= pVerRange->maxVer && - prev->version >= pVerRange->minVer); + return (prev->version >= ver && prev->version <= pVerRange->maxVer && prev->version >= pVerRange->minVer); } } else { TSDBKEY* pCurrent = taosArrayGet(pDelList, *index); @@ -4189,9 +3345,9 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SRowMerger* pMerger = &pReader->status.merger; - bool asc = ASCENDING_TRAVERSE(pReader->info.order); - int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; - int32_t step = asc ? 1 : -1; + bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + int32_t step = asc ? 1 : -1; pDumpInfo->rowIndex += step; if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) { @@ -4205,7 +3361,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc CHECK_FILEBLOCK_STATE st; SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); -// SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); + // SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter); if (pFileBlockInfo == NULL) { st = CHECK_FILEBLOCK_QUIT; break; @@ -4288,14 +3444,14 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, return terrno; } - tsdbRowMergerAdd(&pReader->status.merger,pNextRow, pTSchema1); + tsdbRowMergerAdd(&pReader->status.merger, pNextRow, pTSchema1); } else { // let's merge rows in file block code = tsdbRowMergerAdd(&pReader->status.merger, ¤t, pReader->info.pSchema); if (code != TSDB_CODE_SUCCESS) { return code; } - tsdbRowMergerAdd(&pReader->status.merger,pNextRow, NULL); + tsdbRowMergerAdd(&pReader->status.merger, pNextRow, NULL); } code = doMergeRowsInBuf(pIter, uid, TSDBROW_TS(¤t), pDelList, pReader); @@ -4342,9 +3498,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } - tsdbRowMergerAdd(&pReader->status.merger,pRow, pSchema); - code = - doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); + tsdbRowMergerAdd(&pReader->status.merger, pRow, pSchema); + code = doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -4372,8 +3527,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p return code; } -static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, int64_t endKey, - bool* freeTSRow) { +static int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, TSDBROW* pResRow, + int64_t endKey, bool* freeTSRow) { TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); SArray* pDelList = pBlockScanInfo->delSkyline; @@ -4583,7 +3738,7 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t int32_t size = tSimpleHashGetSize(pReader->status.pTableMap); STableBlockScanInfo** p = NULL; - int32_t iter = 0; + int32_t iter = 0; while ((p = tSimpleHashIterate(pReader->status.pTableMap, p, &iter)) != NULL) { clearBlockScanInfo(*p); @@ -4670,15 +3825,38 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) { } static void freeSchemaFunc(void* param) { - void **p = (void **)param; + void** p = (void**)param; taosMemoryFreeClear(*p); } +static void clearSharedPtr(STsdbReader* p) { + p->status.pTableMap = NULL; + p->status.uidList.tableUidList = NULL; + p->status.pfSetArray = NULL; + p->info.pSchema = NULL; + p->pReadSnap = NULL; + p->pSchemaMap = NULL; +} + +static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { + pDst->status.pTableMap = pSrc->status.pTableMap; + pDst->status.uidList = pSrc->status.uidList; + pDst->status.pfSetArray = pSrc->status.pfSetArray; + pDst->info.pSchema = pSrc->info.pSchema; + pDst->pSchemaMap = pSrc->pSchemaMap; + pDst->pReadSnap = pSrc->pReadSnap; + + if (pDst->info.pSchema) { + tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema); + } +} + // ====================================== EXPOSED APIs ====================================== int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, - SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, SHashObj** pIgnoreTables) { + SSDataBlock* pResBlock, void** ppReader, const char* idstr, bool countOnly, + SHashObj** pIgnoreTables) { STimeWindow window = pCond->twindows; - SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); + SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); int32_t capacity = pConf->tsdbCfg.maxRows; if (pResBlock != NULL) { @@ -4790,41 +3968,19 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi pReader->pIgnoreTables = pIgnoreTables; tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64 - " in this query %s", + " in this query %s", pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->info.verRange.minVer, pReader->info.verRange.maxVer, pReader->idStr); return code; - _err: +_err: tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr); tsdbReaderClose2(*ppReader); *ppReader = NULL; // reset the pointer value. return code; } -static void clearSharedPtr(STsdbReader* p) { - p->status.pTableMap = NULL; - p->status.uidList.tableUidList = NULL; - p->status.pfSetArray = NULL; - p->info.pSchema = NULL; - p->pReadSnap = NULL; - p->pSchemaMap = NULL; -} - -static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { - pDst->status.pTableMap = pSrc->status.pTableMap; - pDst->status.uidList = pSrc->status.uidList; - pDst->status.pfSetArray = pSrc->status.pfSetArray; - pDst->info.pSchema = pSrc->info.pSchema; - pDst->pSchemaMap = pSrc->pSchemaMap; - pDst->pReadSnap = pSrc->pReadSnap; - - if (pDst->info.pSchema) { - tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema); - } -} - void tsdbReaderClose2(STsdbReader* pReader) { if (pReader == NULL) { return; @@ -4880,7 +4036,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { tsdbUninitReaderLock(pReader); SCostSummary* pCost = &pReader->cost; - SFilesetIter* pFilesetIter = &pReader->status.fileIter; + SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pLastBlockReader != NULL) { SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; tMergeTreeClose(&pLReader->mergeTree); @@ -4892,7 +4048,8 @@ void tsdbReaderClose2(STsdbReader* pReader) { tsdbDebug( "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 - " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-load-time:%.2f ms, " + " SMA-time:%.2f ms, fileBlocks:%" PRId64 + ", fileBlocks-load-time:%.2f ms, " "build in-memory-block-time:%.2f ms, lastBlocks:%" PRId64 ", lastBlocks-time:%.2f ms, composed-blocks:%" PRId64 ", composed-blocks-time:%.2fms, STableBlockScanInfo size:%.2f Kb, createTime:%.2f ms,createSkylineIterTime:%.2f " "ms, initLastBlockReader:%.2fms, %s", @@ -4932,7 +4089,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { // resetDataBlockScanInfo excluding lastKey STableBlockScanInfo** p = NULL; - int32_t iter = 0; + int32_t iter = 0; while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; @@ -4955,7 +4112,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { } else { // resetDataBlockScanInfo excluding lastKey STableBlockScanInfo** p = NULL; - int32_t iter = 0; + int32_t iter = 0; while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p; @@ -5008,7 +4165,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { pReader->idStr); return code; - _err: +_err: tsdbError("failed to suspend data reader, code:%s %s", tstrerror(code), pReader->idStr); return code; } @@ -5080,7 +4237,7 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr); return code; - _err: +_err: tsdbError("failed to resume data reader, code:%s %s", tstrerror(code), pReader->idStr); return code; } @@ -5154,8 +4311,9 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { *hasNext = false; - if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->step == EXTERNAL_ROWS_NEXT || pReader->code != TSDB_CODE_SUCCESS) { - return (pReader->code != TSDB_CODE_SUCCESS)? pReader->code:code; + if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->step == EXTERNAL_ROWS_NEXT || + pReader->code != TSDB_CODE_SUCCESS) { + return (pReader->code != TSDB_CODE_SUCCESS) ? pReader->code : code; } SReaderStatus* pStatus = &pReader->status; @@ -5286,7 +4444,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ } } -int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool *hasNullSMA) { +int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) { SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg; int32_t code = 0; @@ -5310,7 +4468,7 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, return TSDB_CODE_SUCCESS; } -// int64_t st = taosGetTimestampUs(); + // int64_t st = taosGetTimestampUs(); TARRAY2_CLEAR(&pSup->colAggArray, 0); code = tsdbDataFileReadBlockSma(pReader->pFileReader, &pFBlock->record, &pSup->colAggArray); @@ -5367,29 +4525,17 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, *pBlockSMA = pResBlock->pBlockAgg; pReader->cost.smaDataLoad += 1; -// double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; - pReader->cost.smaLoadTime += 0;//elapsedTime; + // double elapsedTime = (taosGetTimestampUs() - st) / 1000.0; + pReader->cost.smaLoadTime += 0; // elapsedTime; tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr); return code; } -STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) { - STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid)); - if (p == NULL || *p == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - int32_t size = tSimpleHashGetSize(pTableMap); - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id); - return NULL; - } - - return *p; -} - static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { - SReaderStatus* pStatus = &pReader->status; - int32_t code = TSDB_CODE_SUCCESS; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); + SReaderStatus* pStatus = &pReader->status; + int32_t code = TSDB_CODE_SUCCESS; + SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); if (pReader->code != TSDB_CODE_SUCCESS) { return NULL; @@ -5503,8 +4649,8 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbDebug("%p reset reader, suid:%" PRIu64 ", numOfTables:%d, skey:%" PRId64 ", query range:%" PRId64 " - %" PRId64 " in query %s", - pReader, pReader->info.suid, numOfTables, pCond->twindows.skey, pReader->info.window.skey, pReader->info.window.ekey, - pReader->idStr); + pReader, pReader->info.suid, numOfTables, pCond->twindows.skey, pReader->info.window.skey, + pReader->info.window.ekey, pReader->idStr); tsdbReleaseReader(pReader); @@ -5565,7 +4711,7 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT while (true) { if (hasNext) { SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); - int32_t numOfRows = pBlockInfo->record.numRow; + int32_t numOfRows = pBlockInfo->record.numRow; pTableBlockInfo->totalRows += numOfRows; @@ -5741,7 +4887,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); - _exit: +_exit: if (code) { *ppSnap = NULL; if (pSnap) { -- GitLab