diff --git a/src/tsdb/inc/tsdbCommit.h b/src/tsdb/inc/tsdbCommit.h index 5e740081d187466435860a3a6c066412419ab571..6bd5dc432584bf1e7f8ed4274efdf014edf54e50 100644 --- a/src/tsdb/inc/tsdbCommit.h +++ b/src/tsdb/inc/tsdbCommit.h @@ -33,6 +33,7 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); void *tsdbCommitData(STsdbRepo *pRepo); +int tsdbApplyRtn(STsdbRepo *pRepo); static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { if (fid >= pRtn->maxFid) { diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index e74c3238e2ac1d70087c615e4a3e6310a0cb3050..945b74af353bcf771c870a8e66fff7eb5f51c614 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -86,6 +86,7 @@ struct STsdbRepo { SMemTable* mem; SMemTable* imem; STsdbFS* fs; + SRtn rtn; tsem_t readyToCommit; pthread_mutex_t mutex; bool repoLocked; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 734b47a718cbf43fc24982dd3bbc6f862c94afd2..f2bbe347bd0bfa1f8f7ae5feb016dbfea13adfd7 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -86,7 +86,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); -static int tsdbApplyRtn(STsdbRepo *pRepo); static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); void *tsdbCommitData(STsdbRepo *pRepo) { @@ -1431,7 +1430,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p return false; } -static int tsdbApplyRtn(STsdbRepo *pRepo) { +int tsdbApplyRtn(STsdbRepo *pRepo) { SRtn rtn; SFSIter fsiter; STsdbFS * pfs = REPO_FS(pRepo); diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index fd9b5e77e3fba01d49fc6f8f962730f1b8fbc9ec..54372ae8c28d91a72243256a74a8fb53c317eab2 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -33,7 +33,9 @@ static int tsdbScanDataDir(STsdbRepo *pRepo); static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf); static int tsdbRestoreCurrent(STsdbRepo *pRepo); static int tsdbComparTFILE(const void *arg1, const void *arg2); -static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo); +static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired); +static int tsdbProcessExpiredFS(STsdbRepo *pRepo); +static int tsdbCreateMeta(STsdbRepo *pRepo); // ================== CURRENT file header info static int tsdbEncodeFSHeader(void **buf, SFSHeader *pHeader) { @@ -212,6 +214,8 @@ STsdbFS *tsdbNewFS(STsdbCfg *pCfg) { return NULL; } + pfs->intxn = false; + pfs->nstatus = tsdbNewFSStatus(maxFSet); if (pfs->nstatus == NULL) { tsdbFreeFS(pfs); @@ -234,22 +238,84 @@ void *tsdbFreeFS(STsdbFS *pfs) { return NULL; } +static int tsdbProcessExpiredFS(STsdbRepo *pRepo) { + tsdbStartFSTxn(pRepo, 0, 0); + if (tsdbCreateMeta(pRepo) < 0) { + tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + if (tsdbApplyRtn(pRepo) < 0) { + tsdbEndFSTxnWithError(REPO_FS(pRepo)); + tsdbError("vgId:%d failed to apply rtn since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + if (tsdbEndFSTxn(pRepo) < 0) { + tsdbError("vgId:%d failed to end fs txn since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + return 0; +} + +static int tsdbCreateMeta(STsdbRepo *pRepo) { + STsdbFS *pfs = REPO_FS(pRepo); + SMFile * pOMFile = pfs->cstatus->pmf; + SMFile mf; + SDiskID did; + + if (pOMFile != NULL) { + // keep the old meta file + tsdbUpdateMFile(pfs, pOMFile); + return 0; + } + + // Create a new meta file + did.level = TFS_PRIMARY_LEVEL; + did.id = TFS_PRIMARY_ID; + tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + + if (tsdbCreateMFile(&mf, true) < 0) { + tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + + tsdbInfo("vgId:%d meta file %s is created", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); + + if (tsdbUpdateMFileHeader(&mf) < 0) { + tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); + tsdbApplyMFileChange(&mf, pOMFile); + return -1; + } + + TSDB_FILE_FSYNC(&mf); + tsdbCloseMFile(&mf); + tsdbUpdateMFile(pfs, &mf); + + return 0; +} + int tsdbOpenFS(STsdbRepo *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); char current[TSDB_FILENAME_LEN] = "\0"; + int nExpired = 0; ASSERT(pfs != NULL); tsdbGetTxnFname(REPO_ID(pRepo), TSDB_TXN_CURR_FILE, current); + tsdbGetRtnSnap(pRepo, &pRepo->rtn); if (access(current, F_OK) == 0) { if (tsdbOpenFSFromCurrent(pRepo) < 0) { tsdbError("vgId:%d failed to open FS since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } - tsdbScanAndTryFixDFilesHeader(pRepo); + tsdbScanAndTryFixDFilesHeader(pRepo, &nExpired); + if (nExpired > 0) { + tsdbProcessExpiredFS(pRepo); + } } else { + // should skip expired fileset inside of the function if (tsdbRestoreCurrent(pRepo) < 0) { tsdbError("vgId:%d failed to restore current file since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; @@ -1110,6 +1176,11 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { ASSERT(tvid == REPO_ID(pRepo)); + if (tfid < pRepo->rtn.minFid) { // skip file expired + ++index; + continue; + } + if (ftype == 0) { fset.fid = tfid; } else { @@ -1206,7 +1277,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) { } } -static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) { +static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) { STsdbFS * pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; SDFInfo info; @@ -1214,7 +1285,9 @@ static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo) { for (size_t i = 0; i < taosArrayGetSize(pStatus->df); i++) { SDFileSet fset; tsdbInitDFileSetEx(&fset, (SDFileSet *)taosArrayGet(pStatus->df, i)); - + if (fset.fid < pRepo->rtn.minFid) { + ++*nExpired; + } tsdbDebug("vgId:%d scan DFileSet %d header", REPO_ID(pRepo), fset.fid); if (tsdbOpenDFileSet(&fset, O_RDWR) < 0) { diff --git a/src/tsdb/src/tsdbSync.c b/src/tsdb/src/tsdbSync.c index 5a2756537e6a37c8b4a0e7c3e1f81199523df2eb..edcb84d091eb4a1bcb4cb23835a3c889eee35d54 100644 --- a/src/tsdb/src/tsdbSync.c +++ b/src/tsdb/src/tsdbSync.c @@ -424,24 +424,42 @@ static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { } if (tsdbSendDecision(pSynch, false) < 0) { - tsdbError("vgId:%d, filed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } } else { // Need to copy from remote - tsdbInfo("vgId:%d, fileset:%d will be received", REPO_ID(pRepo), pSynch->pdf->fid); - - // Notify remote to send there file here - if (tsdbSendDecision(pSynch, true) < 0) { - tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; + int fidLevel = tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn)); + if (fidLevel < 0) { // expired fileset + tsdbInfo("vgId:%d, fileset:%d will be skipped as expired", REPO_ID(pRepo), pSynch->pdf->fid); + if (tsdbSendDecision(pSynch, false) < 0) { + tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + // Move forward + if (tsdbRecvDFileSetInfo(pSynch) < 0) { + tsdbError("vgId:%d, failed to recv fileset since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } + if (pLSet) { + pLSet = tsdbFSIterNext(&fsiter); + } + // Next loop + continue; + } else { + tsdbInfo("vgId:%d, fileset:%d will be received", REPO_ID(pRepo), pSynch->pdf->fid); + // Notify remote to send there file here + if (tsdbSendDecision(pSynch, true) < 0) { + tsdbError("vgId:%d, failed to send decision since %s", REPO_ID(pRepo), tstrerror(terrno)); + return -1; + } } // Create local files and copy from remote SDiskID did; SDFileSet fset; - tfsAllocDisk(tsdbGetFidLevel(pSynch->pdf->fid, &(pSynch->rtn)), &(did.level), &(did.id)); + tfsAllocDisk(fidLevel, &(did.level), &(did.id)); if (did.level == TFS_UNDECIDED_LEVEL) { terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; tsdbError("vgId:%d, failed allc disk since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -548,6 +566,13 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { STsdbRepo *pRepo = pSynch->pRepo; bool toSend = false; + // skip expired fileset + if (pSet && tsdbGetFidLevel(pSet->fid, &(pSynch->rtn)) < 0) { + tsdbInfo("vgId:%d, don't sync send since fileset:%d smaller than minFid:%d", REPO_ID(pRepo), pSet->fid, + pSynch->rtn.minFid); + return 0; + } + if (tsdbSendDFileSetInfo(pSynch, pSet) < 0) { tsdbError("vgId:%d, failed to send fileset:%d info since %s", REPO_ID(pRepo), pSet ? pSet->fid : -1, tstrerror(terrno)); return -1;