提交 43e78919 编写于 作者: H Hongze Cheng

refact

上级 fe12c092
......@@ -41,7 +41,7 @@ typedef struct {
SArray *aSupBlk; // Table super-block array
SArray *aSubBlk; // table sub-block array
SDataCols *pDataCols;
} SCommitH;
} SCommitter;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
......@@ -60,42 +60,11 @@ typedef struct {
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->pVnode->config.tsdbCfg.maxRows)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static int32_t tsdbCommitData(SCommitH *pCommith);
static int32_t tsdbCommitDel(SCommitH *pCommith);
static int32_t tsdbCommitCache(SCommitH *pCommith);
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitH *pCHandle);
static int32_t tsdbEndCommit(SCommitH *pCHandle, int eno);
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo);
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key);
static int tsdbNextCommitFid(SCommitH *pCommith);
static void tsdbDestroyCommitH(SCommitH *pCommith);
static int32_t tsdbCreateCommitIters(SCommitH *pCommith);
static void tsdbDestroyCommitIters(SCommitH *pCommith);
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid);
static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx);
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx);
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitH *pCommih);
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock);
static void tsdbResetCommitTable(SCommitH *pCommith);
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup,
SMergeInfo *pMergeInfo);
static int32_t tsdbCommitData(SCommitter *pCommith);
static int32_t tsdbCommitDel(SCommitter *pCommith);
static int32_t tsdbCommitCache(SCommitter *pCommith);
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle);
static int32_t tsdbEndCommit(SCommitter *pCHandle, int eno);
int32_t tsdbBegin(STsdb *pTsdb) {
if (!pTsdb) return 0;
......@@ -111,7 +80,7 @@ int32_t tsdbBegin(STsdb *pTsdb) {
int32_t tsdbCommit(STsdb *pTsdb) {
int32_t code = 0;
SCommitH commith = {0};
SCommitter commith = {0};
SDFileSet *pSet = NULL;
int fid;
......@@ -154,7 +123,39 @@ _err:
return code;
}
static int32_t tsdbCommitData(SCommitH *pCommith) {
// STATIC METHODS =========================================================================================
static int tsdbInitCommitH(SCommitter *pCommith, STsdb *pRepo);
static void tsdbSeekCommitIter(SCommitter *pCommith, TSKEY key);
static int tsdbNextCommitFid(SCommitter *pCommith);
static void tsdbDestroyCommitH(SCommitter *pCommith);
static int32_t tsdbCreateCommitIters(SCommitter *pCommith);
static void tsdbDestroyCommitIters(SCommitter *pCommith);
static int tsdbCommitToFile(SCommitter *pCommith, SDFileSet *pSet, int fid);
static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int fid);
static int tsdbCommitToTable(SCommitter *pCommith, int tid);
static bool tsdbCommitIsSameFile(SCommitter *pCommith, int bidx);
static int tsdbMoveBlkIdx(SCommitter *pCommith, SBlockIdx *pIdx);
static int tsdbSetCommitTable(SCommitter *pCommith, STable *pTable);
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
static int tsdbWriteBlockInfo(SCommitter *pCommih);
static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx);
static int tsdbMoveBlock(SCommitter *pCommith, int bidx);
static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks);
static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock);
static void tsdbResetCommitTable(SCommitter *pCommith);
static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError);
static bool tsdbCanAddSubBlock(SCommitter *pCommith, SBlock *pBlock, SMergeInfo *pInfo);
static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter,
SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update);
static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
static int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn);
static int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, STbDataIter *pIter, TSKEY maxKey, int maxRowsToRead,
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup,
SMergeInfo *pMergeInfo);
static int32_t tsdbCommitData(SCommitter *pCommith) {
int32_t fid;
SDFileSet *pSet = NULL;
int32_t code = 0;
......@@ -214,13 +215,13 @@ static int32_t tsdbCommitData(SCommitH *pCommith) {
return code;
}
static int32_t tsdbCommitDel(SCommitH *pCommith) {
static int32_t tsdbCommitDel(SCommitter *pCommith) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitCache(SCommitH *pCommith) {
static int32_t tsdbCommitCache(SCommitter *pCommith) {
int32_t code = 0;
// TODO
return code;
......@@ -284,7 +285,7 @@ void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) {
pRtn->minFid, pRtn->midFid, pRtn->maxFid);
}
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitH *pCHandle) {
static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCHandle) {
int32_t code = 0;
tsdbInfo("vgId:%d, start to commit", REPO_ID(pTsdb));
......@@ -298,7 +299,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitH *pCHandle) {
return code;
}
static int32_t tsdbEndCommit(SCommitH *pCHandle, int eno) {
static int32_t tsdbEndCommit(SCommitter *pCHandle, int eno) {
int32_t code = 0;
STsdb *pTsdb = TSDB_COMMIT_REPO(pCHandle);
......@@ -312,7 +313,7 @@ static int32_t tsdbEndCommit(SCommitH *pCHandle, int eno) {
return code;
}
static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
static int tsdbInitCommitH(SCommitter *pCommith, STsdb *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pCommith, 0, sizeof(*pCommith));
......@@ -365,7 +366,7 @@ static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) {
}
// Skip all keys until key (not included)
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
static void tsdbSeekCommitIter(SCommitter *pCommith, TSKEY key) {
for (int i = 0; i < pCommith->niters; i++) {
SCommitIter *pIter = pCommith->iters + i;
if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
......@@ -375,7 +376,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
}
}
static int tsdbNextCommitFid(SCommitH *pCommith) {
static int tsdbNextCommitFid(SCommitter *pCommith) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
int fid = TSDB_IVLD_FID;
......@@ -398,7 +399,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) {
return fid;
}
static void tsdbDestroyCommitH(SCommitH *pCommith) {
static void tsdbDestroyCommitH(SCommitter *pCommith) {
pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols);
pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk);
pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk);
......@@ -408,7 +409,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) {
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
static int32_t tsdbCommitToFileStart(SCommitH *pCHandle, SDFileSet *pSet, int32_t fid) {
static int32_t tsdbCommitToFileStart(SCommitter *pCHandle, SDFileSet *pSet, int32_t fid) {
int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCHandle);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
......@@ -428,12 +429,12 @@ static int32_t tsdbCommitToFileStart(SCommitH *pCHandle, SDFileSet *pSet, int32_
return code;
}
static int32_t tsdbCommitToFileImpl(SCommitH *pCHandle) {
static int32_t tsdbCommitToFileImpl(SCommitter *pCHandle) {
int32_t code = 0;
// TODO
return code;
}
static int32_t tsdbCommitToFileEnd(SCommitH *pCommith) {
static int32_t tsdbCommitToFileEnd(SCommitter *pCommith) {
int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
......@@ -464,7 +465,7 @@ static int32_t tsdbCommitToFileEnd(SCommitH *pCommith) {
return code;
}
static int32_t tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
static int32_t tsdbCommitToFile(SCommitter *pCommith, SDFileSet *pSet, int fid) {
int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbKeepCfg *pCfg = REPO_KEEP_CFG(pRepo);
......@@ -531,7 +532,7 @@ _err:
return code;
}
static int32_t tsdbCreateCommitIters(SCommitH *pCommith) {
static int32_t tsdbCreateCommitIters(SCommitter *pCommith) {
int32_t code = 0;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SMemTable *pMem = pRepo->imem;
......@@ -568,7 +569,7 @@ _err:
return code;
}
static void tsdbDestroyCommitIters(SCommitH *pCommith) {
static void tsdbDestroyCommitIters(SCommitter *pCommith) {
if (pCommith->iters == NULL) return;
for (int i = 1; i < pCommith->niters; i++) {
......@@ -585,7 +586,7 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
pCommith->niters = 0;
}
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
static int tsdbSetAndOpenCommitFile(SCommitter *pCommith, SDFileSet *pSet, int fid) {
SDiskID did;
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
......@@ -884,7 +885,7 @@ static int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
}
// =================== Commit Time-Series Data
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
static int tsdbCommitToTable(SCommitter *pCommith, int tid) {
SCommitIter *pIter = pCommith->iters + tid;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
......@@ -973,7 +974,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
return 0;
}
static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
static int tsdbMoveBlkIdx(SCommitter *pCommith, SBlockIdx *pIdx) {
SReadH *pReadh = &pCommith->readh;
STsdb *pTsdb = TSDB_READ_REPO(pReadh);
STSchema *pTSchema = NULL;
......@@ -1027,7 +1028,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
return 0;
}
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
static int tsdbSetCommitTable(SCommitter *pCommith, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_COMMIT_REPO(pCommith), pTable, false, false, -1);
pCommith->pTable = pTable;
......@@ -1295,7 +1296,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi
return 0;
}
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
static int tsdbWriteBlock(SCommitter *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile,
isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols,
......@@ -1303,7 +1304,7 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith))));
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
static int tsdbWriteBlockInfo(SCommitter *pCommih) {
SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SBlockIdx blkIdx;
STable *pTable = TSDB_COMMIT_TABLE(pCommih);
......@@ -1325,7 +1326,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
return 0;
}
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
static int tsdbCommitMemData(SCommitter *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
SMergeInfo mInfo;
......@@ -1358,7 +1359,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
return 0;
}
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
static int tsdbMergeMemData(SCommitter *pCommith, SCommitIter *pIter, int bidx) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
......@@ -1430,7 +1431,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
return 0;
}
static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) {
static bool tsdbCommitIsSameFile(SCommitter *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
if (pBlock->last) {
return pCommith->isLFileSame;
......@@ -1438,7 +1439,7 @@ static bool tsdbCommitIsSameFile(SCommitH *pCommith, int bidx) {
return pCommith->isDFileSame;
}
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
static int tsdbMoveBlock(SCommitter *pCommith, int bidx) {
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
SDFile *pDFile;
SBlock block;
......@@ -1477,7 +1478,7 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
return 0;
}
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) {
static int tsdbCommitAddBlock(SCommitter *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) {
if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
......@@ -1491,7 +1492,7 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
return 0;
}
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
static int tsdbMergeBlockData(SCommitter *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
......@@ -1628,13 +1629,13 @@ static void tsdbLoadAndMergeFromCache(STsdb *pTsdb, SDataCols *pDataCols, int *i
}
}
static void tsdbResetCommitTable(SCommitH *pCommith) {
static void tsdbResetCommitTable(SCommitter *pCommith) {
taosArrayClear(pCommith->aSubBlk);
taosArrayClear(pCommith->aSupBlk);
pCommith->pTable = NULL;
}
static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
static void tsdbCloseCommitFile(SCommitter *pCommith, bool hasError) {
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
}
......@@ -1645,7 +1646,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
}
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
static bool tsdbCanAddSubBlock(SCommitter *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
STsdb *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg *pCfg = REPO_CFG(pRepo);
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册