diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index 7c7e1bd0f79254197e1c99dd5ca0544770656cea..a6b72e38bd16c2a7be7b7ae7983486aaaed094c6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -14,6 +14,7 @@ */ #include "tsdb.h" +#include "tsdbFS2.h" static bool tsdbShouldDoRetentionImpl(STsdb *pTsdb, int64_t now) { for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { @@ -111,4 +112,238 @@ int32_t tsdbCommitRetention(STsdb *pTsdb) { taosThreadRwlockUnlock(&pTsdb->rwLock); tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); return 0; -} \ No newline at end of file +} + +// new ============== +typedef struct { + STsdb *tsdb; + int32_t szPage; + int64_t now; + int64_t cid; + + TFileSetArray *fsetArr; + TFileOpArray *fopArr; + + struct { + int32_t fsetArrIdx; + STFileSet *fset; + } ctx[1]; +} SRTXer; + +static int32_t tsdbDoRemoveFileObject(SRTXer *rtxer, const STFileObj *fobj) { + STFileOp op = { + .optype = TSDB_FOP_REMOVE, + .fid = fobj->f->fid, + .of = fobj->f[0], + }; + + return TARRAY2_APPEND(rtxer->fopArr, op); +} + +static int32_t tsdbDoCopyFile(SRTXer *rtxer, const STFileObj *from, const STFile *to) { + int32_t code = 0; + int32_t lino = 0; + + char fname[TSDB_FILENAME_LEN]; + TdFilePtr fdFrom = NULL; + TdFilePtr fdTo = NULL; + + tsdbTFileName(rtxer->tsdb, to, fname); + + fdFrom = taosOpenFile(from->fname, TD_FILE_READ); + if (fdFrom == NULL) code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + + fdTo = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (fdTo == NULL) code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + + int64_t n = taosFSendFile(fdTo, fdFrom, 0, tsdbLogicToFileSize(from->f->size, rtxer->szPage)); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + taosCloseFile(&fdFrom); + taosCloseFile(&fdTo); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + taosCloseFile(&fdFrom); + taosCloseFile(&fdTo); + } + return code; +} + +static int32_t tsdbDoMigrateFileObj(SRTXer *rtxer, const STFileObj *fobj, const SDiskID *did) { + int32_t code = 0; + int32_t lino = 0; + STFileOp op = {0}; + + // remove old + op = (STFileOp){ + .optype = TSDB_FOP_REMOVE, + .fid = fobj->f->fid, + .of = fobj->f[0], + }; + + code = TARRAY2_APPEND(rtxer->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + + // create new + op = (STFileOp){ + .optype = TSDB_FOP_CREATE, + .fid = fobj->f->fid, + .nf = + { + .type = fobj->f->type, + .did = did[0], + .fid = fobj->f->fid, + .cid = rtxer->cid, + .size = fobj->f->size, + .stt[0] = + { + .level = fobj->f->stt[0].level, + }, + }, + }; + + code = TARRAY2_APPEND(rtxer->fopArr, op); + TSDB_CHECK_CODE(code, lino, _exit); + + // do copy the file + code = tsdbDoCopyFile(rtxer, fobj, &op.nf); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + } + return code; +} + +static int32_t tsdbDoRetentionBegin(STsdb *tsdb, SRTXer *rtxer) { + int32_t code = 0; + int32_t lino = 0; + + // TODO: wait for merge and compact task done + + rtxer->tsdb = tsdb; + rtxer->szPage = tsdb->pVnode->config.tsdbPageSize; + rtxer->now = taosGetTimestampMs(); + rtxer->cid = tsdbFSAllocEid(tsdb->pFS); + + code = tsdbFSCreateCopySnapshot(tsdb->pFS, &rtxer->fsetArr); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + } else { + tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtxer->tsdb->pVnode), rtxer->cid, __func__); + } + return code; +} + +static int32_t tsdbDoRetentionEnd(SRTXer *rtxer) { + int32_t code = 0; + int32_t lino = 0; + + if (TARRAY2_SIZE(rtxer->fopArr) == 0) goto _exit; + + code = tsdbFSEditBegin(rtxer->tsdb->pFS, rtxer->fopArr, TSDB_FEDIT_MERGE); + TSDB_CHECK_CODE(code, lino, _exit); + + taosThreadRwlockWrlock(&rtxer->tsdb->rwLock); + + code = tsdbFSEditCommit(rtxer->tsdb->pFS); + if (code) { + taosThreadRwlockUnlock(&rtxer->tsdb->rwLock); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosThreadRwlockUnlock(&rtxer->tsdb->rwLock); + + TARRAY2_DESTROY(rtxer->fopArr, NULL); + tsdbFSDestroyCopySnapshot(&rtxer->fsetArr); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + } else { + tsdbInfo("vid:%d, cid:%" PRId64 ", %s done", TD_VID(rtxer->tsdb->pVnode), rtxer->cid, __func__); + } + return code; +} + +static int32_t tsdbDoRetention2(STsdb *tsdb) { + int32_t code = 0; + int32_t lino = 0; + + SRTXer rtxer[1] = {0}; + + code = tsdbDoRetentionBegin(tsdb, rtxer); + TSDB_CHECK_CODE(code, lino, _exit); + + while (rtxer->ctx->fsetArrIdx < TARRAY2_SIZE(rtxer->fsetArr)) { + rtxer->ctx->fset = TARRAY2_GET(rtxer->fsetArr, rtxer->ctx->fsetArrIdx); + + STFileObj *fobj; + int32_t expLevel = tsdbFidLevel(rtxer->ctx->fset->fid, &rtxer->tsdb->keepCfg, rtxer->now); + + if (expLevel < 0) { // remove the file set + for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtxer->ctx->fset->farr[ftype], 1); ++ftype) { + if (fobj == NULL) continue; + + code = tsdbDoRemoveFileObject(rtxer, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + } + + SSttLvl *lvl; + TARRAY2_FOREACH(rtxer->ctx->fset->lvlArr, lvl) { + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + code = tsdbDoRemoveFileObject(rtxer, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } else if (expLevel == 0) { + continue; + } else { + SDiskID did; + + if (tfsAllocDisk(rtxer->tsdb->pVnode->pTfs, expLevel, &did) < 0) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // data + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtxer->ctx->fset->farr[ftype], 1); ++ftype) { + if (fobj == NULL) continue; + + if (fobj->f->did.level == did.level) continue; + code = tsdbDoMigrateFileObj(rtxer, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // stt + SSttLvl *lvl; + TARRAY2_FOREACH(rtxer->ctx->fset->lvlArr, lvl) { + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + if (fobj->f->did.level == did.level) continue; + + code = tsdbDoMigrateFileObj(rtxer, fobj, &did); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } + } + + code = tsdbDoRetentionEnd(rtxer); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(rtxer->tsdb->pVnode), lino, code); + } + return code; +}