From ebd60a968c464cdf64cd392bb398366c15453107 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 1 Aug 2022 10:03:00 +0000 Subject: [PATCH] refact: tsdb last file optimize 1 --- source/dnode/vnode/src/inc/tsdb.h | 8 ++ source/dnode/vnode/src/tsdb/tsdbCommit.c | 73 +++++++++--- source/dnode/vnode/src/tsdb/tsdbFile.c | 2 + .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 109 ++++++++++++++++++ source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 4 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 50 ++++++++ 6 files changed, 230 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index f1e980c026..8a6f473dc4 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -113,6 +113,9 @@ int32_t tPutBlock(uint8_t *p, void *ph); int32_t tGetBlock(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); bool tBlockHasSma(SBlock *pBlock); +// SBlockL +int32_t tPutBlockL(uint8_t *p, void *ph); +int32_t tGetBlockL(uint8_t *p, void *ph); // SBlockIdx int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); @@ -225,6 +228,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); +int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg); @@ -233,6 +237,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf); +int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); @@ -416,6 +421,7 @@ struct SBlock { }; struct SBlockL { + int64_t suid; struct { int64_t uid; int64_t version; @@ -452,6 +458,7 @@ struct SColData { struct SBlockData { int32_t nRow; + int64_t *aUid; int64_t *aVersion; TSKEY *aTSKEY; SArray *aIdx; // SArray @@ -513,6 +520,7 @@ struct SHeadFile { int64_t commitID; int64_t size; int64_t offset; + int64_t loffset; }; struct SDataFile { diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index e6db812865..dc1aa42dc6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -39,14 +39,18 @@ typedef struct { struct { SDataFReader *pReader; SArray *aBlockIdx; // SArray + SArray *aBlockL; // SArray SMapData mBlock; // SMapData, read from reader SBlockData bData; + SBlockData bDatal; } dReader; struct { SDataFWriter *pWriter; SArray *aBlockIdx; // SArray + SArray *aBlockL; // SArray SMapData mBlock; // SMapData SBlockData bData; + SBlockData bDatal; } dWriter; SSkmInfo skmTable; SSkmInfo skmRow; @@ -279,10 +283,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { // memory pCommitter->nextKey = TSKEY_MAX; - // old - taosArrayClear(pCommitter->dReader.aBlockIdx); - tMapDataReset(&pCommitter->dReader.mBlock); - tBlockDataReset(&pCommitter->dReader.bData); + // Reader pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid}, tDFileSetCmprFn, TD_EQ); if (pRSet) { @@ -291,22 +292,29 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL); if (code) goto _err; + + code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL); + if (code) goto _err; + + } else { + pCommitter->dReader.pReader = NULL; + taosArrayClear(pCommitter->dReader.aBlockIdx); + taosArrayClear(pCommitter->dReader.aBlockL); } + tMapDataReset(&pCommitter->dReader.mBlock); + tBlockDataReset(&pCommitter->dReader.bData); + tBlockDataReset(&pCommitter->dReader.bDatal); - // new + // Writer SHeadFile fHead; SDataFile fData; SLastFile fLast; SSmaFile fSma; SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma}; - - taosArrayClear(pCommitter->dWriter.aBlockIdx); - tMapDataReset(&pCommitter->dWriter.mBlock); - tBlockDataReset(&pCommitter->dWriter.bData); if (pRSet) { wSet.diskId = pRSet->diskId; wSet.fid = pCommitter->commitFid; - fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0}; + fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0, .loffset = 0}; fData = *pRSet->pDataF; fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0}; fSma = *pRSet->pSmaF; @@ -319,7 +327,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { wSet.diskId = did; wSet.fid = pCommitter->commitFid; - fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0}; + fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0, .loffset = 0}; fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0}; fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0}; fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0}; @@ -327,6 +335,12 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); if (code) goto _err; + taosArrayClear(pCommitter->dWriter.aBlockIdx); + taosArrayClear(pCommitter->dWriter.aBlockL); + tMapDataReset(&pCommitter->dWriter.mBlock); + tBlockDataReset(&pCommitter->dWriter.bData); + tBlockDataReset(&pCommitter->dWriter.bDatal); + _exit: return code; @@ -861,7 +875,11 @@ _err: static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { int32_t code = 0; - // write blockIdx + // write aBlockL + code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL); + if (code) goto _err; + + // write aBlockIdx code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL); if (code) goto _err; @@ -1001,14 +1019,15 @@ _err: static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { int32_t code = 0; + // Reader pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); if (pCommitter->dReader.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pCommitter->dWriter.aBlockIdx == NULL) { + pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL)); + if (pCommitter->dReader.aBlockL == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -1016,20 +1035,46 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { code = tBlockDataInit(&pCommitter->dReader.bData); if (code) goto _exit; + code = tBlockDataInit(&pCommitter->dReader.bDatal); + if (code) goto _exit; + + // Writer + pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pCommitter->dWriter.aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + pCommitter->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL)); + if (pCommitter->dWriter.aBlockL == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + code = tBlockDataInit(&pCommitter->dWriter.bData); if (code) goto _exit; + code = tBlockDataInit(&pCommitter->dWriter.bDatal); + if (code) goto _exit; + _exit: return code; } static void tsdbCommitDataEnd(SCommitter *pCommitter) { + // Reader taosArrayDestroy(pCommitter->dReader.aBlockIdx); + taosArrayDestroy(pCommitter->dReader.aBlockL); tMapDataClear(&pCommitter->dReader.mBlock); tBlockDataClear(&pCommitter->dReader.bData, 1); + tBlockDataClear(&pCommitter->dReader.bDatal, 1); + + // Writer taosArrayDestroy(pCommitter->dWriter.aBlockIdx); + taosArrayDestroy(pCommitter->dWriter.aBlockL); tMapDataClear(&pCommitter->dWriter.mBlock); tBlockDataClear(&pCommitter->dWriter.bData, 1); + tBlockDataClear(&pCommitter->dWriter.bDatal, 1); tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema); } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 135ee23d44..f9f2c0df6b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -21,6 +21,7 @@ int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) { n += tPutI64v(p ? p + n : p, pHeadFile->commitID); n += tPutI64v(p ? p + n : p, pHeadFile->size); n += tPutI64v(p ? p + n : p, pHeadFile->offset); + n += tPutI64v(p ? p + n : p, pHeadFile->loffset); return n; } @@ -31,6 +32,7 @@ static int32_t tGetHeadFile(uint8_t *p, SHeadFile *pHeadFile) { n += tGetI64v(p + n, &pHeadFile->commitID); n += tGetI64v(p + n, &pHeadFile->size); n += tGetI64v(p + n, &pHeadFile->offset); + n += tGetI64v(p + n, &pHeadFile->loffset); return n; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 7365ac23b8..f33bb9e455 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -597,6 +597,68 @@ _err: return code; } +int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) { + int32_t code = 0; + int64_t offset = pReader->pSet->pHeadF->loffset; + int64_t size = pReader->pSet->pHeadF->offset - offset; + int64_t n; + uint32_t delimiter; + uint8_t *pBuf = NULL; + SBlockL blockl; + + if (!ppBuf) ppBuf = &pBuf; + + // alloc + code = tRealloc(ppBuf, size); + if (code) goto _err; + + // seek + if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // read + n = taosReadFile(pReader->pHeadFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n < size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // check + if (!taosCheckChecksumWhole(*ppBuf, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + // decode + n = 0; + n = tGetU32(*ppBuf + n, &delimiter); + ASSERT(delimiter == TSDB_FILE_DLMT); + + taosArrayClear(aBlockL); + while (n < size - sizeof(TSCKSUM)) { + n += tGetBlockL(*ppBuf + n, &blockl); + + if (taosArrayPush(aBlockL, &blockl) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + ASSERT(n + sizeof(TSCKSUM) == size); + + tFree(pBuf); + return code; + +_err: + tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock, uint8_t **ppBuf) { int32_t code = 0; int64_t offset = pBlockIdx->offset; @@ -1593,6 +1655,53 @@ _err: return code; } +int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf) { + int32_t code = 0; + SHeadFile *pHeadFile = &pWriter->fHead; + uint8_t *pBuf = NULL; + int64_t size; + int64_t n; + + // size + size = sizeof(uint32_t); + for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { + size += tPutBlockL(NULL, taosArrayGet(aBlockL, iBlockL)); + } + size += sizeof(TSCKSUM); + + // alloc + if (!ppBuf) ppBuf = &pBuf; + code = tRealloc(ppBuf, size); + if (code) goto _err; + + // encode + n = 0; + n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); + for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) { + n += tPutBlockL(*ppBuf + n, taosArrayGet(aBlockL, iBlockL)); + } + taosCalcChecksumAppend(0, *ppBuf, size); + + ASSERT(n + sizeof(TSCKSUM) == size); + + // write + n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // update + pHeadFile->loffset = pHeadFile->size; + pHeadFile->size += size; + + return code; + +_err: + tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); + return code; +} + static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) { for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 6bb2b8c253..750a5dd164 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -911,14 +911,14 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 if (pSet) { wSet.diskId = pSet->diskId; wSet.fid = fid; - fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; + fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .loffset = 0}; fData = *pSet->pDataF; fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0}; fSma = *pSet->pSmaF; } else { wSet.diskId = (SDiskID){.level = 0, .id = 0}; wSet.fid = fid; - fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; + fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .loffset = 0}; fData = (SDataFile){.commitID = pWriter->commitID, .size = 0}; fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0}; fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index d612e9bb10..4e0d1b2402 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -295,6 +295,56 @@ bool tBlockHasSma(SBlock *pBlock) { return pBlock->aSubBlock[0].nSma > 0; } +// SBlockL ====================================================== +int32_t tPutBlockL(uint8_t *p, void *ph) { + int32_t n = 0; + SBlockL *pBlockL = (SBlockL *)ph; + + n += tPutI64(p ? p + n : p, pBlockL->suid); + n += tPutI64(p ? p + n : p, pBlockL->minKey.uid); + n += tPutI64v(p ? p + n : p, pBlockL->minKey.version); + n += tPutI64(p ? p + n : p, pBlockL->minKey.ts); + n += tPutI64(p ? p + n : p, pBlockL->maxKey.uid); + n += tPutI64v(p ? p + n : p, pBlockL->maxKey.version); + n += tPutI64(p ? p + n : p, pBlockL->maxKey.ts); + n += tPutI64v(p ? p + n : p, pBlockL->minVer); + n += tPutI64v(p ? p + n : p, pBlockL->maxVer); + n += tPutI32v(p ? p + n : p, pBlockL->nRow); + n += tPutI8(p ? p + n : p, pBlockL->cmprAlg); + n += tPutI64v(p ? p + n : p, pBlockL->offset); + n += tPutI32v(p ? p + n : p, pBlockL->szBlock); + n += tPutI32v(p ? p + n : p, pBlockL->szBlockCol); + n += tPutI32v(p ? p + n : p, pBlockL->szUid); + n += tPutI32v(p ? p + n : p, pBlockL->szVer); + n += tPutI32v(p ? p + n : p, pBlockL->szTSKEY); + + return n; +} + +int32_t tGetBlockL(uint8_t *p, void *ph) { + int32_t n = 0; + SBlockL *pBlockL = (SBlockL *)ph; + + n += tGetI64(p + n, &pBlockL->suid); + n += tGetI64(p + n, &pBlockL->minKey.uid); + n += tGetI64v(p + n, &pBlockL->minKey.version); + n += tGetI64(p + n, &pBlockL->minKey.ts); + n += tGetI64(p + n, &pBlockL->maxKey.uid); + n += tGetI64v(p + n, &pBlockL->maxKey.version); + n += tGetI64(p + n, &pBlockL->maxKey.ts); + n += tGetI64v(p + n, &pBlockL->minVer); + n += tGetI64v(p + n, &pBlockL->maxVer); + n += tGetI32v(p + n, &pBlockL->nRow); + n += tGetI8(p + n, &pBlockL->cmprAlg); + n += tGetI64v(p + n, &pBlockL->offset); + n += tGetI32v(p + n, &pBlockL->szBlock); + n += tGetI32v(p + n, &pBlockL->szBlockCol); + n += tGetI32v(p + n, &pBlockL->szUid); + n += tGetI32v(p + n, &pBlockL->szVer); + n += tGetI32v(p + n, &pBlockL->szTSKEY); + + return n; +} // SBlockCol ====================================================== int32_t tPutBlockCol(uint8_t *p, void *ph) { -- GitLab