From d0f1e62563b0bc114a102da450bcd45a5caf1c30 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 14 Oct 2020 22:29:21 +0800 Subject: [PATCH] refactor more code --- src/tsdb/inc/tsdbMain.h | 14 ++- src/tsdb/src/tsdbCommit.c | 178 ++++++++++++++---------------------- src/tsdb/src/tsdbReadUtil.c | 6 +- 3 files changed, 84 insertions(+), 114 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 50cf80857e..d0b6388227 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -595,7 +595,19 @@ typedef struct { SList* pModLog; } SCommitHandle; -void tsdbResetFGroupFd(SFileGroup* pFGroup); +void tsdbResetFGroupFd(SFileGroup* pFGroup); +SReadHandle* tsdbNewReadHandle(STsdbRepo* pRepo); +void tsdbFreeReadHandle(SReadHandle* pReadH); +int tsdbSetAndOpenReadFGroup(SReadHandle* pReadH, SFileGroup* pFGroup); +void tsdbCloseAndUnsetReadFile(SReadHandle* pReadH); +int tsdbLoadBlockIdx(SReadHandle* pReadH); +int tsdbSetReadTable(SReadHandle* pReadH, STable* pTable); +int tsdbLoadBlockInfo(SReadHandle* pReadH); +int tsdbLoadBlockData(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBlockInfo); +int tsdbLoadBlockDataCols(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBlockInfo, int16_t* colIds, int numOfCols); +int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); + +#define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)])) #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 4e32d50096..c6fb64f004 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -26,8 +26,8 @@ typedef struct { int maxIters; SCommitIter *pIters; - SFileGroup * pFGroup; SReadHandle *pReadH; + SFileGroup * pFGroup; SBlockIdx * pBlockIdx; SBlockInfo * pBlockInfo; SDataCols * pDataCols; @@ -177,8 +177,7 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { return -1; } - if (tsdbCommitToFile(pRepo, fid, &tsCommitH) < 0) { - tsdbError("vgId:%d error occurs while committing to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + if (tsdbCommitToFileGroup(pRepo, NULL, NULL, &tsCommitH) < 0) { tsdbDestroyTSCommitHandle(&tsCommitH); return -1; } @@ -292,14 +291,13 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { free(iters); } -static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup, STSCommitHandle *pTSCh) { - SRWHelper * pWHelper = &(pTSCh->whelper); +static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileGroup *pNewGroup, STSCommitHandle *pTSCh) { SCommitIter *iters = pTSCh->pIters; - if (tsdbHelperOpenFile(pWHelper) < 0) return -1; + if (tsdbSetAndOpenCommitFGroup(pTSCh, pOldGroup, pNewGroup) < 0) return -1; - if (tsdbLoadCompIdx(pWHelper, NULL) < 0) { - tsdbHelperCloseFile(pWHelper, true /* hasError = false */); + if (tsdbLoadBlockIdx(pTSCh->pReadH) < 0) { + tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); return -1; } @@ -329,104 +327,6 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup, STSCommi return 0; } -static int tsdbCommitToFile(STsdbRepo *pRepo, SFileGroup *pOldFGroup, SFileGroup *pNewFGroup, STSCommitHandle *pTSCh) { - char * dataDir = NULL; - STsdbCfg * pCfg = &pRepo->config; - STsdbFileH * pFileH = pRepo->tsdbFileH; - SFileGroup * pGroup = NULL; - SMemTable * pMem = pRepo->imem; - bool newLast = false; - SCommitIter *iters = pTSCh->pIters; - SRWHelper * pHelper = &(pTSCh->whelper); - SDataCols * pDataCols = pTSCh->pDataCols; - int fid = pOldFGroup->fileId; - - ASSERT(pOldFGroup->fileId == pNewFGroup->fileId); - - // Open files for write/read - if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { - tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - newLast = TSDB_NLAST_FILE_OPENED(pHelper); - - if (tsdbLoadCompIdx(pHelper, NULL) < 0) { - tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - // Loop to commit data in each table - for (int tid = 1; tid < pMem->maxTables; tid++) { - SCommitIter *pIter = iters + tid; - if (pIter->pTable == NULL) continue; - - taosRLockLatch(&(pIter->pTable->latch)); - - if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; - - if (pIter->pIter != NULL) { - if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { - taosRUnLockLatch(&(pIter->pTable->latch)); - tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), - TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), - tstrerror(terrno)); - goto _err; - } - } - - taosRUnLockLatch(&(pIter->pTable->latch)); - - // Move the last block to the new .l file if neccessary - if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { - tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - // Write the SCompBlock part - if (tsdbWriteCompInfo(pHelper) < 0) { - tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - } - - if (tsdbWriteCompIdx(pHelper) < 0) { - tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; - } - - taosTFree(dataDir); - tsdbCloseHelperFile(pHelper, 0, pGroup); - - // pthread_rwlock_wrlock(&(pFileH->fhlock)); - - // (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); - // pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; - - // if (newLast) { - // (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); - // pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; - // } else { - // pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; - // } - - // pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; - - // pthread_rwlock_unlock(&(pFileH->fhlock)); - - return 0; - -_err: - taosTFree(dataDir); - tsdbCloseHelperFile(pHelper, 1, NULL); - return -1; -} - static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { for (int i = 0; i < nIters; i++) { SCommitIter *pIter = iters + i; @@ -451,8 +351,9 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { } pTSCh->maxIters = pMem->maxTables; - if (tsdbInitWriteHelper(&(pTSCh->whelper), pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + pTSCh->pReadH = tsdbNewReadHandle(pRepo); + if (pTSCh->pReadH == NULL) { + tsdbError("vgId:%d failed to create new read handle since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbDestroyTSCommitHandle(pTSCh); return -1; } @@ -472,7 +373,7 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { static void tsdbDestroyTSCommitHandle(STSCommitHandle *pTSCh) { if (pTSCh) { tdFreeDataCols(pTSCh->pDataCols); - tsdbDestroyHelper(&(pTSCh->whelper)); + tsdbFreeReadHandle(pTSCh->pReadH); tsdbDestroyCommitIters(pTSCh->pIters, pTSCh->maxIters); } } @@ -1168,4 +1069,61 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkI blkIdx); return 0; -} \ No newline at end of file +} + +static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGroup, STsdbRepo *pNewGroup) { + STsdbRepo *pRepo = pTSCh->pReadH->pRepo; + + if (tsdbSetAndOpenReadFGroup(pTSCh->pReadH, pOldGroup) < 0) { + tsdbError("vgId:%d failed to set and open commit file group since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + tsdbResetFGroupFd(pNewGroup); + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pOldFile = TSDB_FILE_IN_FGROUP(pOldGroup, type); + SFile *pNewFile = TSDB_FILE_IN_FGROUP(pNewGroup, type); + + pNewFile->fd = open(pNewFile->fname, O_CREAT | O_WRONLY, 0755); + if (pNewFile->fd < 0) { + tsdbError("vgId:%d failed to open file %s while commit since %s", REPO_ID(pRepo), pNewFile->fname, + strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseAndUnsetCommitFGroup(pTSCh, true /*hasError = true*/); + return -1; + } + + if (pOldFile->fname[0] == '\0' || + strncmp(pOldFile->fname, pNewFile->fname, TSDB_FILENAME_LEN) != 0) { // new file is created + if (tsdbUpdateFileHeader(pNewFile) < 0) { + tsdbError("vgId:%d failed to update file %s header since %s", REPO_ID(pRepo), pNewFile->fname, strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + tsdbCloseAndUnsetCommitFGroup(pTSCh, true /*hasError = true*/); + return -1; + } + } + } + + pTSCh->pFGroup = pNewGroup; + return 0; + +_err: +} + +static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) { + tsdbCloseAndUnsetReadFile(pTSCh->pReadH); + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pOldFile = TSDB_FILE_IN_FGROUP(pOldGroup, type); + SFile *pNewFile = TSDB_FILE_IN_FGROUP(pNewGroup, type); + + if (pNewFile->fd >= 0) { + if (!hasError) { + (void)fsync(pNewFile->fd); + } + (void)close(pNewFile->fd); + pNewFile->fd = -1; + } + } +} diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index 5e58ad48ac..a9da2e1374 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -60,7 +60,7 @@ void tsdbFreeReadHandle(SReadHandle *pReadH) { } } -int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { +int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { STsdbRepo *pRepo = pReadH->pRepo; STsdbCfg * pCfg = &(pRepo->config); @@ -76,7 +76,7 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { if (pFile->fd < 0) { tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - tsdbCloseAndUnsetFile(pReadH); + tsdbCloseAndUnsetReadFile(pReadH); return -1; } } @@ -87,7 +87,7 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { return 0; } -void tsdbCloseAndUnsetFile(SReadHandle *pReadH) { +void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { SFile *pFile = TSDB_READ_FILE(pReadH, type); -- GitLab