diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index ea38d85257fe08481582ac7af3faad6219a8f0b9..9312d18b806350a38f5c7f829a83eca9d5f74c52 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -217,9 +217,12 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); void tdRSmaFSClose(SRSmaFS *fs); +int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); +int32_t tdRSmaFSCommit(SSma *pSma); +int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut); +int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut); int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); -int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat); int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile); int32_t tdRSmaFSRollback(SSma *pSma); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); @@ -293,9 +296,9 @@ void tdCloseTFile(STFile *pTFile); void tdDestroyTFile(STFile *pTFile); #endif -void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, - int8_t level, int64_t version, char *outputName); -void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); +void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, + int8_t level, int64_t version, char *outputName); +void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c06cfc251952c71ef4ee50d1607a97749b2ffe74..678821366904c3dad9aa048e132dfaaa9122e3c1 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -203,7 +203,7 @@ void smaCleanUp(); int32_t smaOpen(SVnode* pVnode, int8_t rollback); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); -int32_t smaPrepareCommit(SSma* pSma); +int32_t smaPrepareAsyncCommit(SSma* pSma); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo); int32_t smaFinishCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index e7574da1360ed38f8afbbc931a6fff1145f28bf1..d969aad12008e818e79edf480afa6e19573cf470 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -28,7 +28,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); * @param pSma * @return int32_t */ -int32_t smaPrepareCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } +int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } /** * @brief async commit, only applicable to Rollup SMA @@ -280,14 +280,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { int32_t code = 0; SVnode *pVnode = pSma->pVnode; -#if 0 - SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); - // perform persist task for qTaskInfo operator - if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { - return TSDB_CODE_FAILED; - } -#endif + + if ((code = tsdbCommit(VND_RSMA1(pVnode), pInfo)) < 0) { smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 979be82ad1166192d875ad3950e87ce53da1902a..257769ba11dd81f2e6826df4477caccda3ab6842 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -137,63 +137,10 @@ _exit: return code; } -/** - * @brief Open RSma FS from qTaskInfo files - * - * @param pSma - * @param version - * @return int32_t - */ -#if 0 -int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) { - SVnode *pVnode = pSma->pVnode; - int64_t commitID = pVnode->state.commitID; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = NULL; - SArray *output = NULL; - - terrno = TSDB_CODE_SUCCESS; - - if (!pEnv) { - return TSDB_CODE_SUCCESS; - } - - if (tdFetchQTaskInfoFiles(pSma, version, &output) < 0) { - goto _end; - } - - pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - - for (int32_t i = 0; i < taosArrayGetSize(output); ++i) { - int32_t vid = 0; - int64_t version = -1; - sscanf((const char *)taosArrayGetP(output, i), "v%dqinf.v%" PRIi64, &vid, &version); - SQTaskFile qTaskFile = {.version = version, .nRef = 1}; - if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) { - goto _end; - } - smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%d", TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef); - } - -_end: - for (int32_t i = 0; i < taosArrayGetSize(output); ++i) { - void *ptr = taosArrayGetP(output, i); - taosMemoryFreeClear(ptr); - } - taosArrayDestroy(output); - - if (terrno != TSDB_CODE_SUCCESS) { - smaError("vgId:%d, open rsma fs failed since %s", TD_VID(pVnode), terrstr()); - return TSDB_CODE_FAILED; - } - return TSDB_CODE_SUCCESS; -} -#endif - -static int32_t tdRSmaFSCreate(SRSmaFS *pFS) { +static int32_t tdRSmaFSCreate(SRSmaFS *pFS, int32_t size) { int32_t code = 0; - pFS->aQTaskInf = taosArrayInit(0, sizeof(SQTaskFile)); + pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile)); if (pFS->aQTaskInf == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -207,20 +154,20 @@ static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) { SVnode *pVnode = pSma->pVnode; if (pVnode->pTfs) { if (current) { - snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sCURRENT", tfsGetPrimaryPath(pVnode->pTfs), - TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP); + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT", tfsGetPrimaryPath(pVnode->pTfs), + TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); } if (current_t) { - snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sCURRENT.t", tfsGetPrimaryPath(pVnode->pTfs), - TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP); + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs), + TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); } } else { #if 0 if (current) { - snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sPRESENT", pTsdb->path, TD_DIRSEP); } if (current_t) { - snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP); + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sPRESENT.t", pTsdb->path, TD_DIRSEP); } #endif } @@ -278,7 +225,6 @@ _exit: return code; } - static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { const SQTaskFile *q1 = (const SQTaskFile *)p1; const SQTaskFile *q2 = (const SQTaskFile *)p2; @@ -305,13 +251,16 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { } static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; +#if 0 + int32_t lino = 0; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); int32_t nRef = 0; char fname[TSDB_FILENAME_LEN] = {0}; - // SQTaskFile int32_t iOld = 0; int32_t iNew = 0; @@ -371,6 +320,7 @@ _exit: if (code) { smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); } +#endif return code; } @@ -393,7 +343,7 @@ static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **outpu terrno = TSDB_CODE_SUCCESS; - tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir); + tdRSmaGetDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir); if (!taosCheckExistFile(dir)) { smaDebug("vgId:%d, fetch qtask files, no need as dir %s not exist", TD_VID(pVnode), dir); @@ -471,7 +421,123 @@ _end: } +static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) { + int32_t code = 0; + int32_t lino = 0; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaFS *pFS = RSMA_FS(pStat); + char fname[TSDB_FILENAME_LEN] = {0}; + char fnameVer[TSDB_FILENAME_LEN] = {0}; + + // SArray + int32_t size = taosArrayGetSize(pFS->aQTaskInf); + for (int32_t i = 0; i < size; ++i) { + SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i); + + // main.tdb ========= + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), fnameVer); + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, tfsGetPrimaryPath(pVnode->pTfs), fname); + + if (taosCheckExistFile(fnameVer)) { + if (taosRenameFile(fnameVer, fname) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + smaDebug("vgId:%d, %s:%d succeed to to rename %s to %s", TD_VID(pVnode), __func__, lino, fnameVer, fname); + } else if (taosCheckExistFile(fname)) { + if (taosRemoveFile(fname) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + smaDebug("vgId:%d, %s:%d succeed to to remove %s", TD_VID(pVnode), __func__, lino, fname); + } + } + + { + // remove those invalid files (todo) + // main.tdb-journal.5 // TDB should handle its clear for kill -9 + } + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + + // EXPOSED APIS ==================================================================================== + +int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) { + int32_t code = 0; + int32_t lino = 0; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + + // open handle + code = tdRSmaFSCreate(RSMA_FS(pStat), 0); + TSDB_CHECK_CODE(code, lino, _exit); + + // open impl + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tdRSmaGetCurrentFName(pSma, current, current_t); + + if (taosCheckExistFile(current)) { + code = tdRSmaLoadFSFromFile(current, RSMA_FS(pStat)); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosCheckExistFile(current_t)) { + if (rollback) { + code = tdRSmaFSRollback(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tdRSmaFSCommit(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } else { + // 1st open with empty current/qTaskInfoFile + code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current); + TSDB_CHECK_CODE(code, lino, _exit); + ASSERT(!rollback); + } + + // scan and try fix(remove main.db/main.db.xxx and use the one with version) + code = tdRSmaFSScanAndTryFix(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +void tdRSmaFSClose(SRSmaFS *pFS) { taosArrayDestroy(pFS->aQTaskInf); } + +int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) { + int32_t code = 0; + int32_t lino = 0; + char tfname[TSDB_FILENAME_LEN]; + + tdRSmaGetCurrentFName(pSma, NULL, tfname); + + // gnrt PRESENT.t + code = tdRSmaSaveFSToFile(pFSNew, tfname); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + int32_t tdRSmaFSCommit(SSma *pSma) { int32_t code = 0; int32_t lino = 0; @@ -490,7 +556,7 @@ int32_t tdRSmaFSCommit(SSma *pSma) { } // Load the new FS - code = tdRSmaFSCreate(&fs); + code = tdRSmaFSCreate(&fs, 0); TSDB_CHECK_CODE(code, lino, _exit); code = tdRSmaLoadFSFromFile(current, &fs); @@ -523,84 +589,33 @@ _exit: return code; } -int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) { - int32_t code = 0; - int32_t lino = 0; - SVnode *pVnode = pSma->pVnode; - int64_t commitID = pVnode->state.commitID; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = NULL; - - if (!pEnv) { - return TSDB_CODE_SUCCESS; - } - - // open handle - code = tdRSmaFSCreate(&pStat->fs); - TSDB_CHECK_CODE(code, lino, _exit); +int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t size) { + int32_t code = 0; - // open impl - char current[TSDB_FILENAME_LEN] = {0}; - char current_t[TSDB_FILENAME_LEN] = {0}; - tdRSmaGetCurrentFName(pSma, current, current_t); + for (int32_t i = 0; i < size; ++i) { + SQTaskFile *qTaskF = qTaskFile + i; - if (taosCheckExistFile(current)) { - code = tdRSmaLoadFSFromFile(current, &pStat->fs); - TSDB_CHECK_CODE(code, lino, _exit); + int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE); - if (taosCheckExistFile(current_t)) { - if (rollback) { - code = tdRSmaFSRollback(pSma); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbFSCommit(pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); + if (idx < 0) { + idx = taosArrayGetSize(pFS->aQTaskInf); + } else { + SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); + int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF); + if (c == 0) { + ASSERT(0); + pTaskF->nRef = qTaskF->nRef; + ASSERT(pTaskF->size == qTaskF->size); + goto _exit; } } - } else { - // empty one - code = tsdbSaveFSToFile(&pTsdb->fs, current); - TSDB_CHECK_CODE(code, lino, _exit); - - ASSERT(!rollback); - } - - // scan and fix FS - code = tsdbScanAndTryFixFS(pTsdb); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -void tdRSmaFSClose(SRSmaFS *pFS) { taosArrayDestroy(pFS->aQTaskInf); } - - -int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) { - int32_t code = 0; - int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskInfCmprFn1, TD_GE); - - if (idx < 0) { - idx = taosArrayGetSize(pFS->aQTaskInf); - } else { - SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); - int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskFile); - if (c == 0) { - pTaskF->nRef = qTaskFile->nRef; - ASSERT(pTaskF->size == qTaskFile->size); + if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskF) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } } - if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskFile) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - _exit: return code; } @@ -613,17 +628,17 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, in taosRLockLatch(RSMA_FS_LOCK(pStat)); if (suid > 0 && level > 0) { + ASSERT(version > 0); if ((pTaskF = taosArraySearch(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ))) { oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); ASSERT(oldVal > 0); } } else { + // ref all int32_t size = taosArrayGetSize(aQTaskInf); for (int32_t i = 0; i < size; ++i) { pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); - if (pTaskF->version == version) { - oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); - } + oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); ASSERT(oldVal > 0); } } @@ -631,18 +646,6 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, in return oldVal; } -int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) { - SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; - int64_t version = -1; - - taosRLockLatch(RSMA_FS_LOCK(pStat)); - if (taosArrayGetSize(aQTaskInf) > 0) { - version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version; - } - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - return version; -} - void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) { SVnode *pVnode = pSma->pVnode; SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; @@ -653,12 +656,13 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int taosWLockLatch(RSMA_FS_LOCK(pStat)); if (suid > 0 && level > 0) { + ASSERT(version > 0); if ((idx = taosArraySearchIdx(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ)) >= 0) { ASSERT(idx < taosArrayGetSize(aQTaskInf)); pTaskF = taosArrayGet(aQTaskInf, idx); if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), - qTaskFullName); + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); if (taosRemoveFile(qTaskFullName) < 0) { smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, tstrerror(TAOS_SYSTEM_ERROR(errno))); @@ -670,28 +674,64 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int } } else { for (int32_t i = 0; i < taosArrayGetSize(aQTaskInf);) { - pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); - int32_t nRef = INT32_MAX; - if (pTaskF->version == version) { - nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1); - } else if (pTaskF->version < version) { - nRef = atomic_load_32(&pTaskF->nRef); - } - if (nRef <= 0) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, - tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); - if (taosRemoveFile(qTaskFullName) < 0) { - smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); - } - taosArrayRemove(aQTaskInf, i); - continue; + pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); + int32_t nRef = INT32_MAX; + if (pTaskF->version == version) { + nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1); + } else if (pTaskF->version < version) { + nRef = atomic_load_32(&pTaskF->nRef); + } + if (nRef <= 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); + if (taosRemoveFile(qTaskFullName) < 0) { + smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); } - ++i; + taosArrayRemove(aQTaskInf, i); + continue; + } + ++i; } } taosWUnLockLatch(RSMA_FS_LOCK(pStat)); } + +int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + int32_t code = 0; + + taosRLockLatch(RSMA_FS_LOCK(pStat)); + code = tdRSmaFSCopy(pSma, pFSOut); + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut) { + int32_t code = 0; + int32_t lino = 0; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaFS *pFS = RSMA_FS(pStat); + int32_t size = 0; + + code = tdRSmaFSCreate(pFSOut, size); + TSDB_CHECK_CODE(code, lino, _exit); + + taosArraySetSize(pFSOut->aQTaskInf, size); + memcpy(TARRAY_GET_ELEM(pFSOut->aQTaskInf, 0), TARRAY_GET_ELEM(pFS->aQTaskInf, 0), size * sizeof(SQTaskFile)); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 788d5c9fc718b65893f877f485a591e1cdc71438..c3e7b367459696b4c419a5942a7709d0e6f1ae38 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -28,8 +28,6 @@ SSmaMgmt smaMgmt = { .rsetId = -1, }; -#define TD_QTASKINFO_FNAME_PREFIX "main.db" - typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); @@ -59,26 +57,6 @@ struct SRSmaQTaskInfoItem { void *qTaskInfo; }; -void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) { - tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); -} - -void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, char *outputName) { - tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); -} - -void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) { - tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); - int32_t rsmaLen = strlen(outputName); - snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level); -} - -void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { - tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); - int32_t rsmaLen = strlen(outputName); - snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level); -} - static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC if (!taskHandle || !(*taskHandle)) return; @@ -311,8 +289,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot pItem->pStreamState = pStreamState; if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { - int64_t msInterval = - convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); + int64_t msInterval = + convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); pItem->maxDelay = (int32_t)msInterval; } else { pItem->maxDelay = (int32_t)param->maxdelay[idx]; @@ -353,10 +331,12 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con return TSDB_CODE_SUCCESS; } +#if 0 if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INIT_FAILED; return TSDB_CODE_FAILED; } +#endif SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); @@ -1178,20 +1158,21 @@ _err: } /** - * @brief reload ts data from checkpoint - * - * @param pSma - * @return int32_t + * N.B. the data would be restored from the unified WAL replay procedure */ -#if 0 -static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { - // NOTHING TODO: the data would be restored from the unified WAL replay procedure - return TSDB_CODE_SUCCESS; -} -#endif - int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) { - // step 1: iterate all stables to restore the rsma env + // step 1: init env + if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_TDB_INIT_FAILED; + return TSDB_CODE_FAILED; + } + + // step 2: open SRSmaFS for qTaskFiles + if (tdRSmaFSOpen(pSma, qtaskFileVer, rollback) < 0) { + goto _err; + } + + // step 3: iterate all stables to restore the rsma env int64_t nTables = 0; if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { goto _err; @@ -1201,18 +1182,6 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, return TSDB_CODE_SUCCESS; } - // step 2: reload ts data from checkpoint -#if 0 - if (tdRSmaRestoreTSDataReload(pSma) < 0) { - goto _err; - } -#endif - - // step 3: open SRSmaFS for qTaskFiles - if (tdRSmaFSOpen(pSma, qtaskFileVer, rollback) < 0) { - goto _err; - } - smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer); return TSDB_CODE_SUCCESS; _err: @@ -1222,19 +1191,26 @@ _err: } int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { - SSma *pSma = pRSmaStat->pSma; - SVnode *pVnode = pSma->pVnode; - int32_t vid = SMA_VID(pSma); + int32_t code = 0; + int32_t lino = 0; + SSma *pSma = pRSmaStat->pSma; + SVnode *pVnode = pSma->pVnode; + SArray *qTaskFArray = NULL; + int64_t version = pRSmaStat->commitAppliedVer; + TdFilePtr pOutFD = NULL; + TdFilePtr pInFD = NULL; + char fname[TSDB_FILENAME_LEN]; + char fnameVer[TSDB_FILENAME_LEN]; + SRSmaFS fs = {0}; if (taosHashGetSize(pInfoHash) <= 0) { return TSDB_CODE_SUCCESS; } - int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); - if (pRSmaStat->commitAppliedVer <= fsMaxVer) { - smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, - pRSmaStat->commitAppliedVer, fsMaxVer); - return TSDB_CODE_SUCCESS; + qTaskFArray = taosArrayInit(taosHashGetSize(pInfoHash) << 1, sizeof(SQTaskFile)); + if (!qTaskFArray) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } void *infoHash = NULL; @@ -1249,19 +1225,69 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); if (pItem && pItem->pStreamState) { if (streamStateCommit(pItem->pStreamState) < 0) { - terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; - goto _err; + code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; + TSDB_CHECK_CODE(code, lino, _exit); + } + smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", TD_VID(pVnode), + pRSmaInfo->suid, i + 1); + + // qTaskInfo file + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, -1, tfsGetPrimaryPath(pVnode->pTfs), fname); + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, version, tfsGetPrimaryPath(pVnode->pTfs), + fnameVer); + if (taosCheckExistFile(fnameVer)) { + smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer); + } + + pOutFD = taosCreateFile(fnameVer, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + pInFD = taosOpenFile(fname, TD_FILE_READ); + if (pInFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t size = 0; + if (taosFStatFile(pInFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); } - smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid, - i + 1); + + int64_t offset = 0; + if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + smaError("vgId:%d, rsma persist, send qtaskinfo file %s failed since %s", TD_VID(pVnode), fname, tstrerror(code)); + TSDB_CHECK_CODE(code, lino, _exit); + } + taosCloseFile(&pOutFD); + taosCloseFile(&pInFD); + + SQTaskFile qTaskF = {.nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size}; + taosArrayPush(qTaskFArray, &qTaskF); } } } - return TSDB_CODE_SUCCESS; -_err: - smaError("vgId:%d, rsma persist failed since %s", vid, terrstr()); - return TSDB_CODE_FAILED; + code = tdRSmaFSTakeSnapshot(pSma, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tdRSmaFSPrepareCommit(pSma, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + + taosArrayDestroy(fs.aQTaskInf); + taosArrayDestroy(qTaskFArray); + if (code) { + if (pOutFD) taosCloseFile(&pOutFD); + if (pInFD) taosCloseFile(&pInFD); + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + terrno = code; + return code; } /** diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 22057166e2ffdb725f5cf489dc078b94c7b65934..2b2c5cb1a5438451a4241ecd05762642bb0b951a 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -15,197 +15,72 @@ #include "sma.h" -// smaFileUtil ================ -#if 0 -#define TD_FILE_STATE_OK 0 -#define TD_FILE_STATE_BAD 1 - -#define TD_FILE_INIT_MAGIC 0xFFFFFFFF - -static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo); -static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo); - -static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) { - int32_t tlen = 0; - - tlen += taosEncodeFixedU32(buf, pInfo->magic); - tlen += taosEncodeFixedU32(buf, pInfo->ftype); - tlen += taosEncodeFixedU32(buf, pInfo->fver); - tlen += taosEncodeFixedI64(buf, pInfo->fsize); - - return tlen; -} - -static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { - buf = taosDecodeFixedU32(buf, &(pInfo->magic)); - buf = taosDecodeFixedU32(buf, &(pInfo->ftype)); - buf = taosDecodeFixedU32(buf, &(pInfo->fver)); - buf = taosDecodeFixedI64(buf, &(pInfo->fsize)); - - return buf; -} - -int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { - ASSERT(TD_TFILE_OPENED(pTFile)); - - int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte); - if (nwrite < nbyte) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return nwrite; -} - -int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { - ASSERT(TD_TFILE_OPENED(pTFile)); - - int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence); - if (loffset < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return loffset; -} - -int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) { - ASSERT(TD_TFILE_OPENED(pTFile)); - return taosFStatFile(pTFile->pFile, size, NULL); -} - -int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { - ASSERT(TD_TFILE_OPENED(pTFile)); - - int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte); - if (nread < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return nread; -} - -int32_t tdUpdateTFileHeader(STFile *pTFile) { - char buf[TD_FILE_HEAD_SIZE] = "\0"; - - if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { - return -1; - } - - void *ptr = buf; - tdEncodeTFInfo(&ptr, &(pTFile->info)); - - taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE); - if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) { - return -1; - } - - return 0; -} +#define TD_QTASKINFO_FNAME_PREFIX "main.db" -int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) { - char buf[TD_FILE_HEAD_SIZE] = "\0"; - uint32_t _version; - - ASSERT(TD_TFILE_OPENED(pTFile)); - - if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { - return -1; - } - - if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) { - return -1; - } - - if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) { - terrno = TSDB_CODE_FILE_CORRUPTED; - return -1; - } - - void *pBuf = buf; - pBuf = tdDecodeTFInfo(pBuf, pInfo); - return 0; -} - -void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) { - pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM)); +void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) { + tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); } -int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) { - ASSERT(TD_TFILE_OPENED(pTFile)); - - int64_t toffset; - - if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) { - return -1; - } - -#if 0 - smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile), - toffset, nbyte, toffset + nbyte); -#endif - - ASSERT(pTFile->info.fsize == toffset); - - if (offset) { - *offset = toffset; - } - - if (tdWriteTFile(pTFile, buf, nbyte) < 0) { - return -1; - } - - pTFile->info.fsize += nbyte; - - return nbyte; +void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, + char *outputName) { + tdRSmaGetFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); } -int32_t tdOpenTFile(STFile *pTFile, int flags) { - ASSERT(!TD_TFILE_OPENED(pTFile)); - - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags); - if (pTFile->pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return 0; +void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) { + tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); + int32_t rsmaLen = strlen(outputName); + snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level); } -void tdCloseTFile(STFile *pTFile) { - if (TD_TFILE_OPENED(pTFile)) { - taosCloseFile(&pTFile->pFile); - TD_TFILE_SET_CLOSED(pTFile); - } +void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { + tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); + int32_t rsmaLen = strlen(outputName); + snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level); } -void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); } - -#endif - -void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, - int8_t level, int64_t version, char *outputName) { - if (version >= 0 && level > 0 && suid > 0) { - if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s.%" PRIi64, pdname, - TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, level, fname, version); +void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, + int8_t level, int64_t version, char *outputName) { + if (level >= 0 && suid > 0) { + if (version >= 0) { + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s.%" PRIi64, pdname, + TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, TD_DIRSEP, level, TD_DIRSEP, fname, + version); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s.%" PRIi64, TD_DIRSEP, + vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, TD_DIRSEP, level, TD_DIRSEP, fname, version); + } } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s." PRIi64, TD_DIRSEP, - vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, level, fname, version); + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s", pdname, + TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, TD_DIRSEP, level, TD_DIRSEP, fname); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s", TD_DIRSEP, vgId, + TD_DIRSEP, dname, TD_DIRSEP, suid, TD_DIRSEP, level, TD_DIRSEP, fname); + } } } else { - if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP, - vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version); + if (version >= 0) { + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP, + vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname, + TD_DIRSEP, vgId, fname, version); + } } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname, - TD_DIRSEP, vgId, fname, version); + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, + TD_DIRSEP, dname, TD_DIRSEP, vgId, fname); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, + TD_DIRSEP, vgId, fname); + } } } } -void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) { +void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) { if (pdname) { if (endWithSep) { snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP,