diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 38334dac0525c027f8f2acaad9ebcd1853d49b43..0831f3d75a64edebdaa9c2b5cc58f4d8e58bfdcc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#include "tsdb.h" #include "osDef.h" +#include "tsdb.h" #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) typedef enum { @@ -130,8 +130,8 @@ struct STsdbReader { SBlockLoadSuppInfo suppInfo; STsdbReadSnap* pReadSnap; SIOCostSummary cost; - STSchema* pSchema;// the newest version schema - STSchema* pMemSchema;// the previous schema for in-memory data, to avoid load schema too many times + STSchema* pSchema; // the newest version schema + STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times SDataFReader* pFileReader; SVersionRange verRange; @@ -1213,17 +1213,17 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* return code; } -static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, SFileBlockDumpInfo* pDumpInfo) { - +static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key, + SFileBlockDumpInfo* pDumpInfo) { // opt version // 1. it is not a border point // 2. the direct next point is not an duplicated timestamp if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && pReader->order == TSDB_ORDER_ASC) || (pDumpInfo->rowIndex > 0 && pReader->order == TSDB_ORDER_DESC)) { - int32_t step = pReader->order == TSDB_ORDER_ASC? 1:-1; + int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1; int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; - if (nextKey != key) { // merge is not needed + if (nextKey != key) { // merge is not needed doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); pDumpInfo->rowIndex += step; return true; @@ -1239,7 +1239,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader* pReader->pSchema = metaGetTbTSchema(pReader->pTsdb->pVnode->pMeta, uid, -1); } - if (sversion == pReader->pSchema->version) { + if (pReader->pSchema && sversion == pReader->pSchema->version) { return pReader->pSchema; } @@ -1265,10 +1265,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* SBlockData* pBlockData = &pReader->status.fileBlockData; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - TSDBKEY k = TSDBROW_KEY(pRow); - TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); - SArray* pDelList = pBlockScanInfo->delSkyline; - bool freeTSRow = false; + TSDBKEY k = TSDBROW_KEY(pRow); + TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); + SArray* pDelList = pBlockScanInfo->delSkyline; + bool freeTSRow = false; uint64_t uid = pBlockScanInfo->uid; // ascending order traverse @@ -2153,7 +2153,7 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea } TSDBROW* pRow = tsdbTbDataIterGet(pIter->iter); - TSDBKEY key = {.ts = pRow->pTSRow->ts, .version = pRow->version}; + TSDBKEY key = {.ts = pRow->pTSRow->ts, .version = pRow->version}; if (outOfTimeWindow(key.ts, &pReader->window)) { pIter->hasVal = false; return NULL; @@ -2186,7 +2186,6 @@ TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pRea } } - int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, STsdbReader* pReader) { while (1) { @@ -2318,9 +2317,8 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow) { - TSDBROW* pNextRow = NULL; - TSDBROW current = *pRow; + TSDBROW current = *pRow; { // if the timestamp of the next valid row has a different ts, return current row directly pIter->hasVal = tsdbTbDataIterNext(pIter->iter); @@ -2350,6 +2348,10 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe // get the correct schema for data in memory STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid); + if (pReader->pSchema == NULL) { + pReader->pSchema = pTSchema; + } + tRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema); STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid); @@ -2390,8 +2392,8 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo tRowMergerGetRow(&merge, pTSRow); } -int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, - int64_t endKey, bool* freeTSRow) { +int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey, + bool* freeTSRow) { TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); SArray* pDelList = pBlockScanInfo->delSkyline; @@ -2446,7 +2448,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; - STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid); + STSchema* pSchema = doGetSchemaForTSRow(pTSRow->sver, pReader, uid); SColVal colVal = {0}; int32_t i = 0, j = 0; @@ -2532,7 +2534,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e do { STSRow* pTSRow = NULL; - bool freeTSRow = false; + bool freeTSRow = false; tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow); if (pTSRow == NULL) { break; @@ -2581,9 +2583,7 @@ void* tsdbGetIvtIdx(SMeta* pMeta) { return metaGetIvtIdx(pMeta); } -uint64_t getReaderMaxVersion(STsdbReader *pReader) { - return pReader->verRange.maxVer; -} +uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; } /** * @brief Get all suids since suid @@ -2761,7 +2761,8 @@ void tsdbReaderClose(STsdbReader* pReader) { SIOCostSummary* pCost = &pReader->cost; tsdbDebug("%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 - " SMA-time:%.2f ms, fileBlocks:%" PRId64 ", fileBlocks-time:%.2f ms, " + " SMA-time:%.2f ms, fileBlocks:%" PRId64 + ", fileBlocks-time:%.2f ms, " "build in-memory-block-time:%.2f ms, STableBlockScanInfo size:%.2f Kb %s", pReader, pCost->headFileLoad, pCost->headFileLoadTime, pCost->smaData, pCost->smaLoadTime, pCost->numOfBlocks, pCost->blockLoadTime, pCost->buildmemBlock, @@ -2769,7 +2770,9 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFree(pReader->idStr); taosMemoryFree(pReader->pSchema); - taosMemoryFree(pReader->pMemSchema); + if (pReader->pMemSchema != pReader->pSchema) { + taosMemoryFree(pReader->pMemSchema); + } taosMemoryFreeClear(pReader); }