From 175811008723222f9d19dde242a8f40486b5a89e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 10 Jan 2021 19:41:47 +0800 Subject: [PATCH] more work --- src/inc/taoserror.h | 1 + src/inc/tfs.h | 1 + src/tsdb/inc/tsdbFile.h | 7 +- src/tsdb/inc/tsdbMain.h | 1 - src/tsdb/inc/tsdbMemTable.h | 1 + src/tsdb/inc/tsdbint.h | 2 + src/tsdb/src/tsdbCommit.c | 581 +++++++++++++++++++++--------------- src/tsdb/src/tsdbFS.c | 5 +- src/tsdb/src/tsdbFile.c | 30 +- 9 files changed, 387 insertions(+), 242 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index ed88bc15ee..2127b74d21 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -241,6 +241,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "No table d TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "File already exists") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "Need to reconfigure table") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "Invalid information to create table") +TAOS_DEFINE_ERROR(TSDB_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "Invalid handle") diff --git a/src/inc/tfs.h b/src/inc/tfs.h index 8b58374b94..10ee3d7c55 100644 --- a/src/inc/tfs.h +++ b/src/inc/tfs.h @@ -56,6 +56,7 @@ typedef struct { #define TFILE_LEVEL(pf) ((pf)->level) #define TFILE_ID(pf) ((pf)->id) #define TFILE_NAME(pf) ((pf)->aname) +#define TFILE_REL_NAME(pf) ((pf)->rname) void tfsInitFile(TFILE *pf, int level, int id, const char *bname); bool tfsIsSameFile(TFILE *pf1, TFILE *pf2); diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h index aa6a5629fc..09f63e97d6 100644 --- a/src/tsdb/inc/tsdbFile.h +++ b/src/tsdb/inc/tsdbFile.h @@ -30,6 +30,8 @@ extern "C" { #define TSDB_FILE_FULL_NAME(f) TFILE_NAME(TSDB_FILE_F(f)) #define TSDB_FILE_OPENED(f) (TSDB_FILE_FD(f) >= 0) #define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_FD(f) = -1) +#define TSDB_FILE_LEVEL(tf) TFILE_LEVEL(TSDB_FILE_F(tf)) +#define TSDB_FILE_ID(tf) TFILE_ID(TSDB_FILE_F(tf)) typedef enum { TSDB_FILE_HEAD = 0, @@ -214,13 +216,16 @@ typedef struct { #define TSDB_FSET_FID(s) ((s)->fid) #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) +#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) +#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id); void tsdbInitDFileSetWithOld(SDFileSet* pSet, SDFileSet* pOldSet); int tsdbOpenDFileSet(SDFileSet* pSet, int flags); void tsdbCloseDFileSet(SDFileSet* pSet); int tsdbUpdateDFileSetHeader(SDFileSet* pSet); -int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet); +int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet* pDest); +int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet* pDest); #ifdef __cplusplus } diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 73efbbad92..c8ac977884 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -174,7 +174,6 @@ void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size); // int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); // void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); // void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup); -void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); // int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); // ================= tsdbMain.c diff --git a/src/tsdb/inc/tsdbMemTable.h b/src/tsdb/inc/tsdbMemTable.h index 3d341bb798..82cb579514 100644 --- a/src/tsdb/inc/tsdbMemTable.h +++ b/src/tsdb/inc/tsdbMemTable.h @@ -85,6 +85,7 @@ int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); void* tsdbCommitData(STsdbRepo* pRepo); +void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY* minKey, TSKEY* maxKey); static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { if (pIter == NULL) return NULL; diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 23851b3521..48cb09cf80 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -19,6 +19,7 @@ // TODO: remove the include #include #include +#include #include #include #include @@ -84,6 +85,7 @@ struct STsdbRepo { #define REPO_ID(r) (r)->config.tsdbId #define REPO_CFG(r) (&((r)->config)) +#define REPO_FS_VERSION(r) // TODO #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index d9743542ff..26fd55efa3 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -12,10 +12,11 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "tsdbMain.h" +#include "tsdbint.h" #define TSDB_IVLD_FID INT_MIN #define TSDB_MAX_SUBBLOCKS 8 +#define TSDB_KEY_FID(key, days, precision) ((key) / tsMsPerDay[(precision)] / (days)) typedef struct { int minFid; @@ -25,28 +26,31 @@ typedef struct { } SRtn; typedef struct { + int version; SRtn rtn; // retention snapshot - int niters; - SCommitIter *iters; // memory iterators + bool isRFileSet; SReadH readh; - SDFileSet * pWSet; + SFSIter fsIter; // tsdb file iterator + int niters; // memory iterators + SCommitIter *iters; + SDFileSet wSet; // commit file TSKEY minKey; TSKEY maxKey; - SArray * aBlkIdx; - SArray * aSupBlk; - SArray * aSubBlk; + SArray * aBlkIdx; // SBlockIdx array + SArray * aSupBlk; // Table super-block array + SArray * aSubBlk; // table sub-block array SDataCols * pDataCols; } SCommitH; #define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) #define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) -#define TSDB_COMMIT_WRITE_FSET(ch) ((ch)->pWSet) +#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet)) #define TSDB_COMMIT_TABLE(ch) TSDB_READ_TABLE(&(ch->readh)) #define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) #define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) -#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&(ch->readh)) -#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&(ch->readh)) +#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) +#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) #define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5) void *tsdbCommitData(STsdbRepo *pRepo) { @@ -78,73 +82,7 @@ _err: return NULL; } -static int tsdbCommitTSData(STsdbRepo *pRepo) { - SMemTable *pMem = pRepo->imem; - STsdbCfg * pCfg = &(pRepo->config); - SCommitH ch = {0}; - SFSIter fsIter = {0}; - SDFileSet *pOldSet = NULL; - SDFileSet nSet; - int level, id; - int fid; - - if (pMem->numOfRows <= 0) return 0; - - // Resource initialization - if (tsdbInitCommitH(pRepo, &ch) < 0) { - // TODO - return -1; - } - tsdbInitFSIter(pRepo, &fsIter); - - // Skip expired memory data and expired FSET - tsdbSeekCommitIter(ch.iters, pMem->maxTables, ch.rtn.minKey); - fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); - while (true) { - pOldSet = tsdbFSIterNext(&fsIter); - if (pOldSet == NULL || pOldSet->fid >= ch.rtn.minFid) break; - } - - // Loop to commit to each file - while (true) { - // Loop over both on disk and memory - if (pOldSet == NULL && fid == TSDB_IVLD_FID) break; - - // Only has existing FSET but no memory data to commit in this - // existing FSET, only check if file in correct retention - if (pOldSet && (fid == TSDB_IVLD_FID || pOldSet->fid < fid)) { - if (tsdbApplyRtn(*pOldSet, &(ch.rtn), &nSet) < 0) { - return -1; - } - - tsdbUpdateDFileSet(pRepo, &nSet); - - pOldSet = tsdbFSIterNext(&fsIter); - continue; - } - - SDFileSet *pCSet; - int cfid; - - if (pOldSet == NULL || pOldSet->fid > fid) { - // Commit to a new FSET with fid: fid - pCSet = NULL; - cfid = fid; - } else { - // Commit to an existing FSET - pCSet = pOldSet; - cfid = pOldSet->fid; - pOldSet = tsdbFSIterNext(&fsIter); - } - fid = tsdbNextCommitFid(ch.iters, pMem->maxTables); - - tsdbCommitToFile(pCSet, &ch, cfid); - } - - tsdbDestroyCommitH(&ch, pMem->maxTables); - return 0; -} - +// =================== Commit Meta Data static int tsdbCommitMeta(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -196,6 +134,93 @@ _err: return -1; } +// =================== Commit Time-Series Data +static int tsdbCommitTSData(STsdbRepo *pRepo) { + SMemTable *pMem = pRepo->imem; + STsdbCfg * pCfg = REPO_CFG(pRepo); + SCommitH ch = {0}; + SDFileSet *pSet = NULL; + SDFileSet nSet; + int fid; + + if (pMem->numOfRows <= 0) return 0; + + // Resource initialization + if (tsdbInitCommitH(pRepo, &ch) < 0) { + return -1; + } + + // Skip expired memory data and expired FSET + tsdbSeekCommitIter(&ch, ch.rtn.minKey); + while (true) { + pSet = tsdbFSIterNext(&(ch.fsIter)); + if (pSet == NULL || pSet->fid >= ch.rtn.minFid) break; + } + + // Loop to commit to each file + fid = tsdbNextCommitFid(&(ch)); + while (true) { + // Loop over both on disk and memory + if (pSet == NULL && fid == TSDB_IVLD_FID) break; + + if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) { + // Only has existing FSET but no memory data to commit in this + // existing FSET, only check if file in correct retention + int level, id; + + tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ch.rtn)), &level, &id); + if (level == TFS_UNDECIDED_LEVEL) { + terrno = TSDB_TDB_NO_AVAIL_DISK; + tsdbDestroyCommitH(&ch); + return -1; + } + + if (level > TSDB_FSET_LEVEL(pSet)) { + if (tsdbCopyDFileSet(*pSet, level, id, &nSet) < 0) { + tsdbDestroyCommitH(&ch); + return -1; + } + + if (tsdbUpdateDFileSet(pRepo, &nSet) < 0) { + tsdbDestroyCommitH(&ch); + return -1; + } + } else { + if (tsdbUpdateDFileSet(pRepo, pSet) < 0) { + tsdbDestroyCommitH(&ch); + return -1; + } + } + + pSet = tsdbFSIterNext(&(ch.fsIter)); + } else { + // Has memory data to commit + SDFileSet *pCSet; + int cfid; + + if (pSet == NULL || pSet->fid > fid) { + // Commit to a new FSET with fid: fid + pCSet = NULL; + cfid = fid; + } else { + // Commit to an existing FSET + pCSet = pSet; + cfid = pSet->fid; + pSet = tsdbFSIterNext(&(ch.fsIter)); + } + fid = tsdbNextCommitFid(&ch); + + if (tsdbCommitToFile(pCSet, &ch, cfid) < 0) { + tsdbDestroyCommitH(&ch); + return -1; + } + } + } + + tsdbDestroyCommitH(&ch); + return 0; +} + static int tsdbStartCommit(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; @@ -234,163 +259,210 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS return false; } -static int tsdbCommitToFile(SCommitH *pch, SDFileSet *pOldSet, int fid) { +static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { int level, id; - int nSet, ver; - STsdbRepo *pRepo; + STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + + ASSERT(pSet == NULL || pSet->fid == fid); - ASSERT(pOldSet == NULL || pOldSet->fid == fid); + tsdbResetCommitFile(pCommith); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); - tfsAllocDisk(tsdbGetFidLevel(fid, &(pch->rtn)), &level, &id); + tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id); if (level == TFS_UNDECIDED_LEVEL) { - // TODO + terrno = TSDB_TDB_NO_AVAIL_DISK; return -1; } - if (pOldSet == NULL || level > TSDB_FSET_LEVEL(pOldSet)) { - // Create new fset to commit - tsdbInitDFileSet(&nSet, pRepo, fid, ver, level, id); - if (tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT) < 0) { - // TODO: - return -1; - } - - if (tsdbUpdateDFileSetHeader(&nSet) < 0) { - // TODO - return -1; - } + // Set commit file + if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) { + tsdbInitDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith), REPO_ID(pRepo), fid, pCommith->version, level, id); } else { - level = TSDB_FSET_LEVEL(pOldSet); - - tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_HEAD), ...); - - tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_DATA), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_DATA)) - - SDFile *pDFile = TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST); - if (pDFile->info.size < 32 * 1024 * 1024) { - tsdbInitDFileWithOld(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST), TSDB_DFILE_IN_SET(pOldSet, TSDB_FILE_LAST)) + level = TSDB_FSET_LEVEL(pSet); + id = TSDB_FSET_ID(pSet); + + // TSDB_FILE_HEAD + SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); + tsdbInitDFile(pWHeadf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_HEAD); + + // TSDB_FILE_DATA + SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); + SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); + tsdbInitDFileWithOld(pWDataf, pRDataf); + + // TSDB_FILE_LAST + SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); + SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); + if (pRLastf->info.size < 32 * 1024) { + tsdbInitDFileWithOld(pWLastf, pRLastf); } else { - tsdbInitDFile(TSDB_DFILE_IN_SET(&nSet, TSDB_FILE_LAST), ...); + tsdbInitDFile(pWLastf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_LAST); } - - tsdbOpenDFileSet(&nSet, O_WRONLY | O_CREAT); - - // TODO: update file header } - tsdbSetCommitFile(pch, pOldSet, &nSet); + // Open commit file + if (tsdbOpenCommitFile(pCommith, pSet) < 0) { + return -1; + } - for (size_t tid = 0; tid < pMem->maxTables; tid++) { - SCommitIter *pIter = pch->iters + tid; + // Loop to commit each table data + for (int tid = 0; tid < pCommith->niters; tid++) { + SCommitIter *pIter = pCommith->iters + tid; - // No table exists, continue if (pIter->pTable == NULL) continue; - if (tsdbCommitToTable(pch, tid) < 0) { - // TODO + if (tsdbCommitToTable(pCommith, tid) < 0) { + // TODO: revert the file change + tsdbCloseCommitFile(pCommith, true); return -1; } } - tsdbUpdateDFileSet(pRepo, &wSet); + tsdbCloseCommitFile(pCommith, false); + + if (tsdbUpdateDFileSet(pRepo, &(pCommith->wSet)) < 0) { + // TODO + return -1; + } + + return 0; } -static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { +static SCommitIter *tsdbCreateCommitIters(SCommitH *pCommith) { + STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; - SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter)); - if (iters == NULL) { + pCommith->niters = pMem->maxTables; + pCommith->iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter)); + if (pCommith->iters == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; + return -1; } - if (tsdbRLockRepoMeta(pRepo) < 0) goto _err; + if (tsdbRLockRepoMeta(pRepo) < 0) return -1 // reference all tables for (int i = 0; i < pMem->maxTables; i++) { if (pMeta->tables[i] != NULL) { tsdbRefTable(pMeta->tables[i]); - iters[i].pTable = pMeta->tables[i]; + pCommith->iters[i].pTable = pMeta->tables[i]; } } - if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err; + if (tsdbUnlockRepoMeta(pRepo) < 0) return -1; for (int i = 0; i < pMem->maxTables; i++) { - if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) { - if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { + if ((pCommith->iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(pCommith->iters[i].pTable) == pMem->tData[i]->uid)) { + if ((pCommith->iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; + return -1; } - tSkipListIterNext(iters[i].pIter); + tSkipListIterNext(pCommith->iters[i].pIter); } } - return iters; - -_err: - tsdbDestroyCommitIters(iters, pMem->maxTables); - return NULL; + return 0; } -static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { - if (iters == NULL) return; +static void tsdbDestroyCommitIters(SCommitH *pCommith) { + if (pCommith->iters == NULL) return; - for (int i = 1; i < maxTables; i++) { - if (iters[i].pTable != NULL) { - tsdbUnRefTable(iters[i].pTable); + for (int i = 1; i < pCommith->niters; i++) { + if (pCommith->iters[i].pTable != NULL) { + tsdbUnRefTable(pCommith->iters[i].pTable); tSkipListDestroyIter(iters[i].pIter); } } - free(iters); + free(pCommith->iters); + pCommith->iters = NULL; } -static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) { - for (int i = 0; i < nIters; i++) { - SCommitIter *pIter = pIters + i; +// Skip all keys until key (not included) +static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { + for (int i = 0; i < pCommith->niters; i++) { + SCommitIter *pIter = pCommith->iters + i; if (pIter->pTable == NULL) continue; if (pIter->pIter == NULL) continue; - tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0, true, NULL); + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL); } } -static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch) { - STsdbMeta *pMeta = pRepo->tsdbMeta; - STsdbCfg * pCfg = &(pRepo->config); +static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pCommith) { + STsdbCfg *pCfg = REPO_CFG(pRepo); + + memset(pCommith, 0, sizeof(*pCommith)); + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype)); + } - pch->iters = tsdbCreateCommitIters(pRepo); - if (pch->iters == NULL) { - tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); + pCommith->version = REPO_FS_VERSION(pRepo) + 1; + + tsdbGetRtnSnap(pRepo, &(pCommith->rtn)); + + // Init read handle + if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) { return -1; } - if (tsdbInitWriteHelper(&(pch->whelper), pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + // Init file iterator + if (tsdbInitFSIter(pRepo, &(pCommith->fsIter)) < 0) { + tsdbDestroyCommitH(pCommith); + return -1; + } + + if (tsdbCreateCommitIters(pCommith) < 0) { + tsdbDestroyCommitH(pCommith); + return -1; + } + + pCommith->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + if (pCommith->aBlkIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyCommitH(pCommith); + return -1; + } + + pCommith->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); + if (pCommith->aSupBlk == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyCommitH(pCommith); return -1; } - if ((pch->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + pCommith->aSubBlk = taosArrayInit(1024, sizeof(SBlock)); + if (pCommith->aSubBlk == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", - REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + tsdbDestroyCommitH(pCommith); + return -1; + } + + pCommith->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock); + if (pCommith->pDataCols == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyCommitH(pCommith); return -1; } return 0; } -static void tsdbDestroyCommitH(SCommitH *pch, int niter) { - tdFreeDataCols(pch->pDataCols); - tsdbDestroyCommitIters(pch->iters, niter); - tsdbDestroyHelper(&(pch->whelper)); +static void tsdbDestroyCommitH(SCommitH *pCommith) { + pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols); + pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk); + pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk); + pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx); + tsdbDestroyCommitIters(pCommith); + tsdbDestroyReadH(&(pCommith->readh)); + tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { - STsdbCfg *pCfg = &(pRepo->config); + STsdbCfg *pCfg = REPO_CFG(pRepo); TSKEY minKey, midKey, maxKey, now; now = taosGetTimestamp(pCfg->precision); @@ -399,9 +471,9 @@ static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision]; pRtn->minKey = minKey; - pRtn->minFid = TSDB_KEY_FILEID(minKey, pCfg->daysPerFile, pCfg->precision); - pRtn->midFid = TSDB_KEY_FILEID(midKey, pCfg->daysPerFile, pCfg->precision); - pRtn->maxFid = TSDB_KEY_FILEID(maxKey, pCfg->daysPerFile, pCfg->precision); + pRtn->minFid = TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision); + pRtn->midFid = TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision); + pRtn->maxFid = TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision); } static int tsdbGetFidLevel(int fid, SRtn *pRtn) { @@ -416,146 +488,154 @@ static int tsdbGetFidLevel(int fid, SRtn *pRtn) { } } -static int tsdbNextCommitFid(SCommitIter *iters, int niters) { - int fid = TSDB_IVLD_FID; +static int tsdbNextCommitFid(SCommitH *pCommith) { + SCommitIter *pIter; + STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + int fid = TSDB_IVLD_FID; - // TODO + for (int i = 0; i < pCommith->niters; i++) { + pIter = pCommith->iters + i; + if (pIter->pTable == NULL || pIter->pIter == NULL) continue; + + TSKEY nextKey = tsdbNextIterKey(pIter->pIter); + if (nextKey == TSDB_DATA_TIMESTAMP_NULL) { + continue; + } else { + int tfid = TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision); + if (fid == TSDB_IVLD_FID || fid > tfid) { + fid = tfid; + } + } + } return fid; } -static int tsdbApplyRtn(const SDFileSet oSet, const SRtn *pRtn, SDFileSet *pRSet) { - int level, id; - int vid, ver; +static int tsdbCommitToTable(SCommitH *pCommith, int tid) { + SCommitIter *pIter = pCommith->iters + tid; + if (pIter->pTable == NULL) return 0; - tfsAllocDisk(tsdbGetFidLevel(oSet.fid, pRtn), &level, &id); + TSDB_RLOCK_TABLE(pIter->pTable); - if (level == TFS_UNDECIDED_LEVEL) { - // terrno = TSDB_CODE_TDB_NO_AVAILABLE_DISK; + // Set commit table + tsdbResetCommitTable(pCommith); + if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } - if (level > TSDB_FSET_LEVEL(pSet)) { - tsdbInitDFileSet(pRSet, vid, TSDB_FSET_FID(&oSet), ver, level, id); - if (tsdbCopyDFileSet(&oSet, pRSet) < 0) { - return -1; - } - } else { - tsdbInitDFileSetWithOld(pRSet, &oSet); - } - - return 0; -} - -static int tsdbCommitToTable(SCommitH *pch, int tid) { - SCommitIter *pIter = pch->iters + tid; - if (pIter->pTable == NULL) return 0; + if (!pCommith->isRFileSet) { + if (pIter->pIter == NULL) { + // No memory data + TSDB_RUNLOCK_TABLE(pIter->pTable); + return 0; + } else { + // TODO: think about no data committed at all + if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, true) < 0) { + TSDB_RUNLOCK_TABLE(pIter->pTable); + return -1; + } - TSDB_RLOCK_TABLE(pIter->pTable); + TSDB_RUNLOCK_TABLE(pIter->pTable); + if (tsdbWriteBlockInfo(pCommith) < 0) { + return -1; + } - tsdbSetCommitTable(pch, pIter->pTable); + return 0; + } + } // No memory data and no disk data, just return - if (pIter->pIter == NULL && pch->readh.pBlkIdx == NULL) { + if (pIter->pIter == NULL && pCommith->readh.pBlkIdx == NULL) { TSDB_RUNLOCK_TABLE(pIter->pTable); return 0; } - if (tsdbLoadBlockInfo(&(pch->readh), NULL) < 0) { + if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } // Process merge commit - int nBlocks = (pch->readh.pBlkIdx == NULL) ? 0 : pch->readh.pBlkIdx->numOfBlocks; + int nBlocks = (pCommith->readh.pBlkIdx == NULL) ? 0 : pCommith->readh.pBlkIdx->numOfBlocks; TSKEY nextKey = tsdbNextIterKey(pIter->pIter); int cidx = 0; void * ptr = NULL; SBlock *pBlock; if (cidx < nBlocks) { - pBlock = pch->readh.pBlkInfo->blocks + cidx; + pBlock = pCommith->readh.pBlkInfo->blocks + cidx; } else { pBlock = NULL; } while (true) { - if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (pBlock == NULL)) break; + if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) && (pBlock == NULL)) break; - if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) || + if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { - if (tsdbMoveBlock(pch, cidx) < 0) { + if (tsdbMoveBlock(pCommith, cidx) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } cidx++; if (cidx < nBlocks) { - pBlock = pch->readh.pBlkInfo->blocks + cidx; + pBlock = pCommith->readh.pBlkInfo->blocks + cidx; } else { pBlock = NULL; } } else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - if (tsdbMergeMemData(pch, pIter, cidx) < 0) { + if (tsdbMergeMemData(pCommith, pIter, cidx) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } cidx++; if (cidx < nBlocks) { - pBlock = pch->readh.pBlkInfo->blocks + cidx; + pBlock = pCommith->readh.pBlkInfo->blocks + cidx; } else { pBlock = NULL; } nextKey = tsdbNextIterKey(pIter->pIter); } else { if (pBlock == NULL) { - if (tsdbCommitMemData(pch, pIter, pch->maxKey, false) < 0) { + if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } nextKey = tsdbNextIterKey(pIter->pIter); } else { - if (tsdbCommitMemData(pch, pIter, pBlock->keyFirst-1, true) < 0) { + if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst-1, true) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } nextKey = tsdbNextIterKey(pIter->pIter); } } - -#if 0 - if (/* Key end */) { - tsdbMoveBlock(); ============= - } else { - if (/*block end*/) { - // process append commit until pch->maxKey >>>>>>> - } else { - if (pBlock->last) { - // TODO: merge the block |||||||||||||||||||||| - } else { - if (pBlock > nextKey) { - // process append commit until pBlock->keyFirst-1 >>>>>> - } else if (pBlock < nextKey) { - // tsdbMoveBlock() ============ - } else { - // merge the block |||||||||||| - } - } - } - } -#endif } TSDB_RUNLOCK_TABLE(pIter->pTable); - if (tsdbWriteBlockInfo(pch) < 0) return -1; + if (tsdbWriteBlockInfo(pCommith) < 0) return -1; return 0; } -static int tsdbSetCommitTable(SCommitH *pch, STable *pTable) { - // TODO +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + + if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (pCommith->isRFileSet) { + if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) { + return -1; + } + } return 0; } @@ -572,18 +652,9 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { } } -static int tsdbAppendCommit(SCommitIter *pIter, TSKEY keyEnd) { - // TODO - return 0; -} - -static int tsdbMergeCommit(SCommitIter *pIter, SBlock *pBlock, TSKEY keyEnd) { - // TODO - return 0; -} - static int tsdbWriteBlock(SCommitH *pCommih, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper) { + // TODO STsdbCfg * pCfg = &(pHelper->pRepo->config); SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer); int64_t offset = 0; @@ -1084,4 +1155,46 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt if (pTarget->numOfRows >= maxRows) break; } +} + +static void tsdbResetCommitFile(SCommitH *pCommith) { + tsdbResetCommitTable(pCommith); + taosArrayClear(pCommith->aBlkIdx); +} + +static void tsdbResetCommitTable(SCommitH *pCommith) { + tdResetDataCols(pCommith->pDataCols); + taosArrayClear(pCommith->aSubBlk); + taosArrayClear(pCommith->aSupBlk); +} + +static int tsdbOpenCommitFile(SCommitH *pCommith, SDFileSet *pRSet) { + if (pRSet == NULL) { + pCommith->isRFileSet = false; + } else { + pCommith->isRFileSet = true; + if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pRSet) < 0) { + return -1; + } + } + + if (tsdbOpenDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith), O_WRONLY | O_CREAT) < 0) { + return -1; + } + + return 0; +} + +static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + } + + if (!hasError) { + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + SDFile *pDFile = TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype); + fsync(TSDB_FILE_FD(pDFile)); + } + } + tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 86c6a7408f..27ae452e54 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -13,10 +13,7 @@ * along with this program. If not, see . */ -#include -#include - -#include "tsdbMain.h" +#include "tsdbint.h" #define REPO_FS(r) ((r)->fs) #define TSDB_MAX_DFILES(keep, days) ((keep) / (days) + 3) diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index 43387144d4..73e12493fd 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -118,6 +118,19 @@ void *tsdbDecodeSDFile(void *buf, SDFile *pDFile) { return buf; } +static int tsdbCopyDFile(SDFile *pSrc, int tolevel, int toid, SDFile *pDest) { + TSDB_FILE_SET_CLOSED(pDest); + + pDest->info = pSrc->info; + tfsInitFile(TSDB_FILE_F(pDest), tolevel, toid, TFILE_REL_NAME(TSDB_FILE_F(pSrc))); + + if (taosCopy(TSDB_FILE_FULL_NAME(pSrc), TSDB_FILE_FULL_NAME(pDest)) < 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return -1; + } + return -1; +} + static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { int tlen = 0; @@ -184,8 +197,21 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { return 0; } -int tsdbCopyDFileSet(SDFileSet *pFromSet, SDFileSet *pToSet) { - // return 0; +int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet *pDest) { + ASSERT(tolevel > TSDB_FSET_LEVEL(&src)); + + for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { + if (tsdbCopyDFile(TSDB_DFILE_IN_SET(&src, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) { + while (ftype >= 0) { + remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pDest, ftype))); + ftype--; + } + + return -1; + } + } + + return 0; } static void tsdbGetFilename(int vid, int fid, int64_t ver, TSDB_FILE_T ftype, char *fname) { -- GitLab