diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 29c5a568c7876f8a0560c4f53f5842497f5fd18d..ea38d85257fe08481582ac7af3faad6219a8f0b9 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -85,7 +85,8 @@ struct STSmaStat { struct SQTaskFile { volatile int32_t nRef; - int32_t padding; + int8_t level; + int64_t suid; int64_t version; int64_t size; }; @@ -214,19 +215,21 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { // rsma void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); -int32_t tdRSmaFSOpen(SSma *pSma, int64_t version); +int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); void tdRSmaFSClose(SRSmaFS *fs); -int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version); -void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version); +int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); +void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat); int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile); -int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer); +int32_t tdRSmaFSRollback(SSma *pSma); +int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); -int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); -void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName); -void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName); +int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); +void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, + char *outputName); void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName); void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); @@ -290,8 +293,8 @@ void tdCloseTFile(STFile *pTFile); void tdDestroyTFile(STFile *pTFile); #endif -void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, - char *outputName); +void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, + int8_t level, int64_t version, char *outputName); void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 124b0b12151179d28412b206f96aa2f26b757926..c06cfc251952c71ef4ee50d1607a97749b2ffe74 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -203,10 +203,7 @@ void smaCleanUp(); int32_t smaOpen(SVnode* pVnode, int8_t rollback); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); -int32_t smaSyncPreCommit(SSma* pSma); -int32_t smaSyncCommit(SSma* pSma); -int32_t smaSyncPostCommit(SSma* pSma); -int32_t smaPrepareAsyncCommit(SSma* pSma); +int32_t smaPrepareCommit(SSma* pSma); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo); int32_t smaFinishCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 9748963722cb5dac2f0a83bc89fad9a04987cf28..e7574da1360ed38f8afbbc931a6fff1145f28bf1 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -17,49 +17,18 @@ extern SSmaMgmt smaMgmt; -#if 0 -static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma); -static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma); -static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); -#endif static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); -#if 0 -/** - * @brief Only applicable to Rollup SMA - * - * @param pSma - * @return int32_t - */ -int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); } - -/** - * @brief Only applicable to Rollup SMA - * - * @param pSma - * @return int32_t - */ -int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); } - -/** - * @brief Only applicable to Rollup SMA - * - * @param pSma - * @return int32_t - */ -int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); } -#endif - /** * @brief async commit, only applicable to Rollup SMA * * @param pSma * @return int32_t */ -int32_t smaPrepareAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } +int32_t smaPrepareCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } /** * @brief async commit, only applicable to Rollup SMA @@ -143,70 +112,6 @@ _exit: return code; } -#if 0 -/** - * @brief pre-commit for rollup sma(sync commit). - * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. - * 2) wait for all triggered fetch tasks to finish - * 3) perform persist task for qTaskInfo - * - * @param pSma - * @return int32_t - */ -static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { - return TSDB_CODE_SUCCESS; - } - - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); - - // step 1: set rsma stat paused - atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); - - // step 2: wait for all triggered fetch tasks to finish - int32_t nLoops = 0; - while (1) { - if (T_REF_VAL_GET(pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma)); - break; - } else { - smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma)); - } - ++nLoops; - if (nLoops > 1000) { - sched_yield(); - nLoops = 0; - } - } - - // step 3: perform persist task for qTaskInfo - pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); - - smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma)); - - return TSDB_CODE_SUCCESS; -} - -/** - * @brief commit for rollup sma - * - * @param pSma - * @return int32_t - */ -static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { -#if 0 - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { - return TSDB_CODE_SUCCESS; - } -#endif - return TSDB_CODE_SUCCESS; -} -#endif - // SQTaskFile ====================================================== /** @@ -218,6 +123,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { + #if 0 SVnode *pVnode = pSma->pVnode; SRSmaFS *pFS = RSMA_FS(pStat); int64_t committed = pStat->commitAppliedVer; @@ -264,31 +170,10 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { } taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + #endif return TSDB_CODE_SUCCESS; } -#if 0 -/** - * @brief post-commit for rollup sma - * 1) clean up the outdated qtaskinfo files - * - * @param pSma - * @return int32_t - */ -static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { - SVnode *pVnode = pSma->pVnode; - if (!VND_IS_RSMA(pVnode)) { - return TSDB_CODE_SUCCESS; - } - - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - - tdUpdateQTaskInfoFiles(pSma, pRSmaStat); - - return TSDB_CODE_SUCCESS; -} -#endif - /** * @brief Rsma async commit implementation(only do some necessary light weighted task) * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 8db36be741f3187834d4b2f2eca4fa302ca524fe..979be82ad1166192d875ad3950e87ce53da1902a 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -20,6 +20,123 @@ static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2); static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2); + +static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) { + int32_t n = 0; + + n += tPutI8(p ? p + n : p, pFile->level); + n += tPutI64v(p ? p + n : p, pFile->size); + n += tPutI64v(p ? p + n : p, pFile->suid); + n += tPutI64v(p ? p + n : p, pFile->version); + + return n; +} + +static int32_t tdRSmaFSToBinary(uint8_t *p, SRSmaFS *pFS) { + int32_t n = 0; + uint32_t size = taosArrayGetSize(pFS->aQTaskInf); + + // version + n += tPutI8(p ? p + n : p, 0); + + // SArray + n += tPutU32v(p ? p + n : p, size); + for (uint32_t i = 0; i < size; ++i) { + n += tPutQTaskF(p ? p + n : p, taosArrayGet(pFS->aQTaskInf, i)); + } + + return n; +} + +int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) { + int32_t n = 0; + + n += tGetI8(p + n, &pFile->level); + n += tGetI64v(p + n, &pFile->size); + n += tGetI64v(p + n, &pFile->suid); + n += tGetI64v(p + n, &pFile->version); + + return n; +} + +static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) { + int32_t code = 0; + int32_t n = 0; + int8_t version = 0; + + // version + n += tGetI8(pData + n, &version); + + // SArray + taosArrayClear(pFS->aQTaskInf); + uint32_t size = 0; + n += tGetU32v(pData + n, &size); + for (uint32_t i = 0; i < size; ++i) { + SQTaskFile qTaskF = {0}; + + int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF); + if (nt < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + n += nt; + if (taosArrayPush(pFS->aQTaskInf, &qTaskF) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + ASSERT(n + sizeof(TSCKSUM) == nData); + +_exit: + return code; +} + +static int32_t tdRSmaSaveFSToFile(SRSmaFS *pFS, const char *fname) { + int32_t code = 0; + int32_t lino = 0; + + // encode to binary + int32_t size = tdRSmaFSToBinary(NULL, pFS) + sizeof(TSCKSUM); + uint8_t *pData = taosMemoryMalloc(size); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + tdRSmaFSToBinary(pData, pFS); + taosCalcChecksumAppend(0, pData, size); + + // save to file + TdFilePtr pFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t n = taosWriteFile(pFD, pData, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (taosFsyncFile(pFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&pFD); + +_exit: + if (pData) taosMemoryFree(pData); + if (code) { + smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } + return code; +} + /** * @brief Open RSma FS from qTaskInfo files * @@ -27,6 +144,7 @@ static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2); * @param version * @return int32_t */ +#if 0 int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) { SVnode *pVnode = pSma->pVnode; int64_t commitID = pVnode->state.commitID; @@ -70,67 +188,190 @@ _end: } return TSDB_CODE_SUCCESS; } +#endif -void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskInf); } +static int32_t tdRSmaFSCreate(SRSmaFS *pFS) { + int32_t code = 0; -static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { - if (*(int64_t *)p1 < ((SQTaskFile *)p2)->version) { - return -1; - } else if (*(int64_t *)p1 > ((SQTaskFile *)p2)->version) { - return 1; + pFS->aQTaskInf = taosArrayInit(0, sizeof(SQTaskFile)); + if (pFS->aQTaskInf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } - return 0; + +_exit: + return code; } -int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) { - SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; - SQTaskFile *pTaskF = NULL; - int32_t oldVal = 0; +static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) { + SVnode *pVnode = pSma->pVnode; + if (pVnode->pTfs) { + if (current) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sCURRENT", tfsGetPrimaryPath(pVnode->pTfs), + TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP); + } + if (current_t) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sCURRENT.t", tfsGetPrimaryPath(pVnode->pTfs), + TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP); + } + } else { +#if 0 + if (current) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sCURRENT", pTsdb->path, TD_DIRSEP); + } + if (current_t) { + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sCURRENT.t", pTsdb->path, TD_DIRSEP); + } +#endif + } +} - taosRLockLatch(RSMA_FS_LOCK(pStat)); - if ((pTaskF = taosArraySearch(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ))) { - oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); - ASSERT(oldVal > 0); +static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + uint8_t *pData = NULL; + + // load binary + TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ); + if (pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); } - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - return oldVal; + + int64_t size; + if (taosFStatFile(pFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pData = taosMemoryMalloc(size); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (taosReadFile(pFD, pData, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (!taosCheckChecksumWhole(pData, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&pFD); + + // decode binary + code = tsdbBinaryToFS(pData, size, pFS); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (pData) taosMemoryFree(pData); + if (code) { + smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } + return code; } -int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) { - SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; - int64_t version = -1; - taosRLockLatch(RSMA_FS_LOCK(pStat)); - if (taosArrayGetSize(aQTaskInf) > 0) { - version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version; +static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { + const SQTaskFile *q1 = (const SQTaskFile *)p1; + const SQTaskFile *q2 = (const SQTaskFile *)p2; + + if (q1->suid < q2->suid) { + return -1; + } else if (q1->suid > q2->suid) { + return 1; } - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - return version; + + if (q1->level < q2->level) { + return -1; + } else if (q1->level > q2->level) { + return 1; + } + + if (q1->version < q2->version) { + return -1; + } else if (q1->version > q2->version) { + return 1; + } + + return 0; } -void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) { - SVnode *pVnode = pSma->pVnode; - SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; - char qTaskFullName[TSDB_FILENAME_LEN]; - SQTaskFile *pTaskF = NULL; - int32_t idx = -1; +static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { + int32_t code = 0; + int32_t lino = 0; - taosWLockLatch(RSMA_FS_LOCK(pStat)); - if ((idx = taosArraySearchIdx(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ)) >= 0) { - ASSERT(idx < taosArrayGetSize(aQTaskInf)); - pTaskF = taosArrayGet(aQTaskInf, idx); - if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); - if (taosRemoveFile(qTaskFullName) < 0) { - smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, - tstrerror(TAOS_SYSTEM_ERROR(errno))); + int32_t nRef = 0; + char fname[TSDB_FILENAME_LEN] = {0}; + + + // SQTaskFile + int32_t iOld = 0; + int32_t iNew = 0; + while (true) { + int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); + int32_t nNew = taosArrayGetSize(pFS->aDFileSet); + SDFileSet fSet = {0}; + int8_t sameDisk = 0; + + if (iOld >= nOld && iNew >= nNew) break; + + SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL; + SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL; + + if (pSetOld && pSetNew) { + if (pSetOld->fid == pSetNew->fid) { + code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit); + + iOld++; + iNew++; + } else if (pSetOld->fid < pSetNew->fid) { + code = tsdbRemoveFileSet(pTsdb, pSetOld); + TSDB_CHECK_CODE(code, lino, _exit); + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); } else { - smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); + code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit) + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + iOld++; + iNew++; } - taosArrayRemove(aQTaskInf, idx); + } else if (pSetOld) { + code = tsdbRemoveFileSet(pTsdb, pSetOld); + TSDB_CHECK_CODE(code, lino, _exit); + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + } else { + code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit) + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + iOld++; + iNew++; } } - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + } + return code; } /** @@ -229,29 +470,128 @@ _end: return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; } -static int32_t tdQTaskFileCmprFn2(const void *p1, const void *p2) { - if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) { - return -1; - } else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) { - return 1; + +// EXPOSED APIS ==================================================================================== +int32_t tdRSmaFSCommit(SSma *pSma) { + int32_t code = 0; + int32_t lino = 0; + SRSmaFS fs = {0}; + + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tdRSmaGetCurrentFName(pSma, current, current_t); + + if (!taosCheckExistFile(current_t)) goto _exit; + + // rename the file + if (taosRenameFile(current_t, current) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); } - return 0; + // Load the new FS + code = tdRSmaFSCreate(&fs); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tdRSmaLoadFSFromFile(current, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + + // apply file change + code = tdRSmaFSApplyChange(pSma, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + tdRSmaFSClose(&fs); + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t tdRSmaFSRollback(SSma *pSma) { + int32_t code = 0; + int32_t lino = 0; + + char current_t[TSDB_FILENAME_LEN] = {0}; + tdRSmaGetCurrentFName(pSma, NULL, current_t); + (void)taosRemoveFile(current_t); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno)); + } + return code; } +int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) { + int32_t code = 0; + int32_t lino = 0; + SVnode *pVnode = pSma->pVnode; + int64_t commitID = pVnode->state.commitID; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = NULL; + + if (!pEnv) { + return TSDB_CODE_SUCCESS; + } + + // open handle + code = tdRSmaFSCreate(&pStat->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + // open impl + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tdRSmaGetCurrentFName(pSma, current, current_t); + + if (taosCheckExistFile(current)) { + code = tdRSmaLoadFSFromFile(current, &pStat->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosCheckExistFile(current_t)) { + if (rollback) { + code = tdRSmaFSRollback(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFSCommit(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } else { + // empty one + code = tsdbSaveFSToFile(&pTsdb->fs, current); + TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(!rollback); + } + + // scan and fix FS + code = tsdbScanAndTryFixFS(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +void tdRSmaFSClose(SRSmaFS *pFS) { taosArrayDestroy(pFS->aQTaskInf); } + + + int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) { int32_t code = 0; - int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn2, TD_GE); + int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskInfCmprFn1, TD_GE); if (idx < 0) { idx = taosArrayGetSize(pFS->aQTaskInf); } else { SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); - int32_t c = tdQTaskFileCmprFn2(pTaskF, qTaskFile); + int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskFile); if (c == 0) { pTaskF->nRef = qTaskFile->nRef; - pTaskF->version = qTaskFile->version; - pTaskF->size = qTaskFile->size; + ASSERT(pTaskF->size == qTaskFile->size); goto _exit; } } @@ -263,4 +603,95 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) { _exit: return code; -} \ No newline at end of file +} + +int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) { + SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; + SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version}; + SQTaskFile *pTaskF = NULL; + int32_t oldVal = 0; + + taosRLockLatch(RSMA_FS_LOCK(pStat)); + if (suid > 0 && level > 0) { + if ((pTaskF = taosArraySearch(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ))) { + oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); + ASSERT(oldVal > 0); + } + } else { + int32_t size = taosArrayGetSize(aQTaskInf); + for (int32_t i = 0; i < size; ++i) { + pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); + if (pTaskF->version == version) { + oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); + } + ASSERT(oldVal > 0); + } + } + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); + return oldVal; +} + +int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) { + SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; + int64_t version = -1; + + taosRLockLatch(RSMA_FS_LOCK(pStat)); + if (taosArrayGetSize(aQTaskInf) > 0) { + version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version; + } + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); + return version; +} + +void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) { + SVnode *pVnode = pSma->pVnode; + SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; + char qTaskFullName[TSDB_FILENAME_LEN]; + SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version}; + SQTaskFile *pTaskF = NULL; + int32_t idx = -1; + + taosWLockLatch(RSMA_FS_LOCK(pStat)); + if (suid > 0 && level > 0) { + if ((idx = taosArraySearchIdx(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ)) >= 0) { + ASSERT(idx < taosArrayGetSize(aQTaskInf)); + pTaskF = taosArrayGet(aQTaskInf, idx); + if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), + qTaskFullName); + if (taosRemoveFile(qTaskFullName) < 0) { + smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); + } + taosArrayRemove(aQTaskInf, idx); + } + } + } else { + for (int32_t i = 0; i < taosArrayGetSize(aQTaskInf);) { + pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); + int32_t nRef = INT32_MAX; + if (pTaskF->version == version) { + nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1); + } else if (pTaskF->version < version) { + nRef = atomic_load_32(&pTaskF->nRef); + } + if (nRef <= 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); + if (taosRemoveFile(qTaskFullName) < 0) { + smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); + } + taosArrayRemove(aQTaskInf, i); + continue; + } + ++i; + } + } + + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); +} diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 2a769b68fe0ff93caa48a6a22030ca02b2af7023..2b442b9de13db7948255a20b7af63ce477015af1 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -150,7 +150,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) { } // restore the rsma - if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) { + if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) { goto _err; } } @@ -181,8 +181,8 @@ int32_t smaClose(SSma *pSma) { * @param committedVer * @return int32_t */ -int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer) { +int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) { ASSERT(VND_IS_RSMA(pSma->pVnode)); - return tdRSmaProcessRestoreImpl(pSma, type, committedVer); + return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback); } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index c92621776c4987bd6da39ae36a2056a8bd722554..788d5c9fc718b65893f877f485a591e1cdc71438 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -28,7 +28,7 @@ SSmaMgmt smaMgmt = { .rsetId = -1, }; -#define TD_QTASKINFO_FNAME_PREFIX "qinf.v" +#define TD_QTASKINFO_FNAME_PREFIX "main.db" typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; @@ -59,12 +59,12 @@ struct SRSmaQTaskInfoItem { void *qTaskInfo; }; -void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) { - tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); +void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) { + tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); } -void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName) { - tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, char *outputName) { + tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); } void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) { @@ -311,8 +311,8 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot pItem->pStreamState = pStreamState; if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { - int64_t msInterval = - convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); + int64_t msInterval = + convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); pItem->maxDelay = (int32_t)msInterval; } else { pItem->maxDelay = (int32_t)param->maxdelay[idx]; @@ -1190,7 +1190,7 @@ static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { } #endif -int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) { +int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) { // step 1: iterate all stables to restore the rsma env int64_t nTables = 0; if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { @@ -1209,7 +1209,7 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) #endif // step 3: open SRSmaFS for qTaskFiles - if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) { + if (tdRSmaFSOpen(pSma, qtaskFileVer, rollback) < 0) { goto _err; } diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 34f884f9f95776be10f1416fdf50f99a8fafce6e..9bc2619c9e03b9f9e661fb179e214f6dd68ef26f 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -77,6 +77,7 @@ _err: } static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version) { + #if 0 int32_t code = 0; SSma* pSma = pReader->pSma; SVnode* pVnode = pSma->pVnode; @@ -133,10 +134,12 @@ _end: } smaInfo("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName); + #endif return TSDB_CODE_SUCCESS; } static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) { +#if 0 if (!(*ppReader)) { return TSDB_CODE_SUCCESS; } @@ -149,7 +152,7 @@ static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) { tdRSmaFSUnRef(pSma, pStat, version); taosMemoryFreeClear(*ppReader); smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo version %" PRIi64, SMA_VID(pSma), version); - +#endif return TSDB_CODE_SUCCESS; } @@ -307,6 +310,7 @@ struct SRSmaSnapWriter { int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) { int32_t code = 0; + #if 0 SRSmaSnapWriter* pWriter = NULL; SVnode* pVnode = pSma->pVnode; @@ -361,11 +365,13 @@ _err: smaError("vgId:%d, rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code)); if (pWriter) rsmaSnapWriterClose(&pWriter, 0); *ppWriter = NULL; + #endif return code; } int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; + #if 0 SRSmaSnapWriter* pWriter = *ppWriter; SVnode* pVnode = pWriter->pSma->pVnode; @@ -399,7 +405,8 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { pWriter->pQTaskFWriter->fname, qTaskInfoFullName); // rsma restore - if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) { + int8_t rollback = 0; + if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback)) < 0) { goto _err; } smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName); @@ -413,6 +420,7 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { _err: smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code)); + #endif return code; } diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 962726f49653b34c5de07365ce354ac9e9978f53..22057166e2ffdb725f5cf489dc078b94c7b65934 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -184,15 +184,15 @@ void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTF #endif -void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, - char *outputName) { - if (version < 0) { +void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, + int8_t level, int64_t version, char *outputName) { + if (version >= 0 && level > 0 && suid > 0) { if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, - TD_DIRSEP, dname, TD_DIRSEP, vgId, fname); + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s.%" PRIi64, pdname, + TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, level, fname, version); } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, - vgId, fname); + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi64 "%s%" PRIi8 "%s%s." PRIi64, TD_DIRSEP, + vgId, TD_DIRSEP, dname, TD_DIRSEP, suid, level, fname, version); } } else { if (pdname) { diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index f9a598fec6861f66107011ca5a04dc6df5a33726..c78249988229bc688ff115b1a87aa4befba4a682 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -191,8 +191,7 @@ static void vnodePrepareCommit(SVnode *pVnode) { tsdbPrepareCommit(pVnode->pTsdb); metaPrepareAsyncCommit(pVnode->pMeta); - smaPrepareAsyncCommit(pVnode->pSma); - + smaPrepareCommit(pVnode->pSma); vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL;