From 43e78919452ca910a87077297418356ce64f7caf Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 9 Jun 2022 05:54:32 +0000 Subject: [PATCH] refact --- source/dnode/vnode/src/tsdb/tsdbCommit.c | 137 ++++++++++++----------- 1 file changed, 69 insertions(+), 68 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 17b8afda4b..a55c05e274 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -41,7 +41,7 @@ typedef struct { SArray *aSupBlk; // Table super-block array SArray *aSubBlk; // table sub-block array SDataCols *pDataCols; -} SCommitH; +} SCommitter; #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) @@ -60,42 +60,11 @@ typedef struct { #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) -static int32_t tsdbCommitData(SCommitH *pCommith); -static int32_t tsdbCommitDel(SCommitH *pCommith); -static int32_t tsdbCommitCache(SCommitH *pCommith); -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitH *pCHandle); -static int32_t tsdbEndCommit(SCommitH *pCHandle, int eno); - -static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo); -static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); -static int tsdbNextCommitFid(SCommitH *pCommith); -static void tsdbDestroyCommitH(SCommitH *pCommith); -static int32_t tsdbCreateCommitIters(SCommitH *pCommith); -static void tsdbDestroyCommitIters(SCommitH *pCommith); -static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); -static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); -static int tsdbCommitToTable(SCommitH *pCommith, int tid); -static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx); -static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx); -static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); -static int tsdbComparKeyBlock(const void *arg1, const void *arg2); -static int tsdbWriteBlockInfo(SCommitH *pCommih); -static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); -static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); -static int tsdbMoveBlock(SCommitH *pCommith, int bidx); -static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); -static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, - bool isLastOneBlock); -static void tsdbResetCommitTable(SCommitH *pCommith); -static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); -static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); -static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, - SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); -static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); -static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); -static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, - SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, - SMergeInfo *pMergeInfo); +static int32_t tsdbCommitData(SCommitter *pCommith); +static int32_t tsdbCommitDel(SCommitter *pCommith); +static int32_t tsdbCommitCache(SCommitter *pCommith); +static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle); +static int32_t tsdbEndCommit(SCommitter *pCHandle, int eno); int32_t tsdbBegin(STsdb *pTsdb) { if (!pTsdb) return 0; @@ -111,7 +80,7 @@ int32_t tsdbBegin(STsdb *pTsdb) { int32_t tsdbCommit(STsdb *pTsdb) { int32_t code = 0; - SCommitH commith = {0}; + SCommitter commith = {0}; SDFileSet *pSet = NULL; int fid; @@ -154,7 +123,39 @@ _err: return code; } -static int32_t tsdbCommitData(SCommitH *pCommith) { +// STATIC METHODS ========================================================================================= +static int tsdbInitCommitH(SCommitter *pCommith, STsdb *pRepo); +static void tsdbSeekCommitIter(SCommitter *pCommith, TSKEY key); +static int tsdbNextCommitFid(SCommitter *pCommith); +static void tsdbDestroyCommitH(SCommitter *pCommith); +static int32_t tsdbCreateCommitIters(SCommitter *pCommith); +static void tsdbDestroyCommitIters(SCommitter *pCommith); +static int tsdbCommitToFile(SCommitter *pCommith, SDFileSet *pSet, int fid); +static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int fid); +static int tsdbCommitToTable(SCommitter *pCommith, int tid); +static bool tsdbCommitIsSameFile(SCommitter *pCommith, int bidx); +static int tsdbMoveBlkIdx(SCommitter *pCommith, SBlockIdx *pIdx); +static int tsdbSetCommitTable(SCommitter *pCommith, STable *pTable); +static int tsdbComparKeyBlock(const void *arg1, const void *arg2); +static int tsdbWriteBlockInfo(SCommitter *pCommih); +static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); +static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx); +static int tsdbMoveBlock(SCommitter *pCommith, int bidx); +static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); +static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, + bool isLastOneBlock); +static void tsdbResetCommitTable(SCommitter *pCommith); +static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError); +static bool tsdbCanAddSubBlock(SCommitter *pCommith, SBlock *pBlock, SMergeInfo *pInfo); +static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, + SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); +static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); +static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); +static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead, + SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, + SMergeInfo *pMergeInfo); + +static int32_t tsdbCommitData(SCommitter *pCommith) { int32_t fid; SDFileSet *pSet = NULL; int32_t code = 0; @@ -214,13 +215,13 @@ static int32_t tsdbCommitData(SCommitH *pCommith) { return code; } -static int32_t tsdbCommitDel(SCommitH *pCommith) { +static int32_t tsdbCommitDel(SCommitter *pCommith) { int32_t code = 0; // TODO return code; } -static int32_t tsdbCommitCache(SCommitH *pCommith) { +static int32_t tsdbCommitCache(SCommitter *pCommith) { int32_t code = 0; // TODO return code; @@ -284,7 +285,7 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { pRtn->minFid, pRtn->midFid, pRtn->maxFid); } -static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitH *pCHandle) { +static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle) { int32_t code = 0; tsdbInfo("vgId:%d, start to commit", REPO_ID(pTsdb)); @@ -298,7 +299,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitH *pCHandle) { return code; } -static int32_t tsdbEndCommit(SCommitH *pCHandle, int eno) { +static int32_t tsdbEndCommit(SCommitter *pCHandle, int eno) { int32_t code = 0; STsdb *pTsdb = TSDB_COMMIT_REPO(pCHandle); @@ -312,7 +313,7 @@ static int32_t tsdbEndCommit(SCommitH *pCHandle, int eno) { return code; } -static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) { +static int tsdbInitCommitH(SCommitter *pCommith, STsdb *pRepo) { STsdbCfg *pCfg = REPO_CFG(pRepo); memset(pCommith, 0, sizeof(*pCommith)); @@ -365,7 +366,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) { } // Skip all keys until key (not included) -static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { +static void tsdbSeekCommitIter(SCommitter *pCommith, TSKEY key) { for (int i = 0; i < pCommith->niters; i++) { SCommitIter *pIter = pCommith->iters + i; if (pIter->pTable == NULL || pIter->pIter == NULL) continue; @@ -375,7 +376,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { } } -static int tsdbNextCommitFid(SCommitH *pCommith) { +static int tsdbNextCommitFid(SCommitter *pCommith) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); int fid = TSDB_IVLD_FID; @@ -398,7 +399,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) { return fid; } -static void tsdbDestroyCommitH(SCommitH *pCommith) { +static void tsdbDestroyCommitH(SCommitter *pCommith) { pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols); pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk); pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk); @@ -408,7 +409,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } -static int32_t tsdbCommitToFileStart(SCommitH *pCHandle, SDFileSet *pSet, int32_t fid) { +static int32_t tsdbCommitToFileStart(SCommitter *pCHandle, SDFileSet *pSet, int32_t fid) { int32_t code = 0; STsdb *pRepo = TSDB_COMMIT_REPO(pCHandle); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); @@ -428,12 +429,12 @@ static int32_t tsdbCommitToFileStart(SCommitH *pCHandle, SDFileSet *pSet, int32_ return code; } -static int32_t tsdbCommitToFileImpl(SCommitH *pCHandle) { +static int32_t tsdbCommitToFileImpl(SCommitter *pCHandle) { int32_t code = 0; // TODO return code; } -static int32_t tsdbCommitToFileEnd(SCommitH *pCommith) { +static int32_t tsdbCommitToFileEnd(SCommitter *pCommith) { int32_t code = 0; STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); @@ -464,7 +465,7 @@ static int32_t tsdbCommitToFileEnd(SCommitH *pCommith) { return code; } -static int32_t tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { +static int32_t tsdbCommitToFile(SCommitter *pCommith, SDFileSet *pSet, int fid) { int32_t code = 0; STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo); @@ -531,7 +532,7 @@ _err: return code; } -static int32_t tsdbCreateCommitIters(SCommitH *pCommith) { +static int32_t tsdbCreateCommitIters(SCommitter *pCommith) { int32_t code = 0; STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); SMemTable *pMem = pRepo->imem; @@ -568,7 +569,7 @@ _err: return code; } -static void tsdbDestroyCommitIters(SCommitH *pCommith) { +static void tsdbDestroyCommitIters(SCommitter *pCommith) { if (pCommith->iters == NULL) return; for (int i = 1; i < pCommith->niters; i++) { @@ -585,7 +586,7 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { pCommith->niters = 0; } -static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { +static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int fid) { SDiskID did; STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); @@ -884,7 +885,7 @@ static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { } // =================== Commit Time-Series Data -static int tsdbCommitToTable(SCommitH *pCommith, int tid) { +static int tsdbCommitToTable(SCommitter *pCommith, int tid) { SCommitIter *pIter = pCommith->iters + tid; TSKEY nextKey = tsdbNextIterKey(pIter->pIter); @@ -973,7 +974,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { return 0; } -static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { +static int tsdbMoveBlkIdx(SCommitter *pCommith, SBlockIdx *pIdx) { SReadH *pReadh = &pCommith->readh; STsdb *pTsdb = TSDB_READ_REPO(pReadh); STSchema *pTSchema = NULL; @@ -1027,7 +1028,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) { return 0; } -static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { +static int tsdbSetCommitTable(SCommitter *pCommith, STable *pTable) { STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith), pTable, false, false, -1); pCommith->pTable = pTable; @@ -1295,7 +1296,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi return 0; } -static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, +static int tsdbWriteBlock(SCommitter *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper) { return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols, @@ -1303,7 +1304,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith)))); } -static int tsdbWriteBlockInfo(SCommitH *pCommih) { +static int tsdbWriteBlockInfo(SCommitter *pCommih) { SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); SBlockIdx blkIdx; STable *pTable = TSDB_COMMIT_TABLE(pCommih); @@ -1325,7 +1326,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) { return 0; } -static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { +static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg *pCfg = REPO_CFG(pRepo); SMergeInfo mInfo; @@ -1358,7 +1359,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi return 0; } -static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { +static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg *pCfg = REPO_CFG(pRepo); int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; @@ -1430,7 +1431,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { return 0; } -static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) { +static bool tsdbCommitIsSameFile(SCommitter *pCommith, int bidx) { SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; if (pBlock->last) { return pCommith->isLFileSame; @@ -1438,7 +1439,7 @@ static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) { return pCommith->isDFileSame; } -static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { +static int tsdbMoveBlock(SCommitter *pCommith, int bidx) { SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; SDFile *pDFile; SBlock block; @@ -1477,7 +1478,7 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { return 0; } -static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { +static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; @@ -1491,7 +1492,7 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const return 0; } -static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, +static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg *pCfg = REPO_CFG(pRepo); @@ -1628,13 +1629,13 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i } } -static void tsdbResetCommitTable(SCommitH *pCommith) { +static void tsdbResetCommitTable(SCommitter *pCommith) { taosArrayClear(pCommith->aSubBlk); taosArrayClear(pCommith->aSupBlk); pCommith->pTable = NULL; } -static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { +static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); } @@ -1645,7 +1646,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } -static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { +static bool tsdbCanAddSubBlock(SCommitter *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg *pCfg = REPO_CFG(pRepo); int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; -- GitLab