From d630c2fdd75d8f714290d4291ae57699d68bc803 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 7 Sep 2022 18:56:45 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 99 +++++++++++++++------- 1 file changed, 70 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 20328a6b90..92c5652faf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1122,6 +1122,68 @@ _exit: return code; } +static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { + int32_t code = 0; + // TODO + return code; +} + +static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { + int32_t code = 0; + + if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code; + + if (pWriter->dReader.pBlockIdx && pWriter->dReader.pBlockIdx->suid == pWriter->id.suid && + pWriter->dReader.pBlockIdx->uid == pWriter->id.uid) { + for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { + TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); + code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, pWriter->id.uid); + if (code) goto _err; + + if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, + pWriter->cmprAlg); + if (code) goto _err; + } + } + + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, + pWriter->cmprAlg); + if (code) goto _err; + + for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk); + + code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); + if (code) goto _err; + } + + code = tsdbSnapNextTableData(pWriter); + if (code) goto _err; + } + + // code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, + // pWriter->cmprAlg); + // if (code) goto _err; + + if (pWriter->dWriter.mDataBlk.nItem) { + SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid}; + code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx); + if (code) goto _err; + + if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + return code; + +_err: + return code; +} + static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb; @@ -1182,31 +1244,8 @@ static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) { ASSERT(pWriter->dWriter.pWriter); - // todo: end current commit table - for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { - TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); - code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, pWriter->dReader.pBlockIdx->uid); - if (code) goto _err; - } - - for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { - SDataBlk dataBlk; - tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk); - - code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); - if (code) goto _err; - } - - if (pWriter->dWriter.mDataBlk.nItem) { - SBlockIdx blockIdx = *pWriter->dReader.pBlockIdx; - code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx); - if (code) goto _err; - - if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; // copy remain table data TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; @@ -1255,7 +1294,9 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { // End last table data write if need if (id.suid != pWriter->id.suid || id.uid != pWriter->id.uid) { - // TODO + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; + pWriter->id.suid = 0; pWriter->id.uid = 0; } @@ -1267,9 +1308,8 @@ static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { if (code) goto _err; // Start new table data - pWriter->id.suid = id.suid; - pWriter->id.uid = id.uid; - tMapDataReset(&pWriter->dWriter.mDataBlk); + code = tsdbSnapWriteTableDataStart(pWriter, &id); + if (code) goto _err; } // Merge with .data file data @@ -1527,6 +1567,7 @@ _err: return code; } +// APIs int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t code = 0; STsdbSnapWriter* pWriter = NULL; -- GitLab