From 33c3f34f3489732a57ce92c7a26abe3c6a5f7e55 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 8 Aug 2022 03:22:24 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 17 +- source/dnode/vnode/src/tsdb/tsdbCache.c | 14 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 16 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 167 ++---------------- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 14 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 4 +- 7 files changed, 49 insertions(+), 189 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index e1bbc3278d..f33b8cd51d 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -137,8 +137,10 @@ int32_t tGetColData(uint8_t *p, SColData *pColData); #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) #define tBlockDataFirstKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataFirstRow(PBLOCKDATA)) #define tBlockDataLastKey(PBLOCKDATA) TSDBROW_KEY(&tBlockDataLastRow(PBLOCKDATA)) -int32_t tBlockDataInit(SBlockData *pBlockData); -void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear); + +int32_t tBlockDataCreate(SBlockData *pBlockData); +void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear); + void tBlockDataReset(SBlockData *pBlockData); int32_t tBlockDataSetSchema(SBlockData *pBlockData, STSchema *pTSchema, int64_t suid, int64_t uid); int32_t tBlockDataAddColData(SBlockData *pBlockData, int32_t iColData, SColData **ppColData); @@ -262,17 +264,6 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl int32_t nColId); int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, int16_t *aColId, int32_t nColId); - -#if 0 -int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol, - SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2); -int32_t tsdbReadBlockData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, SBlockData *pBlockData, - uint8_t **ppBuf1, uint8_t **ppBuf2); -int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData, uint8_t **ppBuf1, - uint8_t **ppBuf2); -int32_t tsdbReadLastBlock(SDataFReader *pReader, SBlockL *pBlockL, SBlockData *pBlockData, uint8_t **ppBuf1, - uint8_t **ppBuf2); -#endif // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index c2bfad264b..2cb3f1f46d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -450,9 +450,9 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (--state->iFileSet >= 0) { pFileSet = (SDFileSet *)taosArrayGet(state->aDFileSet, state->iFileSet); } else { - // tBlockDataClear(&state->blockData, 1); + // tBlockDataDestroy(&state->blockData, 1); if (state->pBlockData) { - tBlockDataClear(state->pBlockData, 1); + tBlockDataDestroy(state->pBlockData, 1); state->pBlockData = NULL; } @@ -494,7 +494,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { if (!state->pBlockData) { state->pBlockData = &state->blockData; - tBlockDataInit(&state->blockData); + tBlockDataCreate(&state->blockData); } } case SFSNEXTROW_BLOCKDATA: @@ -552,8 +552,8 @@ _err: state->aBlockIdx = NULL; } if (state->pBlockData) { - // tBlockDataClear(&state->blockData, 1); - tBlockDataClear(state->pBlockData, 1); + // tBlockDataDestroy(&state->blockData, 1); + tBlockDataDestroy(state->pBlockData, 1); state->pBlockData = NULL; } @@ -579,8 +579,8 @@ int32_t clearNextRowFromFS(void *iter) { state->aBlockIdx = NULL; } if (state->pBlockData) { - // tBlockDataClear(&state->blockData, 1); - tBlockDataClear(state->pBlockData, 1); + // tBlockDataDestroy(&state->blockData, 1); + tBlockDataDestroy(state->pBlockData, 1); state->pBlockData = NULL; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index ad721ede7a..44f360d597 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1270,7 +1270,7 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { goto _exit; } - code = tBlockDataInit(&pCommitter->dReader.bData); + code = tBlockDataCreate(&pCommitter->dReader.bData); if (code) goto _exit; pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL)); @@ -1279,7 +1279,7 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { goto _exit; } - code = tBlockDataInit(&pCommitter->dReader.bDatal); + code = tBlockDataCreate(&pCommitter->dReader.bDatal); if (code) goto _exit; // Writer @@ -1295,10 +1295,10 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { goto _exit; } - code = tBlockDataInit(&pCommitter->dWriter.bData); + code = tBlockDataCreate(&pCommitter->dWriter.bData); if (code) goto _exit; - code = tBlockDataInit(&pCommitter->dWriter.bDatal); + code = tBlockDataCreate(&pCommitter->dWriter.bDatal); if (code) goto _exit; _exit: @@ -1309,16 +1309,16 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { // Reader taosArrayDestroy(pCommitter->dReader.aBlockIdx); tMapDataClear(&pCommitter->dReader.mBlock); - tBlockDataClear(&pCommitter->dReader.bData, 1); + tBlockDataDestroy(&pCommitter->dReader.bData, 1); taosArrayDestroy(pCommitter->dReader.aBlockL); - tBlockDataClear(&pCommitter->dReader.bDatal, 1); + tBlockDataDestroy(&pCommitter->dReader.bDatal, 1); // Writer taosArrayDestroy(pCommitter->dWriter.aBlockIdx); taosArrayDestroy(pCommitter->dWriter.aBlockL); tMapDataClear(&pCommitter->dWriter.mBlock); - tBlockDataClear(&pCommitter->dWriter.bData, 1); - tBlockDataClear(&pCommitter->dWriter.bDatal, 1); + tBlockDataDestroy(&pCommitter->dWriter.bData, 1); + tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); tTSchemaDestroy(pCommitter->skmTable.pTSchema); tTSchemaDestroy(pCommitter->skmRow.pTSchema); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 0157f0ed22..cd1e46c342 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -438,7 +438,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; - code = tBlockDataInit(&pReader->status.fileBlockData); + code = tBlockDataCreate(&pReader->status.fileBlockData); if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _end; @@ -2670,7 +2670,7 @@ void tsdbReaderClose(STsdbReader* pReader) { } } taosMemoryFree(pSupInfo->buildBuf); - tBlockDataClear(&pReader->status.fileBlockData, true); + tBlockDataDestroy(&pReader->status.fileBlockData, true); cleanupDataBlockIterator(&pReader->status.blockIter); @@ -2874,7 +2874,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { tBlockDataClearData(&pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) { - tBlockDataClear(&pStatus->fileBlockData, 1); + tBlockDataDestroy(&pStatus->fileBlockData, 1); terrno = code; return NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 94d957eac7..69f39bb3c5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -846,63 +846,6 @@ _err: } #if 0 -static int32_t tsdbReadBlockDataKey(SBlockData *pBlockData, SBlockInfo *pSubBlock, uint8_t *pBuf, uint8_t **ppBuf) { - int32_t code = 0; -#if 0 - int64_t size = pSubBlock->szVersion + pSubBlock->szTSKEY + sizeof(TSCKSUM); - int64_t n; - - if (!taosCheckChecksumWhole(pBuf, size)) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * pSubBlock->nRow); - if (code) goto _err; - code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow); - if (code) goto _err; - - if (pSubBlock->cmprAlg == NO_COMPRESSION) { - ASSERT(pSubBlock->szVersion == sizeof(int64_t) * pSubBlock->nRow); - ASSERT(pSubBlock->szTSKEY == sizeof(TSKEY) * pSubBlock->nRow); - - // VERSION - memcpy(pBlockData->aVersion, pBuf, pSubBlock->szVersion); - - // TSKEY - memcpy(pBlockData->aTSKEY, pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY); - } else { - size = sizeof(int64_t) * pSubBlock->nRow + COMP_OVERFLOW_BYTES; - if (pSubBlock->cmprAlg == TWO_STAGE_COMP) { - code = tRealloc(ppBuf, size); - if (code) goto _err; - } - - // VERSION - n = tsDecompressBigint(pBuf, pSubBlock->szVersion, pSubBlock->nRow, (char *)pBlockData->aVersion, - sizeof(int64_t) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, size); - if (n < 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - - // TSKEY - n = tsDecompressTimestamp(pBuf + pSubBlock->szVersion, pSubBlock->szTSKEY, pSubBlock->nRow, - (char *)pBlockData->aTSKEY, sizeof(TSKEY) * pSubBlock->nRow, pSubBlock->cmprAlg, *ppBuf, - size); - if (n < 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _err; - } - } - - return code; - -_err: -#endif - return code; -} - static int32_t tsdbReadColDataImpl(SBlockInfo *pSubBlock, SBlockCol *pBlockCol, SColData *pColData, uint8_t *pBuf, uint8_t **ppBuf) { int32_t code = 0; @@ -998,80 +941,6 @@ _err: return code; } -#if 0 -static int32_t tsdbReadBlockCol(uint8_t *pBuf, int32_t szBlockCol, SDiskDataHdr *pHdr, SArray *aBlockCol) { - int32_t code = 0; - int32_t n = 0; - SBlockCol blockCol; - SBlockCol *pBlockCol = &blockCol; - - // checksum - if (!taosCheckChecksumWhole(pBuf, szBlockCol + sizeof(TSCKSUM))) { - code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } - - // hdr - *pHdr = *(SDiskDataHdr *)pBuf; - n += sizeof(SDiskDataHdr); - - // aBlockCol - while (n < szBlockCol) { - n += tGetBlockCol(pBuf + n, pBlockCol); - - if (taosArrayPush(aBlockCol, pBlockCol) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - - ASSERT(n == szBlockCol); - - return code; - -_err: - return code; -} -#endif - -static int32_t tsdbReadDataArray(uint8_t *pInput, int32_t szInput, int32_t nEle, int8_t type, int8_t cmprAlg, - uint8_t **ppOut, uint8_t **ppBuf) { - int32_t code = 0; - int32_t size; - - // size - if (IS_VAR_DATA_TYPE(type)) { - size = nEle; - } else { - size = tDataTypes[type].bytes * nEle; - } - - // alloc - code = tRealloc(ppOut, size); - if (code) goto _exit; - - // decode - if (cmprAlg == NO_COMPRESSION) { - ASSERT(szInput == size); - memcpy(*ppOut, pInput, size); - } else { - if (cmprAlg == TWO_STAGE_COMP) { - code = tRealloc(ppBuf, size + COMP_OVERFLOW_BYTES); - if (code) goto _exit; - - int32_t n = - tDataTypes[type].decompFunc(pInput, szInput, nEle, *ppOut, size, cmprAlg, *ppBuf, size + COMP_OVERFLOW_BYTES); - if (n <= 0) { - code = TSDB_CODE_COMPRESS_ERROR; - goto _exit; - } - } - } - -_exit: - return code; -} - static int32_t tsdbReadSubColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int32_t iSubBlock, int16_t *aColId, int32_t nCol, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2) { @@ -1210,29 +1079,29 @@ int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBl SBlockData *pBlockData1 = &(SBlockData){0}; SBlockData *pBlockData2 = &(SBlockData){0}; - tBlockDataInit(pBlockData1); - tBlockDataInit(pBlockData2); + tBlockDataCreate(pBlockData1); + tBlockDataCreate(pBlockData2); for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { code = tsdbReadSubColData(pReader, pBlockIdx, pBlock, iSubBlock, aColId, nCol, pBlockData1, ppBuf1, ppBuf2); if (code) goto _err; code = tBlockDataCopy(pBlockData, pBlockData2); if (code) { - tBlockDataClear(pBlockData1, 1); - tBlockDataClear(pBlockData2, 1); + tBlockDataDestroy(pBlockData1, 1); + tBlockDataDestroy(pBlockData2, 1); goto _err; } code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); if (code) { - tBlockDataClear(pBlockData1, 1); - tBlockDataClear(pBlockData2, 1); + tBlockDataDestroy(pBlockData1, 1); + tBlockDataDestroy(pBlockData2, 1); goto _err; } } - tBlockDataClear(pBlockData1, 1); - tBlockDataClear(pBlockData2, 1); + tBlockDataDestroy(pBlockData1, 1); + tBlockDataDestroy(pBlockData2, 1); } tFree(pBuf1); @@ -1349,34 +1218,34 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl SBlockData *pBlockData1 = &(SBlockData){0}; SBlockData *pBlockData2 = &(SBlockData){0}; - tBlockDataInit(pBlockData1); - tBlockDataInit(pBlockData2); + tBlockDataCreate(pBlockData1); + tBlockDataCreate(pBlockData2); for (iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) { code = tsdbReadSubBlockData(pReader, pBlock, iSubBlock, pBlockData1, ppBuf1, ppBuf2); if (code) { - tBlockDataClear(pBlockData1, 1); - tBlockDataClear(pBlockData2, 1); + tBlockDataDestroy(pBlockData1, 1); + tBlockDataDestroy(pBlockData2, 1); goto _err; } code = tBlockDataCopy(pBlockData, pBlockData2); if (code) { - tBlockDataClear(pBlockData1, 1); - tBlockDataClear(pBlockData2, 1); + tBlockDataDestroy(pBlockData1, 1); + tBlockDataDestroy(pBlockData2, 1); goto _err; } // merge two block data code = tBlockDataMerge(pBlockData1, pBlockData2, pBlockData); if (code) { - tBlockDataClear(pBlockData1, 1); - tBlockDataClear(pBlockData2, 1); + tBlockDataDestroy(pBlockData1, 1); + tBlockDataDestroy(pBlockData2, 1); goto _err; } } - tBlockDataClear(pBlockData1, 1); - tBlockDataClear(pBlockData2, 1); + tBlockDataDestroy(pBlockData1, 1); + tBlockDataDestroy(pBlockData2, 1); } ASSERT(pBlock->nRow == pBlockData->nRow); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 06a38303ec..973cd1e53a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -289,9 +289,9 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type goto _err; } pReader->mBlock = tMapDataInit(); - code = tBlockDataInit(&pReader->oBlockData); + code = tBlockDataCreate(&pReader->oBlockData); if (code) goto _err; - code = tBlockDataInit(&pReader->nBlockData); + code = tBlockDataCreate(&pReader->nBlockData); if (code) goto _err; pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); @@ -327,8 +327,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { } taosArrayDestroy(pReader->aBlockIdx); tMapDataClear(&pReader->mBlock); - tBlockDataClear(&pReader->oBlockData, 1); - tBlockDataClear(&pReader->nBlockData, 1); + tBlockDataDestroy(&pReader->oBlockData, 1); + tBlockDataDestroy(&pReader->nBlockData, 1); if (pReader->pDelFReader) { tsdbDelFReaderClose(&pReader->pDelFReader); @@ -1123,7 +1123,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->commitID = pTsdb->pVnode->state.commitID; // for data file - code = tBlockDataInit(&pWriter->bData); + code = tBlockDataCreate(&pWriter->bData); if (code) goto _err; pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); @@ -1131,7 +1131,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - code = tBlockDataInit(&pWriter->bDataR); + code = tBlockDataCreate(&pWriter->bDataR); if (code) goto _err; pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx)); @@ -1139,7 +1139,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - code = tBlockDataInit(&pWriter->bDataW); + code = tBlockDataCreate(&pWriter->bDataW); if (code) goto _err; // for del file diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 6a27b10cf1..039fe54f3d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1107,7 +1107,7 @@ static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) { } // SBlockData ====================================================== -int32_t tBlockDataInit(SBlockData *pBlockData) { +int32_t tBlockDataCreate(SBlockData *pBlockData) { int32_t code = 0; pBlockData->suid = 0; @@ -1132,7 +1132,7 @@ _exit: return code; } -void tBlockDataClear(SBlockData *pBlockData, int8_t deepClear) { +void tBlockDataDestroy(SBlockData *pBlockData, int8_t deepClear) { tFree((uint8_t *)pBlockData->aUid); tFree((uint8_t *)pBlockData->aVersion); tFree((uint8_t *)pBlockData->aTSKEY); -- GitLab