提交 d0f1e625 编写于 作者: H Hongze Cheng

refactor more code

上级 86718347
...@@ -595,7 +595,19 @@ typedef struct { ...@@ -595,7 +595,19 @@ typedef struct {
SList* pModLog; SList* pModLog;
} SCommitHandle; } SCommitHandle;
void tsdbResetFGroupFd(SFileGroup* pFGroup); void tsdbResetFGroupFd(SFileGroup* pFGroup);
SReadHandle* tsdbNewReadHandle(STsdbRepo* pRepo);
void tsdbFreeReadHandle(SReadHandle* pReadH);
int tsdbSetAndOpenReadFGroup(SReadHandle* pReadH, SFileGroup* pFGroup);
void tsdbCloseAndUnsetReadFile(SReadHandle* pReadH);
int tsdbLoadBlockIdx(SReadHandle* pReadH);
int tsdbSetReadTable(SReadHandle* pReadH, STable* pTable);
int tsdbLoadBlockInfo(SReadHandle* pReadH);
int tsdbLoadBlockData(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBlockInfo);
int tsdbLoadBlockDataCols(SReadHandle* pReadH, SBlock* pBlock, SBlockInfo* pBlockInfo, int16_t* colIds, int numOfCols);
int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock);
#define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)]))
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -26,8 +26,8 @@ ...@@ -26,8 +26,8 @@
typedef struct { typedef struct {
int maxIters; int maxIters;
SCommitIter *pIters; SCommitIter *pIters;
SFileGroup * pFGroup;
SReadHandle *pReadH; SReadHandle *pReadH;
SFileGroup * pFGroup;
SBlockIdx * pBlockIdx; SBlockIdx * pBlockIdx;
SBlockInfo * pBlockInfo; SBlockInfo * pBlockInfo;
SDataCols * pDataCols; SDataCols * pDataCols;
...@@ -177,8 +177,7 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { ...@@ -177,8 +177,7 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) {
return -1; return -1;
} }
if (tsdbCommitToFile(pRepo, fid, &tsCommitH) < 0) { if (tsdbCommitToFileGroup(pRepo, NULL, NULL, &tsCommitH) < 0) {
tsdbError("vgId:%d error occurs while committing to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
tsdbDestroyTSCommitHandle(&tsCommitH); tsdbDestroyTSCommitHandle(&tsCommitH);
return -1; return -1;
} }
...@@ -292,14 +291,13 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { ...@@ -292,14 +291,13 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
free(iters); free(iters);
} }
static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup, STSCommitHandle *pTSCh) { static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileGroup *pNewGroup, STSCommitHandle *pTSCh) {
SRWHelper * pWHelper = &(pTSCh->whelper);
SCommitIter *iters = pTSCh->pIters; SCommitIter *iters = pTSCh->pIters;
if (tsdbHelperOpenFile(pWHelper) < 0) return -1; if (tsdbSetAndOpenCommitFGroup(pTSCh, pOldGroup, pNewGroup) < 0) return -1;
if (tsdbLoadCompIdx(pWHelper, NULL) < 0) { if (tsdbLoadBlockIdx(pTSCh->pReadH) < 0) {
tsdbHelperCloseFile(pWHelper, true /* hasError = false */); tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */);
return -1; return -1;
} }
...@@ -329,104 +327,6 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup, STSCommi ...@@ -329,104 +327,6 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup, STSCommi
return 0; return 0;
} }
static int tsdbCommitToFile(STsdbRepo *pRepo, SFileGroup *pOldFGroup, SFileGroup *pNewFGroup, STSCommitHandle *pTSCh) {
char * dataDir = NULL;
STsdbCfg * pCfg = &pRepo->config;
STsdbFileH * pFileH = pRepo->tsdbFileH;
SFileGroup * pGroup = NULL;
SMemTable * pMem = pRepo->imem;
bool newLast = false;
SCommitIter *iters = pTSCh->pIters;
SRWHelper * pHelper = &(pTSCh->whelper);
SDataCols * pDataCols = pTSCh->pDataCols;
int fid = pOldFGroup->fileId;
ASSERT(pOldFGroup->fileId == pNewFGroup->fileId);
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
newLast = TSDB_NLAST_FILE_OPENED(pHelper);
if (tsdbLoadCompIdx(pHelper, NULL) < 0) {
tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Loop to commit data in each table
for (int tid = 1; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = iters + tid;
if (pIter->pTable == NULL) continue;
taosRLockLatch(&(pIter->pTable->latch));
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
if (pIter->pIter != NULL) {
if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch));
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
tstrerror(terrno));
goto _err;
}
}
taosRUnLockLatch(&(pIter->pTable->latch));
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
// Write the SCompBlock part
if (tsdbWriteCompInfo(pHelper) < 0) {
tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
}
if (tsdbWriteCompIdx(pHelper) < 0) {
tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _err;
}
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 0, pGroup);
// pthread_rwlock_wrlock(&(pFileH->fhlock));
// (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
// pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
// if (newLast) {
// (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname);
// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info;
// } else {
// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info;
// }
// pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info;
// pthread_rwlock_unlock(&(pFileH->fhlock));
return 0;
_err:
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 1, NULL);
return -1;
}
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) { for (int i = 0; i < nIters; i++) {
SCommitIter *pIter = iters + i; SCommitIter *pIter = iters + i;
...@@ -451,8 +351,9 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { ...@@ -451,8 +351,9 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) {
} }
pTSCh->maxIters = pMem->maxTables; pTSCh->maxIters = pMem->maxTables;
if (tsdbInitWriteHelper(&(pTSCh->whelper), pRepo) < 0) { pTSCh->pReadH = tsdbNewReadHandle(pRepo);
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); if (pTSCh->pReadH == NULL) {
tsdbError("vgId:%d failed to create new read handle since %s", REPO_ID(pRepo), tstrerror(terrno));
tsdbDestroyTSCommitHandle(pTSCh); tsdbDestroyTSCommitHandle(pTSCh);
return -1; return -1;
} }
...@@ -472,7 +373,7 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) { ...@@ -472,7 +373,7 @@ static int tsdbInitTSCommitHandle(STSCommitHandle *pTSCh, STsdbRepo *pRepo) {
static void tsdbDestroyTSCommitHandle(STSCommitHandle *pTSCh) { static void tsdbDestroyTSCommitHandle(STSCommitHandle *pTSCh) {
if (pTSCh) { if (pTSCh) {
tdFreeDataCols(pTSCh->pDataCols); tdFreeDataCols(pTSCh->pDataCols);
tsdbDestroyHelper(&(pTSCh->whelper)); tsdbFreeReadHandle(pTSCh->pReadH);
tsdbDestroyCommitIters(pTSCh->pIters, pTSCh->maxIters); tsdbDestroyCommitIters(pTSCh->pIters, pTSCh->maxIters);
} }
} }
...@@ -1168,4 +1069,61 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkI ...@@ -1168,4 +1069,61 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkI
blkIdx); blkIdx);
return 0; return 0;
} }
\ No newline at end of file
static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGroup, STsdbRepo *pNewGroup) {
STsdbRepo *pRepo = pTSCh->pReadH->pRepo;
if (tsdbSetAndOpenReadFGroup(pTSCh->pReadH, pOldGroup) < 0) {
tsdbError("vgId:%d failed to set and open commit file group since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1;
}
tsdbResetFGroupFd(pNewGroup);
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pOldFile = TSDB_FILE_IN_FGROUP(pOldGroup, type);
SFile *pNewFile = TSDB_FILE_IN_FGROUP(pNewGroup, type);
pNewFile->fd = open(pNewFile->fname, O_CREAT | O_WRONLY, 0755);
if (pNewFile->fd < 0) {
tsdbError("vgId:%d failed to open file %s while commit since %s", REPO_ID(pRepo), pNewFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseAndUnsetCommitFGroup(pTSCh, true /*hasError = true*/);
return -1;
}
if (pOldFile->fname[0] == '\0' ||
strncmp(pOldFile->fname, pNewFile->fname, TSDB_FILENAME_LEN) != 0) { // new file is created
if (tsdbUpdateFileHeader(pNewFile) < 0) {
tsdbError("vgId:%d failed to update file %s header since %s", REPO_ID(pRepo), pNewFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseAndUnsetCommitFGroup(pTSCh, true /*hasError = true*/);
return -1;
}
}
}
pTSCh->pFGroup = pNewGroup;
return 0;
_err:
}
static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) {
tsdbCloseAndUnsetReadFile(pTSCh->pReadH);
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pOldFile = TSDB_FILE_IN_FGROUP(pOldGroup, type);
SFile *pNewFile = TSDB_FILE_IN_FGROUP(pNewGroup, type);
if (pNewFile->fd >= 0) {
if (!hasError) {
(void)fsync(pNewFile->fd);
}
(void)close(pNewFile->fd);
pNewFile->fd = -1;
}
}
}
...@@ -60,7 +60,7 @@ void tsdbFreeReadHandle(SReadHandle *pReadH) { ...@@ -60,7 +60,7 @@ void tsdbFreeReadHandle(SReadHandle *pReadH) {
} }
} }
int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
STsdbRepo *pRepo = pReadH->pRepo; STsdbRepo *pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
...@@ -76,7 +76,7 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { ...@@ -76,7 +76,7 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
if (pFile->fd < 0) { if (pFile->fd < 0) {
tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno)); tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseAndUnsetFile(pReadH); tsdbCloseAndUnsetReadFile(pReadH);
return -1; return -1;
} }
} }
...@@ -87,7 +87,7 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { ...@@ -87,7 +87,7 @@ int tsdbSetAndOpenFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) {
return 0; return 0;
} }
void tsdbCloseAndUnsetFile(SReadHandle *pReadH) { void tsdbCloseAndUnsetReadFile(SReadHandle *pReadH) {
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
SFile *pFile = TSDB_READ_FILE(pReadH, type); SFile *pFile = TSDB_READ_FILE(pReadH, type);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册