From 94b97c4da476577e05b0253b562a80aa2211e378 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 19 Aug 2022 21:42:14 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 61 +++++++++++++++++----- 1 file changed, 48 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 434fabfe9b..4724d9681a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -407,6 +407,7 @@ struct STsdbSnapWriter { int8_t cmprAlg; int64_t commitID; + uint8_t* aBuf[5]; // for data file SBlockData bData; @@ -420,6 +421,7 @@ struct STsdbSnapWriter { SBlockData* pBlockData; int32_t iRow; SBlockData bDataR; + SArray* aBlockL; // SArray SDataFWriter* pDataFWriter; SBlockIdx* pBlockIdxW; // NULL when no committing table @@ -429,6 +431,7 @@ struct STsdbSnapWriter { SMapData mBlockW; // SMapData SArray* aBlockIdxW; // SArray + SArray* aBlockLW; // SArray // for del file SDelFReader* pDelFReader; @@ -816,9 +819,11 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { if (pWriter->pDataFWriter == NULL) goto _exit; + // finish current table code = tsdbSnapWriteTableDataEnd(pWriter); if (code) goto _err; + // move remain table while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx)); if (code) goto _err; @@ -826,8 +831,16 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { pWriter->iBlockIdx++; } - code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW); - if (code) goto _err; + // write remain stuff + if (taosArrayGetSize(pWriter->aBlockLW) > 0) { + code = tsdbWriteBlockL(pWriter->pDataFWriter, pWriter->aBlockIdxW); + if (code) goto _err; + } + + if (taosArrayGetSize(pWriter->aBlockIdx) > 0) { + code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW); + if (code) goto _err; + } code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); if (code) goto _err; @@ -851,19 +864,22 @@ _err: } static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); - int64_t n; + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); + int64_t n; // decode SBlockData* pBlockData = &pWriter->bData; - // n = tGetBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pBlockData); - // ASSERT(n + sizeof(SSnapDataHdr) + sizeof(TABLEID) == nData); + code = tDecmprBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pHdr->size - sizeof(TABLEID), pBlockData, + pWriter->aBuf); + if (code) goto _err; // open file - TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); - TSDBKEY keyLast = tBlockDataLastKey(pBlockData); + TSDBKEY keyFirst = {.version = pBlockData->aVersion[0], .ts = pBlockData->aTSKEY[0]}; + TSDBKEY keyLast = {.version = pBlockData->aVersion[pBlockData->nRow - 1], + .ts = pBlockData->aTSKEY[pBlockData->nRow - 1]}; int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision); ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); @@ -882,9 +898,13 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); if (code) goto _err; + + code = tsdbReadBlockL(pWriter->pDataFReader, pWriter->aBlockL); + if (code) goto _err; } else { ASSERT(pWriter->pDataFReader == NULL); taosArrayClear(pWriter->aBlockIdx); + taosArrayClear(pWriter->aBlockL); } pWriter->iBlockIdx = 0; pWriter->pBlockIdx = NULL; @@ -921,6 +941,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 if (code) goto _err; taosArrayClear(pWriter->aBlockIdxW); + taosArrayClear(pWriter->aBlockLW); tMapDataReset(&pWriter->mBlockW); pWriter->pBlockIdxW = NULL; tBlockDataReset(&pWriter->bDataW); @@ -1110,6 +1131,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = tBlockDataCreate(&pWriter->bDataR); if (code) goto _err; + pWriter->aBlockL = taosArrayInit(0, sizeof(SBlockL)); + if (pWriter->aBlockL == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx)); if (pWriter->aBlockIdxW == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1118,6 +1145,12 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = tBlockDataCreate(&pWriter->bDataW); if (code) goto _err; + pWriter->aBlockLW = taosArrayInit(0, sizeof(SBlockL)); + if (pWriter->aBlockLW == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + // for del file pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); if (pWriter->aDelIdxR == NULL) { @@ -1147,8 +1180,7 @@ _err: } int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { - int32_t code = 0; -#if 0 + int32_t code = 0; STsdbSnapWriter* pWriter = *ppWriter; if (rollback) { @@ -1169,6 +1201,10 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { if (code) goto _err; } + for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { + tFree(pWriter->aBuf[iBuf]); + } + tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); taosMemoryFree(pWriter); *ppWriter = NULL; @@ -1179,7 +1215,6 @@ _err: pWriter->pTsdb->path, tstrerror(code)); taosMemoryFree(pWriter); *ppWriter = NULL; -#endif return code; } -- GitLab