From 56985c521771325e7488aaafd6e7de83c069a2d1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 6 Sep 2022 17:41:06 +0800 Subject: [PATCH] snapshot read code --- source/dnode/vnode/src/inc/tsdb.h | 6 + source/dnode/vnode/src/tsdb/tsdbCommit.c | 31 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 23 + source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 439 +++++++++++++----- 4 files changed, 361 insertions(+), 138 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index d45a6f19f0..2c53eb9e47 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -650,6 +650,12 @@ typedef struct SMergeTree { SLDataIter *pIter; } SMergeTree; +typedef struct { + int64_t suid; + int64_t uid; + STSchema *pTSchema; +} SSkmInfo; + int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index fb06203605..2f9e0ece22 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -14,13 +14,8 @@ */ #include "tsdb.h" -typedef struct { - int64_t suid; - int64_t uid; - STSchema *pTSchema; -} SSkmInfo; -typedef enum { MEMORY_DATA_ITER = 0, LAST_DATA_ITER } EDataIterT; +typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT; typedef struct { SRBTreeNode n; @@ -99,7 +94,7 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter); static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno); static int32_t tsdbNextCommitRow(SCommitter *pCommitter); -static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { +int32_t tRowInfoCmprFn(const void *p1, const void *p2) { SRowInfo *pInfo1 = (SRowInfo *)p1; SRowInfo *pInfo2 = (SRowInfo *)p2; @@ -325,22 +320,22 @@ _err: return code; } -static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid) { +int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) { int32_t code = 0; if (suid) { - if (pCommitter->skmTable.suid == suid) { - pCommitter->skmTable.uid = uid; + if (pSkmInfo->suid == suid) { + pSkmInfo->uid = uid; goto _exit; } } else { - if (pCommitter->skmTable.uid == uid) goto _exit; + if (pSkmInfo->uid == uid) goto _exit; } - pCommitter->skmTable.suid = suid; - pCommitter->skmTable.uid = uid; - tTSchemaDestroy(pCommitter->skmTable.pTSchema); - code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, -1, &pCommitter->skmTable.pTSchema); + pSkmInfo->suid = suid; + pSkmInfo->uid = uid; + tTSchemaDestroy(pSkmInfo->pTSchema); + code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema); if (code) goto _exit; _exit: @@ -432,7 +427,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { int8_t iIter = 0; for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { pIter = &pCommitter->aDataIter[iIter]; - pIter->type = LAST_DATA_ITER; + pIter->type = STT_DATA_ITER; pIter->iStt = iStt; code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); @@ -1046,7 +1041,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { break; } } - } else if (pCommitter->pIter->type == LAST_DATA_ITER) { // last file + } else if (pCommitter->pIter->type == STT_DATA_ITER) { // last file pIter->iRow++; if (pIter->iRow < pIter->bData.nRow) { pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; @@ -1437,7 +1432,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { tMapDataReset(&pCommitter->dWriter.mBlock); // impl - code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); + code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); if (code) goto _err; code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); if (code) goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 128bfe37da..b80b9fd2f6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1053,6 +1053,29 @@ _err: return code; } +int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { + int32_t code = 0; + SBlockInfo *pBlockInfo = &pDataBlk->aSubBlock[0]; + + // alloc + code = tRealloc(&pReader->aBuf[0], pBlockInfo->szBlock); + if (code) goto _err; + + // read + code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock); + if (code) goto _err; + + // decmpr + code = tDecmprBlockData(pReader->aBuf[0], pBlockInfo->szBlock, pBlockData, &pReader->aBuf[1]); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 9fc5639c5e..5b119a62b7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -16,6 +16,29 @@ #include "tsdb.h" // STsdbSnapReader ======================================== +typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT; +typedef struct { + SRBTreeNode n; + SRowInfo rInfo; + EFIterT type; + union { + struct { + SArray* aBlockIdx; + int32_t iBlockIdx; + SBlockIdx* pBlockIdx; + SMapData mBlock; + int32_t iBlock; + }; // .data file + struct { + int32_t iStt; + SArray* aSttBlk; + int32_t iSttBlk; + }; // .stt file + }; + SBlockData bData; + int32_t iRow; +} SFDataIter; + struct STsdbSnapReader { STsdb* pTsdb; int64_t sver; @@ -26,146 +49,301 @@ struct STsdbSnapReader { int8_t dataDone; int32_t fid; SDataFReader* pDataFReader; - SArray* aBlockIdx; // SArray - SArray* aSstBlk; // SArray - SBlockIdx* pBlockIdx; - SSttBlk* pSstBlk; - - int32_t iBlockIdx; - int32_t iBlockL; - SMapData mBlock; // SMapData - int32_t iBlock; - SBlockData oBlockData; - SBlockData nBlockData; + SFDataIter* pIter; + SRBTree rbt; + SFDataIter aFDataIter[TSDB_MAX_STT_FILE + 1]; + SBlockData bData; + SSkmInfo skmTable; // for del file int8_t delDone; SDelFReader* pDelFReader; SArray* aDelIdx; // SArray int32_t iDelIdx; SArray* aDelData; // SArray + uint8_t* aBuf[5]; }; -static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { +extern int32_t tRowInfoCmprFn(const void* p1, const void* p2); +extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); +extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); + +static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { int32_t code = 0; - STsdb* pTsdb = pReader->pTsdb; - while (true) { - if (pReader->pDataFReader == NULL) { - // next - SDFileSet dFileSet = {.fid = pReader->fid}; - SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT); - if (pSet == NULL) goto _exit; - pReader->fid = pSet->fid; - - // load - code = tsdbDataFReaderOpen(&pReader->pDataFReader, pTsdb, pSet); - if (code) goto _err; + SDFileSet dFileSet = {.fid = pReader->fid}; + SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT); + if (pSet == NULL) return code; - code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); - if (code) goto _err; + pReader->fid = pSet->fid; + code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet); + if (code) goto _err; + + pReader->pIter = NULL; + tRBTreeCreate(&pReader->rbt, tRowInfoCmprFn); + + // .data file + SFDataIter* pIter = &pReader->aFDataIter[0]; + pIter->type = SNAP_DATA_FILE_ITER; + + code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx); + if (code) goto _err; + + for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) { + pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); + + code = tsdbReadBlock(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); + if (code) goto _err; - code = tsdbReadSttBlk(pReader->pDataFReader, 0, pReader->aSstBlk); + for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk); + + if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue; + + code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData); if (code) goto _err; - // init - pReader->iBlockIdx = 0; - if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { - pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); + ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid); + ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid); - code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); - if (code) goto _err; + for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { + int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; - pReader->iBlock = 0; - } else { - pReader->pBlockIdx = NULL; + if (rowVer >= pReader->sver && rowVer <= pReader->ever) { + pIter->rInfo.suid = pIter->pBlockIdx->suid; + pIter->rInfo.uid = pIter->pBlockIdx->uid; + pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + goto _add_iter_and_break; + } } + } - pReader->iBlockL = 0; - while (true) { - if (pReader->iBlockL >= taosArrayGetSize(pReader->aSstBlk)) { - pReader->pSstBlk = NULL; - break; - } + continue; - pReader->pSstBlk = (SSttBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL); - if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { - // TODO - break; - } + _add_iter_and_break: + tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter); + break; + } - pReader->iBlockL++; - } + // .stt file + pIter = &pReader->aFDataIter[1]; + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + pIter->type = SNAP_STT_FILE_ITER; + pIter->iStt = iStt; + + code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk); + if (code) goto _err; + + for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) { + SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); + + if (pSttBlk->minVer > pReader->ever) continue; + if (pSttBlk->maxVer < pReader->sver) continue; + + code = tsdbReadSttBlock(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData); + if (code) goto _err; + + for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { + int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; - tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path, - pReader->fid); + if (rowVer >= pReader->sver && rowVer <= pReader->ever) { + pIter->rInfo.suid = pIter->bData.suid; + pIter->rInfo.uid = pIter->bData.uid; + pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + goto _add_iter; + } + } } - while (true) { - if (pReader->pBlockIdx && pReader->pSstBlk) { - TABLEID id = {.suid = pReader->pSstBlk->suid, .uid = pReader->pSstBlk->minUid}; + continue; - ASSERT(0); + _add_iter: + tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter); + pIter++; + } - // if (tTABLEIDCmprFn(pReader->pBlockIdx, &minId) < 0) { - // // TODO - // } else if (tTABLEIDCmprFn(pReader->pBlockIdx, &maxId) < 0) { - // // TODO - // } else { - // // TODO - // } - } else if (pReader->pBlockIdx) { - while (pReader->iBlock < pReader->mBlock.nItem) { - SDataBlk block; - tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetDataBlk); + tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pReader->pTsdb->pVnode), + pReader->pTsdb->path, pReader->fid); + return code; - if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) { - // load data (todo) - } +_err: + tsdbError("vgId:%d vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode), + tstrerror(code)); + return code; +} + +static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) { return pReader->pIter ? &pReader->pIter->rInfo : NULL; } - // next - pReader->iBlock++; - if (*ppData) break; +static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) { + int32_t code = 0; + + if (pReader->pIter) { + SFDataIter* pIter = pReader->pIter; + + while (true) { + _find_row: + for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { + int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; + + if (rowVer >= pReader->sver && rowVer <= pReader->ever) { + pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; + pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + goto _out; } + } + + if (pIter->type == SNAP_DATA_FILE_ITER) { + while (true) { + for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk); - if (pReader->iBlock >= pReader->mBlock.nItem) { - pReader->iBlockIdx++; - if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { - pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); + if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue; - code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); + code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData); if (code) goto _err; - pReader->iBlock = 0; - } else { - pReader->pBlockIdx = NULL; + pIter->iRow = -1; + goto _find_row; } + + pIter->iBlockIdx++; + if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break; + + pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); + code = tsdbReadBlock(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); + if (code) goto _err; + pIter->iBlock = -1; } - if (*ppData) goto _exit; - } else if (pReader->pSstBlk) { - while (pReader->pSstBlk) { - if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { - // load data (todo) - } + pReader->pIter = NULL; + } else if (pIter->type == SNAP_STT_FILE_ITER) { + for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) { + SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); - // next - pReader->iBlockL++; - if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) { - pReader->pSstBlk = (SSttBlk*)taosArrayGetSize(pReader->aSstBlk); - } else { - pReader->pSstBlk = NULL; - } + if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue; - if (*ppData) goto _exit; + code = tsdbReadSttBlock(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData); + if (code) goto _err; + + pIter->iRow = -1; + goto _find_row; } + + pReader->pIter = NULL; } else { + ASSERT(0); + } + } + + _out: + pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt); + if (pReader->pIter && pIter) { + int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo); + if (c > 0) { + tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter); + pReader->pIter = NULL; + } else { + ASSERT(c); + } + } + } + + if (pReader->pIter == NULL) { + pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt); + if (pReader->pIter) { + tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter); + } + } + + return code; + +_err: + return code; +} + +static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + + ASSERT(pReader->bData.nRow); + + int32_t aBufN[5] = {0}; + code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN); + if (code) goto _exit; + + int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData; + pHdr->type = SNAP_DATA_TSDB; + pHdr->size = size; + + memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]); + memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]); + if (aBufN[1]) { + memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]); + } + if (aBufN[0]) { + memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]); + } + +_exit: + return code; +} + +static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + STsdb* pTsdb = pReader->pTsdb; + + while (true) { + if (pReader->pDataFReader == NULL) { + code = tsdbSnapReadOpenFile(pReader); + if (code) goto _err; + } + + if (pReader->pDataFReader == NULL) break; + + SRowInfo* pRowInfo = tsdbSnapGetRow(pReader); + if (pRowInfo == NULL) { + tsdbDataFReaderClose(&pReader->pDataFReader); + continue; + } + + TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; + SBlockData* pBlockData = &pReader->bData; + + code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable); + if (code) goto _err; + + code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema); + if (code) goto _err; + + while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) { + code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid); + if (code) goto _err; + + code = tsdbSnapNextRow(pReader); + if (code) goto _err; + + pRowInfo = tsdbSnapGetRow(pReader); + if (pRowInfo == NULL) { tsdbDataFReaderClose(&pReader->pDataFReader); break; } + + if (pBlockData->nRow >= 4096) break; } + + code = tsdbSnapCmprData(pReader, ppData); + if (code) goto _err; + + break; } -_exit: return code; _err: @@ -216,7 +394,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { size += tPutDelData(NULL, pDelData); } } - if (size == 0) continue; // org data @@ -292,23 +469,33 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type goto _err; } + // data pReader->fid = INT32_MIN; - pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pReader->aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pReader->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pReader->aSstBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) { + SFDataIter* pIter = &pReader->aFDataIter[iIter]; + + if (iIter == 0) { + pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pIter->aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } else { + pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pIter->aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + code = tBlockDataCreate(&pIter->bData); + if (code) goto _err; } - pReader->mBlock = tMapDataInit(); - code = tBlockDataCreate(&pReader->oBlockData); - if (code) goto _err; - code = tBlockDataCreate(&pReader->nBlockData); + + code = tBlockDataCreate(&pReader->bData); if (code) goto _err; + // del pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); if (pReader->aDelIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -335,18 +522,26 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { int32_t code = 0; STsdbSnapReader* pReader = *ppReader; - if (pReader->pDataFReader) { - tsdbDataFReaderClose(&pReader->pDataFReader); - } - taosArrayDestroy(pReader->aSstBlk); - taosArrayDestroy(pReader->aBlockIdx); - tMapDataClear(&pReader->mBlock); - tBlockDataDestroy(&pReader->oBlockData, 1); - tBlockDataDestroy(&pReader->nBlockData, 1); - - if (pReader->pDelFReader) { - tsdbDelFReaderClose(&pReader->pDelFReader); + // data + if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader); + for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) { + SFDataIter* pIter = &pReader->aFDataIter[iIter]; + + if (iIter == 0) { + taosArrayDestroy(pIter->aBlockIdx); + tMapDataClear(&pIter->mBlock); + } else { + taosArrayDestroy(pIter->aSttBlk); + } + + tBlockDataDestroy(&pIter->bData, 1); } + + tBlockDataDestroy(&pReader->bData, 1); + tTSchemaDestroy(pReader->skmTable.pTSchema); + + // del + if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader); taosArrayDestroy(pReader->aDelIdx); taosArrayDestroy(pReader->aDelData); @@ -354,6 +549,10 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path); + for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) { + tFree(pReader->aBuf[iBuf]); + } + taosMemoryFree(pReader); *ppReader = NULL; return code; -- GitLab