From eb2fb724b1f17b44e6ce59321ca09f781bf73d9a Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 25 Aug 2022 18:15:20 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 2 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 2 +- source/dnode/vnode/src/tsdb/tsdbMerge.c | 60 +++++++++++++++---- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 17 +++++- source/dnode/vnode/src/tsdb/tsdbUtil.c | 2 +- 6 files changed, 68 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 083bb6623f..f5d4af9381 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -266,7 +266,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMa int32_t tsdbReadBlockL(SDataFReader *pReader, int32_t iLast, SArray *aBlockL); int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData); -int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData); +int32_t tsdbReadLastBlock(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 4963754eed..6b388c6146 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -503,7 +503,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { if (code) goto _err; } case SFSLASTNEXTROW_BLOCKDATA: - code = tsdbReadLastBlock(state->pDataFReader, state->pBlockL, state->pBlockDataL); + code = tsdbReadLastBlock(state->pDataFReader, 0, state->pBlockL, state->pBlockDataL); if (code) goto _err; state->nRow = state->blockDataL.nRow; diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 878491ef7e..b804ecf746 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -128,8 +128,8 @@ typedef struct { struct { SDataFReader *pReader; SArray *aBlockIdx; + SLDataIter aLDataiter[TSDB_MAX_LAST_FILE]; SDataMerger merger; - SArray *aBlockL[TSDB_MAX_LAST_FILE]; } dReader; struct { SDataFWriter *pWriter; @@ -140,6 +140,9 @@ typedef struct { } dWriter; } STsdbMerger; +extern int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, + SBlockData *pBlockData); // todo + static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { int32_t code = 0; STsdb *pTsdb = pMerger->pTsdb; @@ -151,9 +154,42 @@ static int32_t tsdbMergeFileDataStart(STsdbMerger *pMerger, SDFileSet *pSet) { code = tsdbReadBlockIdx(pMerger->dReader.pReader, pMerger->dReader.aBlockIdx); if (code) goto _err; + pMerger->dReader.merger.pNode = NULL; + pMerger->dReader.merger.rbt = tRBTreeCreate(tRowInfoCmprFn); for (int8_t iLast = 0; iLast < pSet->nLastF; iLast++) { - code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pMerger->dReader.aBlockL[iLast]); + SRBTreeNode *pNode = (SRBTreeNode *)taosMemoryCalloc(1, sizeof(*pNode) + sizeof(SLDataIter)); + if (pNode == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SLDataIter *pIter = (SLDataIter *)pNode->payload; + + pIter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); + if (pIter->aBlockL == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tBlockDataCreate(&pIter->bData); if (code) goto _err; + + code = tsdbReadBlockL(pMerger->dReader.pReader, iLast, pIter->aBlockL); + if (code) goto _err; + + if (taosArrayGetSize(pIter->aBlockL) == 0) continue; + pIter->iBlockL = 0; + + SBlockL *pBlockL = (SBlockL *)taosArrayGet(pIter->aBlockL, 0); + code = tsdbReadLastBlockEx(pMerger->dReader.pReader, iLast, pBlockL, &pIter->bData); + if (code) goto _err; + + pIter->iRow = 0; + pIter->rowInfo.suid = pIter->bData.suid; + pIter->rowInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[0]; + pIter->rowInfo.row = tsdbRowFromBlockData(&pIter->bData, 0); + + pNode = tRBTreePut(&pMerger->dReader.merger.rbt, pNode); + ASSERT(pNode); } // writer @@ -261,13 +297,13 @@ static int32_t tsdbStartMerge(STsdbMerger *pMerger, STsdb *pTsdb) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { - pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL)); - if (pMerger->dReader.aBlockL[iLast] == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } + // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { + // pMerger->dReader.aBlockL[iLast] = taosArrayInit(0, sizeof(SBlockL)); + // if (pMerger->dReader.aBlockL[iLast] == NULL) { + // code = TSDB_CODE_OUT_OF_MEMORY; + // goto _exit; + // } + // } // writer pMerger->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); @@ -305,9 +341,9 @@ static int32_t tsdbEndMerge(STsdbMerger *pMerger) { taosArrayDestroy(pMerger->dWriter.aBlockIdx); // reader - for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { - taosArrayDestroy(pMerger->dReader.aBlockL[iLast]); - } + // for (int8_t iLast = 0; iLast < TSDB_MAX_LAST_FILE; iLast++) { + // taosArrayDestroy(pMerger->dReader.aBlockL[iLast]); + // } taosArrayDestroy(pMerger->dReader.aBlockIdx); tsdbFSDestroy(&pMerger->fs); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 7d270e08ca..326fc5e3f7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2391,7 +2391,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable return code; } - code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData); + code = tsdbReadLastBlock(pReader->pFileReader, 0, pBlock, &pLastBlockReader->lastBlockData); double el = (taosGetTimestampUs() - st) / 1000.0; if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index e07b0e8f94..674e51c9d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -908,7 +908,7 @@ _err: return code; } -int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData) { +int32_t tsdbReadLastBlock(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData) { int32_t code = 0; code = tsdbReadBlockDataImpl(pReader, &pBlockL->bInfo, 1, pBlockData); @@ -921,6 +921,21 @@ _err: return code; } +int32_t tsdbReadLastBlockEx(SDataFReader *pReader, int32_t iLast, SBlockL *pBlockL, SBlockData *pBlockData) { + int32_t code = 0; + + // read + code = tsdbReadAndCheck(pReader->aLastFD[iLast], pBlockL->bInfo.offset, &pReader->aBuf[1], pBlockL->bInfo.szBlock, 0); + if (code) goto _exit; + + // decmpr + code = tDecmprBlockData(pReader->aBuf[1], pBlockL->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); + if (code) goto _exit; + +_exit: + return code; +} + // SDataFWriter ==================================================== int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 49778542e3..31c4e39b37 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1619,7 +1619,7 @@ _exit: int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]) { int32_t code = 0; - tBlockDataClear(pBlockData); + tBlockDataReset(pBlockData); int32_t n = 0; SDiskDataHdr hdr = {0}; -- GitLab