提交 16a8556b 编写于 作者: H Hongze Cheng

refactor

上级 1adfaef8
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
#define TSDB_DATA_FILE_CHANGE 0 #define TSDB_DATA_FILE_CHANGE 0
#define TSDB_META_FILE_CHANGE 1 #define TSDB_META_FILE_CHANGE 1
#define TSDB_COMMIT_OVER 2
#define TSDB_DEFAULT_ROWS_TO_COMMIT(maxRows) ((maxRows) * 4 / 5) #define TSDB_DEFAULT_ROWS_TO_COMMIT(maxRows) ((maxRows) * 4 / 5)
...@@ -147,11 +148,9 @@ static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) { ...@@ -147,11 +148,9 @@ static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) {
} }
static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
STsdbRepo * pRepo = pCommitH->pRepo; STsdbRepo *pRepo = pCommitH->pRepo;
SMemTable * pMem = pRepo->imem; SMemTable *pMem = pRepo->imem;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile);
if (tsdbLogRetentionChange(pCommitH, mfid) < 0) return -1; if (tsdbLogRetentionChange(pCommitH, mfid) < 0) return -1;
...@@ -159,8 +158,8 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { ...@@ -159,8 +158,8 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
if (pMem->numOfRows <= 0) return 0; if (pMem->numOfRows <= 0) return 0;
// Initialize resources // Initialize resources
STSCommitHandle tsCommitH = {0}; STSCommitHandle *pTSCh = tsdbNewTSCommitHandle(pRepo);
if (tsdbInitTSCommitHandle(&tsCommitH, pRepo) < 0) return -1; if (pTSCh == NULL) return -1;
// Commit Time-Series data file by file // Commit Time-Series data file by file
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
...@@ -179,17 +178,17 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { ...@@ -179,17 +178,17 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue; if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue;
if (tsdbLogTSFileChange(pCommitH, fid) < 0) { if (tsdbLogTSFileChange(pCommitH, fid) < 0) {
tsdbDestroyTSCommitHandle(&tsCommitH); tsdbFreeTSCommitHandle(pTSCh);
return -1; return -1;
} }
if (tsdbCommitToFileGroup(pRepo, NULL, NULL, &tsCommitH) < 0) { if (tsdbCommitToFileGroup(pRepo, NULL, NULL, &tsCommitH) < 0) {
tsdbDestroyTSCommitHandle(&tsCommitH); tsdbFreeTSCommitHandle(pTSCh);
return -1; return -1;
} }
} }
tsdbDestroyTSCommitHandle(&tsCommitH); tsdbFreeTSCommitHandle(pTSCh);
return 0; return 0;
} }
...@@ -338,24 +337,30 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK ...@@ -338,24 +337,30 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK
return 0; return 0;
} }
static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { static STSCommitHandle *tsdbNewTSCommitHandle(STsdbRepo *pRepo) {
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
STsdbMeta *pMeta = pRepo->tsdbMeta; STsdbMeta *pMeta = pRepo->tsdbMeta;
SMemTable *pMem = pRepo->imem; SMemTable *pMem = pRepo->imem;
STSCommitHandle *pTSCh = (STSCommitHandle *)calloc(1, sizeof(*pTSCh));
if (pTSCh == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
}
pTSCh->maxIters = pMem->maxTables;
pTSCh->pIters = tsdbCreateCommitIters(pRepo); pTSCh->pIters = tsdbCreateCommitIters(pRepo);
if (pTSCh->pIters == NULL) { if (pTSCh->pIters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbDestroyTSCommitHandle(pTSCh); tsdbFreeTSCommitHandle(pTSCh);
return -1; return NULL;
} }
pTSCh->maxIters = pMem->maxTables;
pTSCh->pReadH = tsdbNewReadHandle(pRepo); pTSCh->pReadH = tsdbNewReadHandle(pRepo);
if (pTSCh->pReadH == NULL) { if (pTSCh->pReadH == NULL) {
tsdbError("vgId:%d failed to create new read handle since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to create new read handle since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbDestroyTSCommitHandle(pTSCh); tsdbFreeTSCommitHandle(pTSCh);
return -1; return NULL;
} }
pTSCh->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); pTSCh->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock);
...@@ -363,18 +368,22 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { ...@@ -363,18 +368,22 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
tsdbDestroyTSCommitHandle(pTSCh); tsdbFreeTSCommitHandle(pTSCh);
return -1; return NULL;
} }
return 0; return pTSCh;
} }
static void tsdbDestroyTSCommitHandle(STSCommitHandle *pTSCh) { static void tsdbFreeTSCommitHandle(STSCommitHandle *pTSCh) {
if (pTSCh) { if (pTSCh) {
tdFreeDataCols(pTSCh->pDataCols); tdFreeDataCols(pTSCh->pDataCols);
tsdbFreeReadHandle(pTSCh->pReadH); tsdbFreeReadHandle(pTSCh->pReadH);
tsdbDestroyCommitIters(pTSCh->pIters, pTSCh->maxIters); tsdbDestroyCommitIters(pTSCh->pIters, pTSCh->maxIters);
taosTZfree(pTSCh->pSubBlock);
taosTZfree(pTSCh->pBlockInfo);
taosTZfree(pTSCh->pBlockIdx);
free(pTSCh);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册