diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 54c1319c95e38a797eb2ed15bd5b77a035a4307f..433a12c16363ce5dda824515778bbf01d0328794 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -22,6 +22,7 @@ #define TSDB_DATA_FILE_CHANGE 0 #define TSDB_META_FILE_CHANGE 1 +#define TSDB_COMMIT_OVER 2 #define TSDB_DEFAULT_ROWS_TO_COMMIT(maxRows) ((maxRows) * 4 / 5) @@ -147,11 +148,9 @@ static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) { } static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { - STsdbRepo * pRepo = pCommitH->pRepo; - SMemTable * pMem = pRepo->imem; - STsdbCfg * pCfg = &(pRepo->config); - STsdbMeta * pMeta = pRepo->tsdbMeta; - STsdbFileH *pFileH = pRepo->tsdbFileH; + STsdbRepo *pRepo = pCommitH->pRepo; + SMemTable *pMem = pRepo->imem; + STsdbCfg * pCfg = &(pRepo->config); int mfid = tsdbGetCurrMinFid(pCfg->precision, pCfg->keep, pCfg->daysPerFile); if (tsdbLogRetentionChange(pCommitH, mfid) < 0) return -1; @@ -159,8 +158,8 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { if (pMem->numOfRows <= 0) return 0; // Initialize resources - STSCommitHandle tsCommitH = {0}; - if (tsdbInitTSCommitHandle(&tsCommitH, pRepo) < 0) return -1; + STSCommitHandle *pTSCh = tsdbNewTSCommitHandle(pRepo); + if (pTSCh == NULL) return -1; // Commit Time-Series data file by file int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); @@ -179,17 +178,17 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue; if (tsdbLogTSFileChange(pCommitH, fid) < 0) { - tsdbDestroyTSCommitHandle(&tsCommitH); + tsdbFreeTSCommitHandle(pTSCh); return -1; } if (tsdbCommitToFileGroup(pRepo, NULL, NULL, &tsCommitH) < 0) { - tsdbDestroyTSCommitHandle(&tsCommitH); + tsdbFreeTSCommitHandle(pTSCh); return -1; } } - tsdbDestroyTSCommitHandle(&tsCommitH); + tsdbFreeTSCommitHandle(pTSCh); return 0; } @@ -338,24 +337,30 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK return 0; } -static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { +static STSCommitHandle *tsdbNewTSCommitHandle(STsdbRepo *pRepo) { STsdbCfg * pCfg = &(pRepo->config); STsdbMeta *pMeta = pRepo->tsdbMeta; SMemTable *pMem = pRepo->imem; + STSCommitHandle *pTSCh = (STSCommitHandle *)calloc(1, sizeof(*pTSCh)); + if (pTSCh == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return NULL; + } + + pTSCh->maxIters = pMem->maxTables; pTSCh->pIters = tsdbCreateCommitIters(pRepo); if (pTSCh->pIters == NULL) { tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); - tsdbDestroyTSCommitHandle(pTSCh); - return -1; + tsdbFreeTSCommitHandle(pTSCh); + return NULL; } - pTSCh->maxIters = pMem->maxTables; pTSCh->pReadH = tsdbNewReadHandle(pRepo); if (pTSCh->pReadH == NULL) { tsdbError("vgId:%d failed to create new read handle since %s", REPO_ID(pRepo), tstrerror(terrno)); - tsdbDestroyTSCommitHandle(pTSCh); - return -1; + tsdbFreeTSCommitHandle(pTSCh); + return NULL; } pTSCh->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock); @@ -363,18 +368,22 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); - tsdbDestroyTSCommitHandle(pTSCh); - return -1; + tsdbFreeTSCommitHandle(pTSCh); + return NULL; } - return 0; + return pTSCh; } -static void tsdbDestroyTSCommitHandle(STSCommitHandle *pTSCh) { +static void tsdbFreeTSCommitHandle(STSCommitHandle *pTSCh) { if (pTSCh) { tdFreeDataCols(pTSCh->pDataCols); tsdbFreeReadHandle(pTSCh->pReadH); tsdbDestroyCommitIters(pTSCh->pIters, pTSCh->maxIters); + taosTZfree(pTSCh->pSubBlock); + taosTZfree(pTSCh->pBlockInfo); + taosTZfree(pTSCh->pBlockIdx); + free(pTSCh); } }