From bc600a4942e5a82cad3d2749f613838f9315b33a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 3 Aug 2022 14:08:03 +0800 Subject: [PATCH] refactor: do some internal refactor optimize the building block performance. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 74 +++++++++++++++++++------- 1 file changed, 56 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 6d1e3a4e07..62d7a72b8f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -152,7 +152,7 @@ static void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, - STsdbReader* pReader); + STsdbReader* pReader, bool* freeTSRow); static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, @@ -1235,6 +1235,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* TSDBKEY k = TSDBROW_KEY(pRow); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); SArray* pDelList = pBlockScanInfo->delSkyline; + bool freeTSRow = false; // ascending order traverse if (ASCENDING_TRAVERSE(pReader->order)) { @@ -1248,7 +1249,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* tRowMergerGetRow(&merge, &pTSRow); } } else if (k.ts < key) { // k.ts < key - doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader); + doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); } else { // k.ts == key, ascending order: file block ----> imem rows -----> mem rows tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); @@ -1260,7 +1261,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } else { // descending order scan if (key < k.ts) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader); + doMergeMultiRows(pRow, pBlockScanInfo->uid, pIter, pDelList, &pTSRow, pReader, &freeTSRow); } else if (k.ts < key) { if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { return TSDB_CODE_SUCCESS; @@ -1302,12 +1303,12 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ASSERT(pRow != NULL && piRow != NULL); int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; + bool freeTSRow = false; uint64_t uid = pBlockScanInfo->uid; TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); - if (ASCENDING_TRAVERSE(pReader->order)) { // [1&2] key <= [k.ts && ik.ts] if (key <= k.ts && key <= ik.ts) { @@ -1335,16 +1336,22 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [3] ik.ts < key <= k.ts // [4] ik.ts < k.ts <= key if (ik.ts < k.ts) { - doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); + doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); + if (freeTSRow) { + taosMemoryFree(pTSRow); + } return TSDB_CODE_SUCCESS; } // [5] k.ts < key <= ik.ts // [6] k.ts < ik.ts <= key if (k.ts < ik.ts) { - doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader); + doMergeMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, &pTSRow, pReader, &freeTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); + if (freeTSRow) { + taosMemoryFree(pTSRow); + } return TSDB_CODE_SUCCESS; } @@ -1354,6 +1361,7 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, &pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); + taosMemoryFree(pTSRow); return TSDB_CODE_SUCCESS; } } @@ -1385,8 +1393,11 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* // [3] ik.ts > k.ts >= Key // [4] ik.ts > key >= k.ts if (ik.ts > key) { - doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); + doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); + if (freeTSRow) { + taosMemoryFree(pTSRow); + } return TSDB_CODE_SUCCESS; } @@ -1399,18 +1410,21 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); + taosMemoryFree(pTSRow); return TSDB_CODE_SUCCESS; } //[7] key = ik.ts > k.ts if (key == ik.ts) { - doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader); + doMergeMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, &pTSRow, pReader, &freeTSRow); TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex); tRowMerge(&merge, &fRow); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); tRowMergerGetRow(&merge, &pTSRow); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow); + + taosMemoryFree(pTSRow); return TSDB_CODE_SUCCESS; } } @@ -2280,11 +2294,29 @@ void updateSchema(TSDBROW* pRow, uint64_t uid, STsdbReader* pReader) { } void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, - STsdbReader* pReader) { + STsdbReader* pReader, bool* freeTSRow) { + { // if the timestamp of the next valid row has a different ts, return current row directly + TSDBROW current = *pRow; + pIter->hasVal = tsdbTbDataIterNext(pIter->iter); + + if (!pIter->hasVal) { + *pTSRow = current.pTSRow; + *freeTSRow = false; + return; + } else { // has next point in mem/imem + TSDBROW* pNextRow = getValidRow(pIter, pDelList, pReader); + if (TSDBROW_KEY(¤t).ts != TSDBROW_KEY(pNextRow).ts) { + *pTSRow = current.pTSRow; + *freeTSRow = false; + return; + } + } + } + SRowMerger merge = {0}; + TSDBKEY k = TSDBROW_KEY(pRow); - TSDBKEY k = TSDBROW_KEY(pRow); - // updateSchema(pRow, uid, pReader); + // get the correct schema for data in memory int32_t sversion = TSDBROW_SVERSION(pRow); STSchema* pTSchema = NULL; if (sversion != pReader->pSchema->version) { @@ -2298,6 +2330,8 @@ void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDe tRowMergerGetRow(&merge, pTSRow); tRowMergerClear(&merge); + *freeTSRow = true; + if (sversion != pReader->pSchema->version) { taosMemoryFree(pTSchema); } @@ -2332,7 +2366,7 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo } int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, - int64_t endKey) { + int64_t endKey, bool* freeTSRow) { TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); SArray* pDelList = pBlockScanInfo->delSkyline; @@ -2358,23 +2392,24 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBKEY ik = TSDBROW_KEY(piRow); if (ik.ts < k.ts) { // ik.ts < k.ts - doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader); + doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); } else if (k.ts < ik.ts) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader); + doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); } else { // ik.ts == k.ts doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); + *freeTSRow = true; } return TSDB_CODE_SUCCESS; } if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader); + doMergeMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); return TSDB_CODE_SUCCESS; } if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { - doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader); + doMergeMultiRows(piRow, pBlockScanInfo->uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); return TSDB_CODE_SUCCESS; } @@ -2472,13 +2507,16 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e do { STSRow* pTSRow = NULL; - tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey); + bool freeTSRow = false; + tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow); if (pTSRow == NULL) { break; } doAppendRowFromTSRow(pBlock, pReader, pTSRow); - taosMemoryFree(pTSRow); + if (freeTSRow) { + taosMemoryFree(pTSRow); + } // no data in buffer, return immediately if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { -- GitLab