diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 52221bdd44d1461c43f88110540b45eee553c87a..4b46162c7dac523e9a5b4b981428b6c11e0a9895 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -700,7 +700,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151) #define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152) -// #define TSDB_CODE_RSMA_FILE_CORRUPTED TAOS_DEF_ERROR_CODE(0, 0x3153) +#define TSDB_CODE_RSMA_FS_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3153) #define TSDB_CODE_RSMA_REMOVE_EXISTS TAOS_DEF_ERROR_CODE(0, 0x3154) #define TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP TAOS_DEF_ERROR_CODE(0, 0x3155) #define TSDB_CODE_RSMA_EMPTY_INFO TAOS_DEF_ERROR_CODE(0, 0x3156) @@ -708,6 +708,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_REGEX_MATCH TAOS_DEF_ERROR_CODE(0, 0x3158) #define TSDB_CODE_RSMA_STREAM_STATE_OPEN TAOS_DEF_ERROR_CODE(0, 0x3159) #define TSDB_CODE_RSMA_STREAM_STATE_COMMIT TAOS_DEF_ERROR_CODE(0, 0x3160) +#define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161) +#define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162) +#define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 50e3c3a9c16e54a70d2b4f07a4640d87013a7776..007207562380f1e29ca7309025bf774dff27e333 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -44,7 +44,6 @@ typedef struct SRSmaInfoItem SRSmaInfoItem; typedef struct SRSmaFS SRSmaFS; typedef struct SQTaskFile SQTaskFile; typedef struct SQTaskFReader SQTaskFReader; -typedef struct SQTaskFWriter SQTaskFWriter; struct SSmaEnv { SRWLatch lock; @@ -85,22 +84,20 @@ struct STSmaStat { struct SQTaskFile { volatile int32_t nRef; - int32_t padding; + int8_t level; + int64_t suid; int64_t version; int64_t size; + int64_t mtime; }; struct SQTaskFReader { SSma *pSma; + int8_t level; + int64_t suid; int64_t version; TdFilePtr pReadH; }; -struct SQTaskFWriter { - SSma *pSma; - int64_t version; - TdFilePtr pWriteH; - char *fname; -}; struct SRSmaFS { SArray *aQTaskInf; // array of SQTaskFile @@ -214,85 +211,40 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { // rsma void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); -int32_t tdRSmaFSOpen(SSma *pSma, int64_t version); +int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); void tdRSmaFSClose(SRSmaFS *fs); -int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version); -void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version); -int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat); -int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile); -int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer); +int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); +int32_t tdRSmaFSCommit(SSma *pSma); +int32_t tdRSmaFSFinishCommit(SSma *pSma); +int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS); +int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS); +int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS); +void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS); +int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize); +int32_t tdRSmaFSRollback(SSma *pSma); +int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); -int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); -void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName); -void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName); +int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); +void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, + char *outputName); void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName); void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { int32_t ref = T_REF_INC(pRSmaInfo); - smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); + smaTrace("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); } static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { int32_t ref = T_REF_DEC(pRSmaInfo); - smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); + smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); } -// smaFileUtil ================ - -#define TD_FILE_HEAD_SIZE 512 - -typedef struct STFInfo STFInfo; -typedef struct STFile STFile; - -struct STFInfo { - // common fields - uint32_t magic; - uint32_t ftype; - uint32_t fver; - int64_t fsize; -}; - -enum { - TD_FTYPE_RSMA_QTASKINFO = 0, -}; - -#if 0 -struct STFile { - uint8_t state; - STFInfo info; - char *fname; - TdFilePtr pFile; -}; - -#define TD_TFILE_PFILE(tf) ((tf)->pFile) -#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL) -#define TD_TFILE_FULL_NAME(tf) ((tf)->fname) -#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL) -#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf)) -#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL) -#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s)) - -int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname); -int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType); -int32_t tdOpenTFile(STFile *pTFile, int flags); -int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte); -int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence); -int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte); -int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset); -int64_t tdGetTFileSize(STFile *pTFile, int64_t *size); -int32_t tdRemoveTFile(STFile *pTFile); -int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo); -int32_t tdUpdateTFileHeader(STFile *pTFile); -void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); -void tdCloseTFile(STFile *pTFile); -void tdDestroyTFile(STFile *pTFile); -#endif - -void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, - char *outputName); -void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); +void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, + int8_t level, int64_t version, char *outputName); +void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 33f762a33b112c600b321585de5359c932522355..37922fa895f80e845858a035273c46659efee1f2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -211,9 +211,6 @@ void smaCleanUp(); int32_t smaOpen(SVnode* pVnode, int8_t rollback); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); -int32_t smaSyncPreCommit(SSma* pSma); -int32_t smaSyncCommit(SSma* pSma); -int32_t smaSyncPostCommit(SSma* pSma); int32_t smaPrepareAsyncCommit(SSma* pSma); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo); int32_t smaFinishCommit(SSma* pSma); @@ -228,7 +225,6 @@ int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); -void tdUidStoreDestory(STbUidStore* pStore); void* tdUidStoreFree(STbUidStore* pStore); // SMetaSnapReader ======================================== @@ -275,6 +271,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData); // SRSmaSnapWriter ======================================== int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback); typedef struct { @@ -418,6 +415,7 @@ enum { struct SSnapDataHdr { int8_t type; + int8_t flag; int64_t index; int64_t size; uint8_t data[]; diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 9748963722cb5dac2f0a83bc89fad9a04987cf28..8b43de7421a6329273ede2eefb0a047d94ba5c98 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -17,42 +17,11 @@ extern SSmaMgmt smaMgmt; -#if 0 -static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma); -static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma); -static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma); -#endif static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma); static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo); static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma); static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat); -#if 0 -/** - * @brief Only applicable to Rollup SMA - * - * @param pSma - * @return int32_t - */ -int32_t smaSyncPreCommit(SSma *pSma) { return tdProcessRSmaSyncPreCommitImpl(pSma); } - -/** - * @brief Only applicable to Rollup SMA - * - * @param pSma - * @return int32_t - */ -int32_t smaSyncCommit(SSma *pSma) { return tdProcessRSmaSyncCommitImpl(pSma); } - -/** - * @brief Only applicable to Rollup SMA - * - * @param pSma - * @return int32_t - */ -int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(pSma); } -#endif - /** * @brief async commit, only applicable to Rollup SMA * @@ -128,84 +97,24 @@ _exit: int32_t smaFinishCommit(SSma *pSma) { int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pSma->pVnode; + code = tdRSmaFSFinishCommit(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { - smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) { - smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } _exit: - terrno = code; - return code; -} - -#if 0 -/** - * @brief pre-commit for rollup sma(sync commit). - * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. - * 2) wait for all triggered fetch tasks to finish - * 3) perform persist task for qTaskInfo - * - * @param pSma - * @return int32_t - */ -static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { - return TSDB_CODE_SUCCESS; - } - - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); - SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); - - // step 1: set rsma stat paused - atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); - - // step 2: wait for all triggered fetch tasks to finish - int32_t nLoops = 0; - while (1) { - if (T_REF_VAL_GET(pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma)); - break; - } else { - smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma)); - } - ++nLoops; - if (nLoops > 1000) { - sched_yield(); - nLoops = 0; - } - } - - // step 3: perform persist task for qTaskInfo - pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); - - smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma)); - - return TSDB_CODE_SUCCESS; -} - -/** - * @brief commit for rollup sma - * - * @param pSma - * @return int32_t - */ -static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { -#if 0 - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { - return TSDB_CODE_SUCCESS; + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } -#endif - return TSDB_CODE_SUCCESS; + return code; } -#endif // SQTaskFile ====================================================== @@ -218,6 +127,7 @@ static int32_t tdProcessRSmaSyncCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { +#if 0 SVnode *pVnode = pSma->pVnode; SRSmaFS *pFS = RSMA_FS(pStat); int64_t committed = pStat->commitAppliedVer; @@ -264,31 +174,10 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { } taosWUnLockLatch(RSMA_FS_LOCK(pStat)); +#endif return TSDB_CODE_SUCCESS; } -#if 0 -/** - * @brief post-commit for rollup sma - * 1) clean up the outdated qtaskinfo files - * - * @param pSma - * @return int32_t - */ -static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { - SVnode *pVnode = pSma->pVnode; - if (!VND_IS_RSMA(pVnode)) { - return TSDB_CODE_SUCCESS; - } - - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - - tdUpdateQTaskInfoFiles(pSma, pRSmaStat); - - return TSDB_CODE_SUCCESS; -} -#endif - /** * @brief Rsma async commit implementation(only do some necessary light weighted task) * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED @@ -298,9 +187,11 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { + int32_t code = 0; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); if (!pEnv) { - return TSDB_CODE_SUCCESS; + return code; } SSmaStat *pStat = SMA_ENV_STAT(pEnv); @@ -317,7 +208,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - ASSERT(pRSmaStat->commitAppliedVer > 0); + // ASSERT(pRSmaStat->commitAppliedVer > 0); // step 2: wait for all triggered fetch tasks to finish nLoops = 0; @@ -351,8 +242,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { - return TSDB_CODE_FAILED; + if ((code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat))) != 0) { + return code; } smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); @@ -383,7 +274,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb); if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb); - return TSDB_CODE_SUCCESS; + return code; } /** @@ -394,26 +285,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { */ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pSma->pVnode; -#if 0 - SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); - // perform persist task for qTaskInfo operator - if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { - return TSDB_CODE_FAILED; - } -#endif + code = tdRSmaFSCommit(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCommit(VND_RSMA1(pVnode), pInfo); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCommit(VND_RSMA2(pVnode), pInfo); + TSDB_CHECK_CODE(code, lino, _exit); - if ((code = tsdbCommit(VND_RSMA1(pVnode), pInfo)) < 0) { - smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; - } - if ((code = tsdbCommit(VND_RSMA2(pVnode), pInfo)) < 0) { - smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; - } _exit: - terrno = code; + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index a272f5fc97883943e95d8ee7feb1c60ec90d49cb..e613b5833fa76dbfe2adc03e0a57d32261ec6156 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -243,10 +243,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS return TSDB_CODE_FAILED; } - if (!(RSMA_FS(pRSmaStat)->aQTaskInf = taosArrayInit(1, sizeof(SQTaskFile)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } + taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat)); } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 8db36be741f3187834d4b2f2eca4fa302ca524fe..b7c2538b416ac59e7173c7ad7e8b0df9c59f9234 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -17,250 +17,713 @@ // ================================================================================================= -static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); +// static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2); -static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2); -/** - * @brief Open RSma FS from qTaskInfo files - * - * @param pSma - * @param version - * @return int32_t - */ -int32_t tdRSmaFSOpen(SSma *pSma, int64_t version) { - SVnode *pVnode = pSma->pVnode; - int64_t commitID = pVnode->state.commitID; - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = NULL; - SArray *output = NULL; - terrno = TSDB_CODE_SUCCESS; +static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) { + int32_t n = 0; - if (!pEnv) { - return TSDB_CODE_SUCCESS; - } + n += tPutI8(p ? p + n : p, pFile->level); + n += tPutI64v(p ? p + n : p, pFile->size); + n += tPutI64v(p ? p + n : p, pFile->suid); + n += tPutI64v(p ? p + n : p, pFile->version); + n += tPutI64v(p ? p + n : p, pFile->mtime); + + return n; +} + +static int32_t tdRSmaFSToBinary(uint8_t *p, SRSmaFS *pFS) { + int32_t n = 0; + uint32_t size = taosArrayGetSize(pFS->aQTaskInf); - if (tdFetchQTaskInfoFiles(pSma, version, &output) < 0) { - goto _end; + // version + n += tPutI8(p ? p + n : p, 0); + + // SArray + n += tPutU32v(p ? p + n : p, size); + for (uint32_t i = 0; i < size; ++i) { + n += tPutQTaskF(p ? p + n : p, taosArrayGet(pFS->aQTaskInf, i)); } - pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + return n; +} + +int32_t tdRSmaGetQTaskF(uint8_t *p, SQTaskFile *pFile) { + int32_t n = 0; + + n += tGetI8(p + n, &pFile->level); + n += tGetI64v(p + n, &pFile->size); + n += tGetI64v(p + n, &pFile->suid); + n += tGetI64v(p + n, &pFile->version); + n += tGetI64v(p + n, &pFile->mtime); + + return n; +} + +static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) { + int32_t code = 0; + int32_t n = 0; + int8_t version = 0; + + // version + n += tGetI8(pData + n, &version); + + // SArray + taosArrayClear(pFS->aQTaskInf); + uint32_t size = 0; + n += tGetU32v(pData + n, &size); + for (uint32_t i = 0; i < size; ++i) { + SQTaskFile qTaskF = {0}; + + int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF); + if (nt < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } - for (int32_t i = 0; i < taosArrayGetSize(output); ++i) { - int32_t vid = 0; - int64_t version = -1; - sscanf((const char *)taosArrayGetP(output, i), "v%dqinf.v%" PRIi64, &vid, &version); - SQTaskFile qTaskFile = {.version = version, .nRef = 1}; - if ((terrno = tdRSmaFSUpsertQTaskFile(RSMA_FS(pStat), &qTaskFile)) < 0) { - goto _end; + n += nt; + if (taosArrayPush(pFS->aQTaskInf, &qTaskF) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } - smaInfo("vgId:%d, open fs, version:%" PRIi64 ", ref:%d", TD_VID(pVnode), qTaskFile.version, qTaskFile.nRef); } -_end: - for (int32_t i = 0; i < taosArrayGetSize(output); ++i) { - void *ptr = taosArrayGetP(output, i); - taosMemoryFreeClear(ptr); + ASSERT(n + sizeof(TSCKSUM) == nData); + +_exit: + return code; +} + +static int32_t tdRSmaSaveFSToFile(SRSmaFS *pFS, const char *fname) { + int32_t code = 0; + int32_t lino = 0; + + // encode to binary + int32_t size = tdRSmaFSToBinary(NULL, pFS) + sizeof(TSCKSUM); + uint8_t *pData = taosMemoryMalloc(size); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + tdRSmaFSToBinary(pData, pFS); + taosCalcChecksumAppend(0, pData, size); + + // save to file + TdFilePtr pFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t n = taosWriteFile(pFD, pData, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); } - taosArrayDestroy(output); - if (terrno != TSDB_CODE_SUCCESS) { - smaError("vgId:%d, open rsma fs failed since %s", TD_VID(pVnode), terrstr()); - return TSDB_CODE_FAILED; + if (taosFsyncFile(pFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); } - return TSDB_CODE_SUCCESS; + + taosCloseFile(&pFD); + +_exit: + if (pData) taosMemoryFree(pData); + if (code) { + smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } + return code; } -void tdRSmaFSClose(SRSmaFS *fs) { taosArrayDestroy(fs->aQTaskInf); } +static int32_t tdRSmaFSCreate(SRSmaFS *pFS, int32_t size) { + int32_t code = 0; + + pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile)); + if (pFS->aQTaskInf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + +_exit: + return code; +} + +static void tdRSmaGetCurrentFName(SSma *pSma, char *current, char *current_t) { + SVnode *pVnode = pSma->pVnode; + if (pVnode->pTfs) { + if (current) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT", tfsGetPrimaryPath(pVnode->pTfs), + TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); + } + if (current_t) { + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%svnode%svnode%d%srsma%sPRESENT.t", tfsGetPrimaryPath(pVnode->pTfs), + TD_DIRSEP, TD_DIRSEP, TD_VID(pVnode), TD_DIRSEP, TD_DIRSEP); + } + } else { +#if 0 + if (current) { + snprintf(current, TSDB_FILENAME_LEN - 1, "%s%sPRESENT", pTsdb->path, TD_DIRSEP); + } + if (current_t) { + snprintf(current_t, TSDB_FILENAME_LEN - 1, "%s%sPRESENT.t", pTsdb->path, TD_DIRSEP); + } +#endif + } +} + +static int32_t tdRSmaLoadFSFromFile(const char *fname, SRSmaFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + uint8_t *pData = NULL; + + // load binary + TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ); + if (pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t size; + if (taosFStatFile(pFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pData = taosMemoryMalloc(size); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (taosReadFile(pFD, pData, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (!taosCheckChecksumWhole(pData, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&pFD); + + // decode binary + code = tsdbBinaryToFS(pData, size, pFS); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (pData) taosMemoryFree(pData); + if (code) { + smaError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } + return code; +} static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { - if (*(int64_t *)p1 < ((SQTaskFile *)p2)->version) { + const SQTaskFile *q1 = (const SQTaskFile *)p1; + const SQTaskFile *q2 = (const SQTaskFile *)p2; + + if (q1->suid < q2->suid) { + return -1; + } else if (q1->suid > q2->suid) { + return 1; + } + + if (q1->level < q2->level) { return -1; - } else if (*(int64_t *)p1 > ((SQTaskFile *)p2)->version) { + } else if (q1->level > q2->level) { + return 1; + } + + if (q1->version < q2->version) { + return -2; + } else if (q1->version > q2->version) { return 1; } + return 0; } -int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) { - SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; - SQTaskFile *pTaskF = NULL; - int32_t oldVal = 0; +static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFSNew) { + int32_t code = 0; + int32_t lino = 0; + int32_t nRef = 0; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaFS *pFSOld = RSMA_FS(pStat); + int64_t version = pStat->commitAppliedVer; + char fname[TSDB_FILENAME_LEN] = {0}; + + // SQTaskFile + int32_t nNew = taosArrayGetSize(pFSNew->aQTaskInf); + int32_t iNew = 0; + while (iNew < nNew) { + SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFSNew->aQTaskInf, iNew++); + + int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE); + + if (idx < 0) { + idx = taosArrayGetSize(pFSOld->aQTaskInf); + pQTaskFNew->nRef = 1; + } else { + SQTaskFile *pTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx); + int32_t c1 = tdQTaskInfCmprFn1(pQTaskFNew, pTaskF); + if (c1 == 0) { + // utilize the item in pFSOld->qQTaskInf, instead of pFSNew + continue; + } else if (c1 < 0) { + // NOTHING TODO + } else { + code = TSDB_CODE_RSMA_FS_UPDATE; + TSDB_CHECK_CODE(code, lino, _exit); + } + } - taosRLockLatch(RSMA_FS_LOCK(pStat)); - if ((pTaskF = taosArraySearch(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ))) { - oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); - ASSERT(oldVal > 0); + if (taosArrayInsert(pFSOld->aQTaskInf, idx, pQTaskFNew) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + // remove previous version + while (--idx >= 0) { + SQTaskFile *preTaskF = TARRAY_GET_ELEM(pFSOld->aQTaskInf, idx); + int32_t c2 = tdQTaskInfCmprFn1(preTaskF, pQTaskFNew); + if (c2 == 0) { + code = TSDB_CODE_RSMA_FS_UPDATE; + TSDB_CHECK_CODE(code, lino, _exit); + } else if (c2 != -2) { + break; + } + + nRef = atomic_sub_fetch_32(&preTaskF->nRef, 1); + if (nRef <= 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), preTaskF->suid, preTaskF->level, preTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), fname); + (void)taosRemoveFile(fname); + taosArrayRemove(pFSOld->aQTaskInf, idx); + } + } } - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - return oldVal; + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) { + int32_t code = 0; +#if 0 + int32_t lino = 0; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaFS *pFS = RSMA_FS(pStat); + char fname[TSDB_FILENAME_LEN] = {0}; + char fnameVer[TSDB_FILENAME_LEN] = {0}; + + // SArray + int32_t size = taosArrayGetSize(pFS->aQTaskInf); + for (int32_t i = 0; i < size; ++i) { + SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i); + + // main.tdb ========= + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), fnameVer); + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, tfsGetPrimaryPath(pVnode->pTfs), fname); + + if (taosCheckExistFile(fnameVer)) { + if (taosRenameFile(fnameVer, fname) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + smaDebug("vgId:%d, %s:%d succeed to to rename %s to %s", TD_VID(pVnode), __func__, lino, fnameVer, fname); + } else if (taosCheckExistFile(fname)) { + if (taosRemoveFile(fname) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + smaDebug("vgId:%d, %s:%d succeed to to remove %s", TD_VID(pVnode), __func__, lino, fname); + } + } + + { + // remove those invalid files (todo) + // main.tdb-journal.5 // TDB should handle its clear for kill -9 + } + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } +#endif + return code; +} + +// EXPOSED APIS ==================================================================================== + +int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) { + int32_t code = 0; + int32_t lino = 0; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + + // open handle + code = tdRSmaFSCreate(RSMA_FS(pStat), 0); + TSDB_CHECK_CODE(code, lino, _exit); + + // open impl + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tdRSmaGetCurrentFName(pSma, current, current_t); + + if (taosCheckExistFile(current)) { + code = tdRSmaLoadFSFromFile(current, RSMA_FS(pStat)); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosCheckExistFile(current_t)) { + if (rollback) { + code = tdRSmaFSRollback(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tdRSmaFSCommit(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } else { + // 1st time open with empty current/qTaskInfoFile + code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // scan and try fix(remove main.db/main.db.xxx and use the one with version) + code = tdRSmaFSScanAndTryFix(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +void tdRSmaFSClose(SRSmaFS *pFS) { pFS->aQTaskInf = taosArrayDestroy(pFS->aQTaskInf); } + +int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew) { + int32_t code = 0; + int32_t lino = 0; + char tfname[TSDB_FILENAME_LEN]; + + tdRSmaGetCurrentFName(pSma, NULL, tfname); + + // generate PRESENT.t + code = tdRSmaSaveFSToFile(pFSNew, tfname); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); + } + return code; } -int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) { - SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; - int64_t version = -1; +int32_t tdRSmaFSCommit(SSma *pSma) { + int32_t code = 0; + int32_t lino = 0; + SRSmaFS fs = {0}; + + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tdRSmaGetCurrentFName(pSma, current, current_t); + + if (!taosCheckExistFile(current_t)) { + goto _exit; + } + + // rename the file + if (taosRenameFile(current_t, current) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // load the new FS + code = tdRSmaFSCreate(&fs, 1); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tdRSmaLoadFSFromFile(current, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + + // apply file change + code = tdRSmaFSApplyChange(pSma, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + tdRSmaFSClose(&fs); + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + } + return code; +} + +int32_t tdRSmaFSFinishCommit(SSma *pSma) { + int32_t code = 0; + int32_t lino = 0; + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); + + taosWLockLatch(RSMA_FS_LOCK(pStat)); + code = tdRSmaFSCommit(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + } else { + smaInfo("vgId:%d, rsmaFS finish commit", SMA_VID(pSma)); + } + return code; +} + +int32_t tdRSmaFSRollback(SSma *pSma) { + int32_t code = 0; + int32_t lino = 0; + + char current_t[TSDB_FILENAME_LEN] = {0}; + tdRSmaGetCurrentFName(pSma, NULL, current_t); + (void)taosRemoveFile(current_t); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(errno)); + } + return code; +} + +int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize) { + int32_t code = 0; + + for (int32_t i = 0; i < nSize; ++i) { + SQTaskFile *qTaskF = qTaskFile + i; + + int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskF, tdQTaskInfCmprFn1, TD_GE); + + if (idx < 0) { + idx = taosArrayGetSize(pFS->aQTaskInf); + } else { + SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); + int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF); + if (c == 0) { + if (pTaskF->size != qTaskF->size) { + code = TSDB_CODE_RSMA_FS_UPDATE; + smaError("vgId:%d, %s failed at line %d since %s, level:%" PRIi8 ", suid:%" PRIi64 ", version:%" PRIi64 + ", size:%" PRIi64 " != %" PRIi64, + SMA_VID(pSma), __func__, __LINE__, tstrerror(code), pTaskF->level, pTaskF->suid, pTaskF->version, + pTaskF->size, qTaskF->size); + goto _exit; + } + continue; + } + } + + if (!taosArrayInsert(pFS->aQTaskInf, idx, qTaskF)) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + +_exit: + return code; +} +#if 0 +int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) { + SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; + SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version}; + SQTaskFile *pTaskF = NULL; + int32_t oldVal = 0; taosRLockLatch(RSMA_FS_LOCK(pStat)); - if (taosArrayGetSize(aQTaskInf) > 0) { - version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version; + if (suid > 0 && level > 0) { + ASSERT(version > 0); + if ((pTaskF = taosArraySearch(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ))) { + oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); + ASSERT(oldVal > 0); + } + } else { + // ref all + int32_t size = taosArrayGetSize(aQTaskInf); + for (int32_t i = 0; i < size; ++i) { + pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); + oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); + ASSERT(oldVal > 0); + } } taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - return version; + return oldVal; } -void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) { +void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) { SVnode *pVnode = pSma->pVnode; SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; char qTaskFullName[TSDB_FILENAME_LEN]; + SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version}; SQTaskFile *pTaskF = NULL; int32_t idx = -1; taosWLockLatch(RSMA_FS_LOCK(pStat)); - if ((idx = taosArraySearchIdx(aQTaskInf, &version, tdQTaskInfCmprFn1, TD_EQ)) >= 0) { - ASSERT(idx < taosArrayGetSize(aQTaskInf)); - pTaskF = taosArrayGet(aQTaskInf, idx); - if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); - if (taosRemoveFile(qTaskFullName) < 0) { - smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); + if (suid > 0 && level > 0) { + ASSERT(version > 0); + if ((idx = taosArraySearchIdx(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ)) >= 0) { + ASSERT(idx < taosArrayGetSize(aQTaskInf)); + pTaskF = taosArrayGet(aQTaskInf, idx); + if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); + if (taosRemoveFile(qTaskFullName) < 0) { + smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); + } + taosArrayRemove(aQTaskInf, idx); } - taosArrayRemove(aQTaskInf, idx); + } + } else { + for (int32_t i = 0; i < taosArrayGetSize(aQTaskInf);) { + pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); + int32_t nRef = INT32_MAX; + if (pTaskF->version == version) { + nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1); + } else if (pTaskF->version < version) { + nRef = atomic_load_32(&pTaskF->nRef); + } + if (nRef <= 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); + if (taosRemoveFile(qTaskFullName) < 0) { + smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, + tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); + } + taosArrayRemove(aQTaskInf, i); + continue; + } + ++i; } } + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); } +#endif -/** - * @brief Fetch qtaskfiles LE than version - * - * @param pSma - * @param version - * @param output - * @return int32_t - */ -static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) { - SVnode *pVnode = pSma->pVnode; - TdDirPtr pDir = NULL; - TdDirEntryPtr pDirEntry = NULL; - char dir[TSDB_FILENAME_LEN]; - const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$"; - regex_t regex; - int code = 0; - - terrno = TSDB_CODE_SUCCESS; - - tdGetVndDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir); - - if (!taosCheckExistFile(dir)) { - smaDebug("vgId:%d, fetch qtask files, no need as dir %s not exist", TD_VID(pVnode), dir); - return TSDB_CODE_SUCCESS; - } - - // Resource allocation and init - if ((code = regcomp(®ex, pattern, REG_EXTENDED)) != 0) { - terrno = TSDB_CODE_RSMA_REGEX_MATCH; - char errbuf[128]; - regerror(code, ®ex, errbuf, sizeof(errbuf)); - smaWarn("vgId:%d, fetch qtask files, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf); - return TSDB_CODE_FAILED; - } - - if (!(pDir = taosOpenDir(dir))) { - regfree(®ex); - terrno = TAOS_SYSTEM_ERROR(errno); - smaError("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr()); - return TSDB_CODE_FAILED; - } - - int32_t dirLen = strlen(dir); - char *dirEnd = POINTER_SHIFT(dir, dirLen); - regmatch_t regMatch[2]; - while ((pDirEntry = taosReadDir(pDir))) { - char *entryName = taosGetDirEntryName(pDirEntry); - if (!entryName) { - continue; +int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + int32_t nRef = 0; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaFS *qFS = RSMA_FS(pStat); + int32_t size = taosArrayGetSize(qFS->aQTaskInf); + + pFS->aQTaskInf = taosArrayInit(size, sizeof(SQTaskFile)); + if (pFS->aQTaskInf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + for (int32_t i = 0; i < size; ++i) { + SQTaskFile *qTaskF = (SQTaskFile *)taosArrayGet(qFS->aQTaskInf, i); + nRef = atomic_fetch_add_32(&qTaskF->nRef, 1); + if (nRef <= 0) { + code = TSDB_CODE_RSMA_FS_REF; + TSDB_CHECK_CODE(code, lino, _exit); } + } - code = regexec(®ex, entryName, 2, regMatch, 0); + taosArraySetSize(pFS->aQTaskInf, size); + memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile)); - if (code == 0) { - // match - smaInfo("vgId:%d, fetch qtask files, max ver:%" PRIi64 ", %s found", TD_VID(pVnode), version, entryName); +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, nRef %d", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code), + nRef); + } + return code; +} - int64_t ver = -1; - sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &ver); - if ((ver <= version) && (ver > -1)) { - if (!(*output)) { - if (!(*output = taosArrayInit(1, POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - } - char *entryDup = strdup(entryName); - if (!entryDup) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - if (!taosArrayPush(*output, &entryDup)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } +void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS) { + int32_t nRef = 0; + char fname[TSDB_FILENAME_LEN]; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + int32_t size = taosArrayGetSize(pFS->aQTaskInf); + + for (int32_t i = 0; i < size; ++i) { + SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, i); + + nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1); + if (nRef == 0) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, + tfsGetPrimaryPath(pVnode->pTfs), fname); + if (taosRemoveFile(fname) < 0) { + smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), fname, tstrerror(TAOS_SYSTEM_ERROR(errno))); } else { + smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), fname); } - } else if (code == REG_NOMATCH) { - // not match - smaTrace("vgId:%d, fetch qtask files, not match %s", TD_VID(pVnode), entryName); - continue; - } else { - // has other error - char errbuf[128]; - regerror(code, ®ex, errbuf, sizeof(errbuf)); - smaWarn("vgId:%d, fetch qtask files, regexec failed since %s", TD_VID(pVnode), errbuf); - terrno = TSDB_CODE_RSMA_REGEX_MATCH; - goto _end; + } else if (nRef < 0) { + smaWarn("vgId:%d, abnormal unref %s since %s", TD_VID(pVnode), fname, tstrerror(TSDB_CODE_RSMA_FS_REF)); } } -_end: - taosCloseDir(&pDir); - regfree(®ex); - return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; + + taosArrayDestroy(pFS->aQTaskInf); } -static int32_t tdQTaskFileCmprFn2(const void *p1, const void *p2) { - if (((SQTaskFile *)p1)->version < ((SQTaskFile *)p2)->version) { - return -1; - } else if (((SQTaskFile *)p1)->version > ((SQTaskFile *)p2)->version) { - return 1; - } +int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - return 0; + taosRLockLatch(RSMA_FS_LOCK(pStat)); + code = tdRSmaFSRef(pSma, pFS); + TSDB_CHECK_CODE(code, lino, _exit); +_exit: + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); + } + return code; } -int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile) { - int32_t code = 0; - int32_t idx = taosArraySearchIdx(pFS->aQTaskInf, qTaskFile, tdQTaskFileCmprFn2, TD_GE); +int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); + SRSmaFS *qFS = RSMA_FS(pStat); + int32_t size = taosArrayGetSize(qFS->aQTaskInf); - if (idx < 0) { - idx = taosArrayGetSize(pFS->aQTaskInf); - } else { - SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); - int32_t c = tdQTaskFileCmprFn2(pTaskF, qTaskFile); - if (c == 0) { - pTaskF->nRef = qTaskFile->nRef; - pTaskF->version = qTaskFile->version; - pTaskF->size = qTaskFile->size; - goto _exit; - } - } + code = tdRSmaFSCreate(pFS, size); + TSDB_CHECK_CODE(code, lino, _exit); - if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskFile) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } + taosArraySetSize(pFS->aQTaskInf, size); + memcpy(pFS->aQTaskInf->pData, qFS->aQTaskInf->pData, size * sizeof(SQTaskFile)); _exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); + } return code; -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 2a769b68fe0ff93caa48a6a22030ca02b2af7023..3923a5dd5b37c3787ba64543b15071c4b73bba43 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -121,8 +121,6 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty int32_t smaOpen(SVnode *pVnode, int8_t rollback) { STsdbCfg *pCfg = &pVnode->config.tsdbCfg; - ASSERT(!pVnode->pSma); - SSma *pSma = taosMemoryCalloc(1, sizeof(SSma)); if (!pSma) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -137,7 +135,7 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) { if (VND_IS_RSMA(pVnode)) { STsdbKeepCfg keepCfg = {0}; - for (int i = 0; i < TSDB_RETENTION_MAX; ++i) { + for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) { if (i == TSDB_RETENTION_L0) { SMA_OPEN_RSMA_IMPL(pVnode, 0); } else if (i == TSDB_RETENTION_L1) { @@ -145,12 +143,14 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) { } else if (i == TSDB_RETENTION_L2) { SMA_OPEN_RSMA_IMPL(pVnode, 2); } else { - ASSERT(0); + terrno = TSDB_CODE_APP_ERROR; + smaError("vgId:%d, sma open failed since %s, level:%d", TD_VID(pVnode), terrstr(), i); + goto _err; } } // restore the rsma - if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed) < 0) { + if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) { goto _err; } } @@ -181,8 +181,11 @@ int32_t smaClose(SSma *pSma) { * @param committedVer * @return int32_t */ -int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer) { - ASSERT(VND_IS_RSMA(pSma->pVnode)); +int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback) { + if (!VND_IS_RSMA(pSma->pVnode)) { + terrno = TSDB_CODE_RSMA_INVALID_ENV; + return TSDB_CODE_FAILED; + } - return tdRSmaProcessRestoreImpl(pSma, type, committedVer); + return tdRSmaProcessRestoreImpl(pSma, type, committedVer, rollback); } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ba99e5515d8b0b15892da433c80305477ab98dd4..044e1d5bab36361455fa3df6f52573e43baee339 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -21,17 +21,17 @@ #define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms +#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel) + SSmaMgmt smaMgmt = { .inited = 0, .rsetId = -1, }; -#define TD_QTASKINFO_FNAME_PREFIX "qinf.v" - typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; -typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); +static void tdUidStoreDestory(STbUidStore *pStore); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); @@ -57,38 +57,6 @@ struct SRSmaQTaskInfoItem { void *qTaskInfo; }; -struct SRSmaQTaskInfoIter { - STFile *pTFile; - int64_t offset; - int64_t fsize; - int32_t nBytes; - int32_t nAlloc; - char *pBuf; - // ------------ - char *qBuf; // for iterator - int32_t nBufPos; -}; - -void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) { - tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); -} - -void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char *path, char *outputName) { - tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); -} - -void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) { - tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); - int32_t rsmaLen = strlen(outputName); - snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level); -} - -void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { - tdGetVndDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); - int32_t rsmaLen = strlen(outputName); - snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi64 "%s%" PRIi8, suid, TD_DIRSEP, level); -} - static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC if (!taskHandle || !(*taskHandle)) return; @@ -363,10 +331,12 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con return TSDB_CODE_SUCCESS; } +#if 0 if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INIT_FAILED; return TSDB_CODE_FAILED; } +#endif SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); @@ -374,13 +344,8 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo) { - // TODO: free original pRSmaInfo if exists abnormally - tdFreeRSmaInfo(pSma, *(SRSmaInfo **)pRSmaInfo, true); - if (taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)) < 0) { - terrno = TSDB_CODE_RSMA_REMOVE_EXISTS; - goto _err; - } - smaWarn("vgId:%d, remove the rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid); + smaInfo("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid); + return TSDB_CODE_SUCCESS; } // from write queue: single thead @@ -449,8 +414,8 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SVCreateStbReq *pReq) { } if (!VND_IS_RSMA(pVnode)) { - smaTrace("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name, - pReq->suid); + smaWarn("vgId:%d, not create rsma for stable %s %" PRIi64 " since vnd is not rsma", TD_VID(pVnode), pReq->name, + pReq->suid); return TSDB_CODE_SUCCESS; } @@ -494,9 +459,8 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { tdReleaseRSmaInfo(pSma, pRSmaInfo); - // save to file - // TODO - smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid); + // no need to save to file as triggered by dropping stable + smaDebug("vgId:%d, drop rsma for stable %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid); return TSDB_CODE_SUCCESS; } @@ -561,7 +525,7 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) return TSDB_CODE_SUCCESS; } -void tdUidStoreDestory(STbUidStore *pStore) { +static void tdUidStoreDestory(STbUidStore *pStore) { if (pStore) { if (pStore->uidHash) { if (pStore->tbUids) { @@ -602,7 +566,7 @@ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { } SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; - // TODO: spin lock for race conditiond + // TODO: spin lock for race condition if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { return TSDB_CODE_FAILED; } @@ -708,8 +672,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { SSDataBlock *output = taosArrayGetP(pResList, i); - smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid, output->info.id.groupId, - output->info.rows); + smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid, + output->info.id.groupId, output->info.rows); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq *pReq = NULL; @@ -840,12 +804,13 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, return TSDB_CODE_SUCCESS; } if (!pInfo->pTSchema) { + terrno = TSDB_CODE_INVALID_PTR; smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); return TSDB_CODE_FAILED; } - smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, - RSMA_INFO_QTASK(pInfo, idx), pInfo->suid); + smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64 " nMsg:%d", SMA_VID(pSma), level, + RSMA_INFO_QTASK(pInfo, idx), pInfo->suid, msgSize); #if 0 for (int32_t i = 0; i < msgSize; ++i) { @@ -854,7 +819,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, tdRsmaPrintSubmitReq(pSma, pReq); } #endif - if (qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType) < 0) { + if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) { smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } @@ -871,6 +836,12 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t char *pOutput = NULL; int32_t len = 0; + if (!srcTaskInfo) { + terrno = TSDB_CODE_INVALID_PTR; + smaWarn("vgId:%d, rsma clone, table %" PRIi64 ", no need since srcTaskInfo is NULL", TD_VID(pVnode), suid); + return TSDB_CODE_FAILED; + } + if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) { smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid, terrstr()); @@ -1051,12 +1022,8 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { // only applicable when rsma env exists return TSDB_CODE_SUCCESS; } + STbUidStore uidStore = {0}; - SRetention *pRetention = SMA_RETENTION(pSma); - if (!RETENTION_VALID(pRetention + 1)) { - // return directly if retention level 1 is invalid - return TSDB_CODE_SUCCESS; - } if (inputType == STREAM_INPUT__DATA_SUBMIT) { if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) { @@ -1186,18 +1153,21 @@ _err: } /** - * @brief reload ts data from checkpoint - * - * @param pSma - * @return int32_t + * N.B. the data would be restored from the unified WAL replay procedure */ -static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { - // NOTHING TODO: the data would be restored from the unified WAL replay procedure - return TSDB_CODE_SUCCESS; -} +int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback) { + // step 1: init env + if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_TDB_INIT_FAILED; + return TSDB_CODE_FAILED; + } -int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) { - // step 1: iterate all stables to restore the rsma env + // step 2: open SRSmaFS for qTaskFiles + if (tdRSmaFSOpen(pSma, qtaskFileVer, rollback) < 0) { + goto _err; + } + + // step 3: iterate all stables to restore the rsma env int64_t nTables = 0; if (tdRSmaRestoreQTaskInfoInit(pSma, &nTables) < 0) { goto _err; @@ -1207,16 +1177,6 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer) return TSDB_CODE_SUCCESS; } - // step 2: reload ts data from checkpoint - if (tdRSmaRestoreTSDataReload(pSma) < 0) { - goto _err; - } - - // step 3: open SRSmaFS for qTaskFiles - if (tdRSmaFSOpen(pSma, qtaskFileVer) < 0) { - goto _err; - } - smaInfo("vgId:%d, restore rsma task %" PRIi8 " from qtaskf %" PRIi64 " succeed", SMA_VID(pSma), type, qtaskFileVer); return TSDB_CODE_SUCCESS; _err: @@ -1226,19 +1186,26 @@ _err: } int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { - SSma *pSma = pRSmaStat->pSma; - SVnode *pVnode = pSma->pVnode; - int32_t vid = SMA_VID(pSma); + int32_t code = 0; + int32_t lino = 0; + SSma *pSma = pRSmaStat->pSma; + SVnode *pVnode = pSma->pVnode; + SArray *qTaskFArray = NULL; + int64_t version = pRSmaStat->commitAppliedVer; + TdFilePtr pOutFD = NULL; + TdFilePtr pInFD = NULL; + char fname[TSDB_FILENAME_LEN]; + char fnameVer[TSDB_FILENAME_LEN]; + SRSmaFS fs = {0}; if (taosHashGetSize(pInfoHash) <= 0) { return TSDB_CODE_SUCCESS; } - int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat); - if (pRSmaStat->commitAppliedVer <= fsMaxVer) { - smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid, - pRSmaStat->commitAppliedVer, fsMaxVer); - return TSDB_CODE_SUCCESS; + qTaskFArray = taosArrayInit(taosHashGetSize(pInfoHash) << 1, sizeof(SQTaskFile)); + if (!qTaskFArray) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } void *infoHash = NULL; @@ -1253,19 +1220,80 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); if (pItem && pItem->pStreamState) { if (streamStateCommit(pItem->pStreamState) < 0) { - terrno = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; - goto _err; + code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; + TSDB_CHECK_CODE(code, lino, _exit); + } + smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode), + pRSmaInfo->suid, i + 1); + + // qTaskInfo file + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, -1, tfsGetPrimaryPath(pVnode->pTfs), fname); + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pRSmaInfo->suid, i + 1, version, tfsGetPrimaryPath(pVnode->pTfs), + fnameVer); + if (taosCheckExistFile(fnameVer)) { + smaWarn("vgId:%d, rsma persist, duplicate file %s exist", TD_VID(pVnode), fnameVer); + } + + pOutFD = taosCreateFile(fnameVer, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + pInFD = taosOpenFile(fname, TD_FILE_READ); + if (pInFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t size = 0; + uint32_t mtime = 0; + if (taosFStatFile(pInFD, &size, &mtime) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + ASSERT(size > 0); + + int64_t offset = 0; + if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + smaError("vgId:%d, rsma persist, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), fname, + fnameVer, tstrerror(code)); + TSDB_CHECK_CODE(code, lino, _exit); } - smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 " level %d", vid, pRSmaInfo->suid, - i + 1); + taosCloseFile(&pOutFD); + taosCloseFile(&pInFD); + + SQTaskFile qTaskF = { + .nRef = 1, .level = i + 1, .suid = pRSmaInfo->suid, .version = version, .size = size, .mtime = mtime}; + + taosArrayPush(qTaskFArray, &qTaskF); } } } - return TSDB_CODE_SUCCESS; -_err: - smaError("vgId:%d, rsma persist failed since %s", vid, terrstr()); - return TSDB_CODE_FAILED; + // prepare + code = tdRSmaFSCopy(pSma, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tdRSmaFSUpsertQTaskFile(pSma, &fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray)); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tdRSmaFSPrepareCommit(pSma, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + + taosArrayDestroy(fs.aQTaskInf); + taosArrayDestroy(qTaskFArray); + + if (code) { + if (pOutFD) taosCloseFile(&pOutFD); + if (pInFD) taosCloseFile(&pInFD); + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + + terrno = code; + return code; } /** @@ -1352,12 +1380,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { tsem_post(&(pStat->notEmpty)); } } break; - case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); - } break; case TASK_TRIGGER_STAT_INACTIVE: { - smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive", + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is inactive ", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_INIT: { @@ -1365,8 +1389,9 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; default: { - smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat:%" PRIi8 + " is unknown", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid, fetchTriggerStat); } break; } @@ -1448,7 +1473,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA taosGetQitem(qall, (void **)&msg); if (msg) { if (!taosArrayPush(pSubmitArr, &msg)) { - tdFreeRSmaSubmitItems(pSubmitArr); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } } else { @@ -1460,7 +1485,6 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA if (size > 0) { for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); goto _err; } } @@ -1468,6 +1492,9 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA } return TSDB_CODE_SUCCESS; _err: + smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, + type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr()); + tdFreeRSmaSubmitItems(pSubmitArr); while (1) { void *msg = NULL; taosGetQitem(qall, (void **)&msg); @@ -1514,8 +1541,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { while ((pIter = taosHashIterate(infoHash, pIter))) { SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { - if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || - RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_NEED_FETCH(pInfo)) { int32_t batchCnt = -1; int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads; bool occupied = (batchMax <= 1); @@ -1531,13 +1557,20 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type); } - if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + if (RSMA_NEED_FETCH(pInfo)) { int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2); if (oldStat == 0 || ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); ASSERT(oldVal >= 0); - tdRSmaFetchAllResult(pSma, pInfo); + + int8_t curStat = atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)); + if (curStat == 1) { + smaDebug("vgId:%d, fetch all not exec as commit stat is %" PRIi8, SMA_VID(pSma), curStat); + } else { + tdRSmaFetchAllResult(pSma, pInfo); + } + if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); } @@ -1547,17 +1580,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (qallItemSize > 0) { atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); continue; - } else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { - if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) { - continue; - } - for (int32_t j = 0; j < TSDB_RETENTION_L2; ++j) { - SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, j); - if (pItem->fetchLevel) { - pItem->fetchLevel = 0; - taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - } - } + } + if (RSMA_NEED_FETCH(pInfo)) { + continue; } break; diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 34f884f9f95776be10f1416fdf50f99a8fafce6e..c68525a493b83a8baadeb86c8133c00f2dad09a0 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -17,14 +17,13 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData); static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); -static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version); -static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader); // SRSmaSnapReader ======================================== struct SRSmaSnapReader { SSma* pSma; int64_t sver; int64_t ever; + SRSmaFS fs; // for data file int8_t rsmaDataDone[TSDB_RETENTION_L2]; @@ -32,19 +31,23 @@ struct SRSmaSnapReader { // for qtaskinfo file int8_t qTaskDone; + int32_t fsIter; SQTaskFReader* pQTaskFReader; }; int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapReader** ppReader) { int32_t code = 0; + int32_t lino = 0; SVnode* pVnode = pSma->pVnode; SRSmaSnapReader* pReader = NULL; + SSmaEnv* pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat* pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv); // alloc pReader = (SRSmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader)); if (pReader == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pReader->pSma = pSma; pReader->sver = sver; @@ -55,171 +58,147 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead if (pSma->pRSmaTsdb[i]) { code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2, &pReader->pDataReader[i]); - if (code < 0) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _exit); } } // open qtaskinfo - if ((code = rsmaQTaskInfSnapReaderOpen(pReader, ever)) < 0) { - goto _err; + taosRLockLatch(RSMA_FS_LOCK(pStat)); + code = tdRSmaFSRef(pSma, &pReader->fs); + taosRUnLockLatch(RSMA_FS_LOCK(pStat)); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) { + pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader)); + if (!pReader->pQTaskFReader) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + pReader->pQTaskFReader->pSma = pSma; + pReader->pQTaskFReader->version = pReader->ever; } *ppReader = pReader; - - return TSDB_CODE_SUCCESS; -_err: - if (pReader) rsmaSnapReaderClose(&pReader); - *ppReader = NULL; - smaError("vgId:%d, vnode snapshot rsma reader open failed since %s", TD_VID(pVnode), tstrerror(code)); - return TSDB_CODE_FAILED; -} - -static int32_t rsmaQTaskInfSnapReaderOpen(SRSmaSnapReader* pReader, int64_t version) { - int32_t code = 0; - SSma* pSma = pReader->pSma; - SVnode* pVnode = pSma->pVnode; - SSmaEnv* pEnv = NULL; - SRSmaStat* pStat = NULL; - - if (!(pEnv = SMA_RSMA_ENV(pSma))) { - smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as env is NULL", - TD_VID(pVnode), version); - return TSDB_CODE_SUCCESS; - } - - pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv); - - int32_t ref = tdRSmaFSRef(pReader->pSma, pStat, version); - if (ref < 1) { - smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as ref is %d", - TD_VID(pVnode), version, ref); - return TSDB_CODE_SUCCESS; - } - - char qTaskInfoFullName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); - - if (!taosCheckExistFile(qTaskInfoFullName)) { - tdRSmaFSUnRef(pSma, pStat, version); - smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo version %" PRIi64 " not need as %s not exist", - TD_VID(pVnode), version, qTaskInfoFullName); - return TSDB_CODE_SUCCESS; - } - - pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader)); - if (!pReader->pQTaskFReader) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - - TdFilePtr fp = taosOpenFile(qTaskInfoFullName, TD_FILE_READ); - if (!fp) { - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFreeClear(pReader->pQTaskFReader); - goto _end; - } - - pReader->pQTaskFReader->pReadH = fp; - pReader->pQTaskFReader->pSma = pSma; - pReader->pQTaskFReader->version = pReader->ever; - -_end: - if (code < 0) { - tdRSmaFSUnRef(pSma, pStat, version); - smaError("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName); - return TSDB_CODE_FAILED; - } - - smaInfo("vgId:%d, vnode snapshot rsma reader open %s succeed", TD_VID(pVnode), qTaskInfoFullName); - return TSDB_CODE_SUCCESS; -} - -static int32_t rsmaQTaskInfSnapReaderClose(SQTaskFReader** ppReader) { - if (!(*ppReader)) { - return TSDB_CODE_SUCCESS; +_exit: + if (code) { + if (pReader) rsmaSnapReaderClose(&pReader); + *ppReader = NULL; + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } - - SSma* pSma = (*ppReader)->pSma; - SRSmaStat* pStat = SMA_RSMA_STAT(pSma); - int64_t version = (*ppReader)->version; - - taosCloseFile(&(*ppReader)->pReadH); - tdRSmaFSUnRef(pSma, pStat, version); - taosMemoryFreeClear(*ppReader); - smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo version %" PRIi64, SMA_VID(pSma), version); - - return TSDB_CODE_SUCCESS; + return code; } static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf) { int32_t code = 0; - SSma* pSma = pReader->pSma; + int32_t lino = 0; + SVnode* pVnode = pReader->pSma->pVnode; + SQTaskFReader* qReader = pReader->pQTaskFReader; + SRSmaFS* pFS = &pReader->fs; int64_t n = 0; uint8_t* pBuf = NULL; - SQTaskFReader* qReader = pReader->pQTaskFReader; + int64_t version = pReader->ever; + char fname[TSDB_FILENAME_LEN]; if (!qReader) { *ppBuf = NULL; - smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", SMA_VID(pSma)); - return 0; + smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since qTaskReader is NULL", TD_VID(pVnode)); + goto _exit; + } + + if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) { + *ppBuf = NULL; + smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode)); + goto _exit; + } + + while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) { + SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter++); + if (qTaskF->version != version) { + continue; + } + + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs), + fname); + if (!taosCheckExistFile(fname)) { + smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8 + ", version %" PRIi64 " failed since %s not exist", + TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname); + code = TSDB_CODE_RSMA_FS_SYNC; + TSDB_CHECK_CODE(code, lino, _exit); + } + + TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ); + if (!fp) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + qReader->pReadH = fp; + qReader->level = qTaskF->level; + qReader->suid = qTaskF->suid; } if (!qReader->pReadH) { *ppBuf = NULL; - smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is NULL", SMA_VID(pSma)); - return 0; + smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode)); + goto _exit; } int64_t size = 0; if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // seek if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - ASSERT(!(*ppBuf)); - // alloc - *ppBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + size); + if (*ppBuf) { + *ppBuf = taosMemoryRealloc(*ppBuf, sizeof(SSnapDataHdr) + size); + } else { + *ppBuf = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + } if (!(*ppBuf)) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // read n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } else if (n != size) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size); + smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, version:%" PRIi64 ", size:%" PRIi64, TD_VID(pVnode), version, + size); SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf); pHdr->type = SNAP_DATA_QTASK; + pHdr->flag = qReader->level; + pHdr->index = qReader->suid; pHdr->size = size; _exit: - smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma)); - return code; + if (qReader) taosCloseFile(&qReader->pReadH); -_err: - *ppBuf = NULL; - smaError("vgId:%d, vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code)); + if (code) { + *ppBuf = NULL; + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", TD_VID(pVnode)); + } return code; } int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; + int32_t lino = 0; *ppData = NULL; @@ -233,14 +212,11 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { if (!pReader->rsmaDataDone[i]) { smaInfo("vgId:%d, vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), i); code = tsdbSnapRead(pTsdbSnapReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { - if (*ppData) { - goto _exit; - } else { - pReader->rsmaDataDone[i] = 1; - } + pReader->rsmaDataDone[i] = 1; } } else { smaInfo("vgId:%d, vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), i); @@ -251,22 +227,21 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) { if (!pReader->qTaskDone) { smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma)); code = rsmaSnapReadQTaskInfo(pReader, ppData); - if (code) { - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); + if (*ppData) { + goto _exit; } else { pReader->qTaskDone = 1; - if (*ppData) { - goto _exit; - } } } _exit: - smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma)); - return code; - -_err: - smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code)); + if (code) { + rsmaSnapReaderClose(&pReader); + smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code)); + } else { + smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma)); + } return code; } @@ -274,14 +249,15 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) { int32_t code = 0; SRSmaSnapReader* pReader = *ppReader; + tdRSmaFSUnRef(pReader->pSma, &pReader->fs); + taosMemoryFreeClear(pReader->pQTaskFReader); + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pReader->pDataReader[i]) { tsdbSnapReaderClose(&pReader->pDataReader[i]); } } - rsmaQTaskInfSnapReaderClose(&pReader->pQTaskFReader); - smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma)); taosMemoryFreeClear(*ppReader); @@ -293,28 +269,23 @@ struct SRSmaSnapWriter { SSma* pSma; int64_t sver; int64_t ever; - - // config - int64_t commitID; + SRSmaFS fs; // for data file STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2]; - - // for qtaskinfo file - SQTaskFReader* pQTaskFReader; - SQTaskFWriter* pQTaskFWriter; }; int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter) { int32_t code = 0; - SRSmaSnapWriter* pWriter = NULL; + int32_t lino = 0; SVnode* pVnode = pSma->pVnode; + SRSmaSnapWriter* pWriter = NULL; // alloc pWriter = (SRSmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); - if (pWriter == NULL) { + if (!pWriter) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } pWriter->pSma = pSma; pWriter->sver = sver; @@ -324,100 +295,152 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]); - if (code < 0) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _exit); } } // qtaskinfo - SQTaskFWriter* qWriter = (SQTaskFWriter*)taosMemoryCalloc(1, sizeof(SQTaskFWriter)); - qWriter->pSma = pSma; - - char qTaskInfoFullName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); - TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (!qTaskF) { - taosMemoryFree(qWriter); - code = TAOS_SYSTEM_ERROR(errno); - smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, - tstrerror(code)); - goto _err; - } - qWriter->pWriteH = qTaskF; - int32_t fnameLen = strlen(qTaskInfoFullName) + 1; - qWriter->fname = taosMemoryCalloc(1, fnameLen); - strncpy(qWriter->fname, qTaskInfoFullName, fnameLen); - pWriter->pQTaskFWriter = qWriter; - smaDebug("vgId:%d, rsma snapshot writer open succeed for %s", TD_VID(pSma->pVnode), qTaskInfoFullName); + code = tdRSmaFSCopy(pSma, &pWriter->fs); + TSDB_CHECK_CODE(code, lino, _exit); // snapWriter *ppWriter = pWriter; - - smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode)); +_exit: + if (code) { + if (pWriter) rsmaSnapWriterClose(&pWriter, 0); + *ppWriter = NULL; + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } else { + smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode)); + } return code; +} -_err: - smaError("vgId:%d, rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code)); - if (pWriter) rsmaSnapWriterClose(&pWriter, 0); - *ppWriter = NULL; +int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) { + int32_t code = 0; + int32_t lino = 0; + + if (pWriter) { + code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code)); + } return code; } int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; + int32_t lino = 0; + SSma* pSma = NULL; + SVnode* pVnode = NULL; + SSmaEnv* pEnv = NULL; + SRSmaStat* pStat = NULL; SRSmaSnapWriter* pWriter = *ppWriter; - SVnode* pVnode = pWriter->pSma->pVnode; + const char* primaryPath = NULL; + char fname[TSDB_FILENAME_LEN] = {0}; + char fnameVer[TSDB_FILENAME_LEN] = {0}; + TdFilePtr pOutFD = NULL; + TdFilePtr pInFD = NULL; + + if (!pWriter) { + goto _exit; + } - if (rollback) { - // TODO: rsma1/rsma2 - // qtaskinfo - if (pWriter->pQTaskFWriter) { - if (taosRemoveFile(pWriter->pQTaskFWriter->fname) != 0) { - smaWarn("vgId:%d, vnode snapshot rsma writer failed to remove %s since %s", SMA_VID(pWriter->pSma), - pWriter->pQTaskFWriter->fname ? pWriter->pQTaskFWriter->fname : "NULL", - tstrerror(TAOS_SYSTEM_ERROR(errno))); - } + pSma = pWriter->pSma; + pVnode = pSma->pVnode; + pEnv = SMA_RSMA_ENV(pSma); + pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv); + primaryPath = tfsGetPrimaryPath(pVnode->pTfs); + + // rsma1/rsma2 + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pWriter->pDataWriter[i]) { + code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback); + TSDB_CHECK_CODE(code, lino, _exit); } + } + + // qtaskinfo + if (rollback) { + tdRSmaFSRollback(pSma); + // remove qTaskFiles } else { - // rsma1/rsma2 - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (pWriter->pDataWriter[i]) { - code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback); - if (code) goto _err; + // sendFile from fname.Ver to fname + SRSmaFS* pFS = &pWriter->fs; + int32_t size = taosArrayGetSize(pFS->aQTaskInf); + for (int32_t i = 0; i < size; ++i) { + SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i); + if (pTaskF->version == pWriter->ever) { + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, primaryPath, fnameVer); + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, primaryPath, fname); + + pInFD = taosOpenFile(fnameVer, TD_FILE_READ); + if (pInFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pOutFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + if (pOutFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t size = 0; + if (taosFStatFile(pInFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t offset = 0; + if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + smaError("vgId:%d, vnode snapshot rsma writer, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode), + fnameVer, fname, tstrerror(code)); + TSDB_CHECK_CODE(code, lino, _exit); + } + taosCloseFile(&pOutFD); + taosCloseFile(&pInFD); } } - // qtaskinfo - if (pWriter->pQTaskFWriter) { - char qTaskInfoFullName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pWriter->ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); - if (taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma), - pWriter->pQTaskFWriter->fname, qTaskInfoFullName); - // rsma restore - if ((code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever)) < 0) { - goto _err; - } - smaInfo("vgId:%d, vnode snapshot rsma writer restore from %s succeed", SMA_VID(pWriter->pSma), qTaskInfoFullName); + // lock + taosWLockLatch(RSMA_FS_LOCK(pStat)); + code = tdRSmaFSCommit(pSma); + if (code) { + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + goto _exit; } + // unlock + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); } - smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma)); - taosMemoryFree(pWriter); + // rsma restore + code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback); + TSDB_CHECK_CODE(code, lino, _exit); + smaInfo("vgId:%d, vnode snapshot rsma writer restore from sync succeed", SMA_VID(pSma)); + +_exit: + if (pWriter) taosMemoryFree(pWriter); *ppWriter = NULL; - return code; + if (code) { + if (pOutFD) taosCloseFile(&pOutFD); + if (pInFD) taosCloseFile(&pInFD); + smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pSma), tstrerror(code)); + } else { + smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pSma)); + } -_err: - smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code)); return code; } int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; + int32_t lino = 0; SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; // rsma1/rsma2 @@ -430,42 +453,81 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) } else if (pHdr->type == SNAP_DATA_QTASK) { code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); } else { - ASSERT(0); + code = TSDB_CODE_RSMA_FS_SYNC; } - if (code < 0) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); _exit: - smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); - return code; - -_err: - smaError("vgId:%d, rsma snapshot write for data type %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type, - tstrerror(code)); + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, data type %" PRIi8, SMA_VID(pWriter->pSma), __func__, lino, + tstrerror(code), pHdr->type); + } else { + smaInfo("vgId:%d, rsma snapshot write for data type %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type); + } return code; } static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - SQTaskFWriter* qWriter = pWriter->pQTaskFWriter; - - if (qWriter && qWriter->pWriteH) { - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - int64_t size = pHdr->size; - ASSERT(size == (nData - sizeof(SSnapDataHdr))); - int64_t contLen = taosWriteFile(qWriter->pWriteH, pHdr->data, size); - if (contLen != size) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", SMA_VID(pWriter->pSma), qWriter->fname); + int32_t code = 0; + int32_t lino = 0; + SSma* pSma = pWriter->pSma; + SVnode* pVnode = pSma->pVnode; + char fname[TSDB_FILENAME_LEN]; + TdFilePtr fp = NULL; + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + + fname[0] = '\0'; + + if (pHdr->size != (nData - sizeof(SSnapDataHdr))) { + code = TSDB_CODE_RSMA_FS_SYNC; + TSDB_CHECK_CODE(code, lino, _exit); + } + + SQTaskFile qTaskFile = { + .nRef = 1, .level = pHdr->flag, .suid = pHdr->index, .version = pWriter->ever, .size = pHdr->size}; + + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pHdr->index, pHdr->flag, qTaskFile.version, + tfsGetPrimaryPath(pVnode->pTfs), fname); + + fp = taosCreateFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (!fp) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t contLen = taosWriteFile(fp, pHdr->data, pHdr->size); + if (contLen != pHdr->size) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + uint32_t mtime = 0; + if (taosFStatFile(fp, NULL, &mtime) != 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); } else { - smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo is not needed", SMA_VID(pWriter->pSma)); + qTaskFile.mtime = mtime; } + if (taosFsyncFile(fp) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&fp); + + code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: - return code; + if (code) { + if (fp) { + (void)taosRemoveFile(fname); + } + smaError("vgId:%d, %s failed at line %d since %s, file:%s", TD_VID(pVnode), __func__, lino, tstrerror(code), fname); + } else { + smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", TD_VID(pVnode), fname); + } -_err: - smaError("vgId:%d, vnode snapshot rsma write qtaskinfo failed since %s", SMA_VID(pWriter->pSma), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index a2c9484693a720243ccb205f0d311289c86f254e..9ce4a2077171b25ec3d98d970db99fee09aa1240 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -111,35 +111,48 @@ _err: * @return int32_t */ static int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { - SSmaCfg *pCfg = (SSmaCfg *)pMsg; + int32_t code = 0; + int32_t lino = 0; + SSmaCfg *pCfg = (SSmaCfg *)pMsg; + SName stbFullName = {0}; + SVCreateStbReq pReq = {0}; if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { // create tsma meta in dstVgId if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) { - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // create stable to save tsma result in dstVgId - SName stbFullName = {0}; tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - SVCreateStbReq pReq = {0}; pReq.name = (char *)tNameGetTableName(&stbFullName); pReq.suid = pCfg->dstTbUid; pReq.schemaRow = pCfg->schemaRow; pReq.schemaTag = pCfg->schemaTag; if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { - return -1; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } + } else { + code = terrno = TSDB_CODE_TSMA_INVALID_STAT; + TSDB_CHECK_CODE(code, lino, _exit); + } +_exit: + if (code) { + smaError("vgId:%d, failed at line %d to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 + " dstTb:%s dstVg:%d", + SMA_VID(pSma), lino, pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, + pCfg->dstVgId); + } else { smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId); - } else { - ASSERT(0); } - return 0; + return code; } /** @@ -174,7 +187,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char STSmaStat *pTsmaStat = NULL; if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { - terrno = TSDB_CODE_TSMA_INVALID_STAT; + terrno = TSDB_CODE_TSMA_INVALID_ENV; return TSDB_CODE_FAILED; } @@ -216,9 +229,14 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char indexUid, tstrerror(terrno)); goto _err; } - + #if 0 - ASSERT(!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)); + if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) { + terrno = TSDB_CODE_APP_ERROR; + smaError("vgId:%d, tsma insert for smaIndex %" PRIi64 " failed since %s, %s", SMA_VID(pSma), indexUid, + pTsmaStat->pTSma->indexUid, tstrerror(terrno), pTsmaStat->pTSma->dstTbName); + goto _err; + } #endif SRpcMsg submitReqMsg = { diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 4d09d690d684c68199f63165875216ea3685acad..7c538280e52ed127ff2815623a2afa00c126de7f 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -15,197 +15,72 @@ #include "sma.h" -// smaFileUtil ================ -#if 0 -#define TD_FILE_STATE_OK 0 -#define TD_FILE_STATE_BAD 1 +#define TD_QTASKINFO_FNAME_PREFIX "main.tdb" -#define TD_FILE_INIT_MAGIC 0xFFFFFFFF - -static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo); -static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo); - -static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) { - int32_t tlen = 0; - - tlen += taosEncodeFixedU32(buf, pInfo->magic); - tlen += taosEncodeFixedU32(buf, pInfo->ftype); - tlen += taosEncodeFixedU32(buf, pInfo->fver); - tlen += taosEncodeFixedI64(buf, pInfo->fsize); - - return tlen; -} - -static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { - buf = taosDecodeFixedU32(buf, &(pInfo->magic)); - buf = taosDecodeFixedU32(buf, &(pInfo->ftype)); - buf = taosDecodeFixedU32(buf, &(pInfo->fver)); - buf = taosDecodeFixedI64(buf, &(pInfo->fsize)); - - return buf; -} - -int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { - ASSERT(TD_TFILE_OPENED(pTFile)); - - int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte); - if (nwrite < nbyte) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return nwrite; -} - -int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { - ASSERT(TD_TFILE_OPENED(pTFile)); - - int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence); - if (loffset < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return loffset; -} - -int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) { - ASSERT(TD_TFILE_OPENED(pTFile)); - return taosFStatFile(pTFile->pFile, size, NULL); -} - -int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { - ASSERT(TD_TFILE_OPENED(pTFile)); - - int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte); - if (nread < 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return nread; -} - -int32_t tdUpdateTFileHeader(STFile *pTFile) { - char buf[TD_FILE_HEAD_SIZE] = "\0"; - - if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { - return -1; - } - - void *ptr = buf; - tdEncodeTFInfo(&ptr, &(pTFile->info)); - - taosCalcChecksumAppend(0, (uint8_t *)buf, TD_FILE_HEAD_SIZE); - if (tdWriteTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) { - return -1; - } - - return 0; -} - -int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) { - char buf[TD_FILE_HEAD_SIZE] = "\0"; - uint32_t _version; - - ASSERT(TD_TFILE_OPENED(pTFile)); - - if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { - return -1; - } - - if (tdReadTFile(pTFile, buf, TD_FILE_HEAD_SIZE) < 0) { - return -1; - } - - if (!taosCheckChecksumWhole((uint8_t *)buf, TD_FILE_HEAD_SIZE)) { - terrno = TSDB_CODE_FILE_CORRUPTED; - return -1; - } - - void *pBuf = buf; - pBuf = tdDecodeTFInfo(pBuf, pInfo); - return 0; -} - -void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) { - pTFile->info.magic = taosCalcChecksum(pTFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM)); +void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName) { + tdRSmaGetFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); } -int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) { - ASSERT(TD_TFILE_OPENED(pTFile)); - - int64_t toffset; - - if ((toffset = tdSeekTFile(pTFile, 0, SEEK_END)) < 0) { - return -1; - } - -#if 0 - smaDebug("append to file %s, offset:%" PRIi64 " nbyte:%" PRIi64 " fsize:%" PRIi64, TD_TFILE_FULL_NAME(pTFile), - toffset, nbyte, toffset + nbyte); -#endif - - ASSERT(pTFile->info.fsize == toffset); - - if (offset) { - *offset = toffset; - } - - if (tdWriteTFile(pTFile, buf, nbyte) < 0) { - return -1; - } - - pTFile->info.fsize += nbyte; - - return nbyte; +void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, + char *outputName) { + tdRSmaGetFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, suid, level, version, outputName); } -int32_t tdOpenTFile(STFile *pTFile, int flags) { - ASSERT(!TD_TFILE_OPENED(pTFile)); - - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags); - if (pTFile->pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return 0; +void tdRSmaQTaskInfoGetFullPath(int32_t vgId, int8_t level, const char *path, char *outputName) { + tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); + int32_t rsmaLen = strlen(outputName); + snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8, level); } -void tdCloseTFile(STFile *pTFile) { - if (TD_TFILE_OPENED(pTFile)) { - taosCloseFile(&pTFile->pFile); - TD_TFILE_SET_CLOSED(pTFile); - } +void tdRSmaQTaskInfoGetFullPathEx(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { + tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); + int32_t rsmaLen = strlen(outputName); + snprintf(outputName + rsmaLen, TSDB_FILENAME_LEN - rsmaLen, "%" PRIi8 "%s%" PRIi64, level, TD_DIRSEP, suid); } -void tdDestroyTFile(STFile *pTFile) { taosMemoryFreeClear(TD_TFILE_FULL_NAME(pTFile)); } - -#endif - -void tdGetVndFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t version, - char *outputName) { - if (version < 0) { - if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, - TD_DIRSEP, dname, TD_DIRSEP, vgId, fname); +void tdRSmaGetFileName(int32_t vgId, const char *pdname, const char *dname, const char *fname, int64_t suid, + int8_t level, int64_t version, char *outputName) { + if (level >= 0 && suid > 0) { + if (version >= 0) { + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, pdname, + TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname, + version); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s.%" PRIi64, TD_DIRSEP, + vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname, version); + } } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, - vgId, fname); + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", pdname, + TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%s%" PRIi8 "%s%" PRIi64 "%s%s", TD_DIRSEP, vgId, + TD_DIRSEP, dname, TD_DIRSEP, level, TD_DIRSEP, suid, TD_DIRSEP, fname); + } } } else { - if (pdname) { - snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP, - vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version); + if (version >= 0) { + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s%" PRIi64, pdname, TD_DIRSEP, TD_DIRSEP, + vgId, TD_DIRSEP, dname, TD_DIRSEP, vgId, fname, version); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname, + TD_DIRSEP, vgId, fname, version); + } } else { - snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s%" PRIi64, TD_DIRSEP, vgId, TD_DIRSEP, dname, - TD_DIRSEP, vgId, fname, version); + if (pdname) { + snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%sv%d%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, + TD_DIRSEP, dname, TD_DIRSEP, vgId, fname); + } else { + snprintf(outputName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s%sv%d%s", TD_DIRSEP, vgId, TD_DIRSEP, dname, + TD_DIRSEP, vgId, fname); + } } } } -void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) { +void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName) { if (pdname) { if (endWithSep) { snprintf(outputName, TSDB_FILENAME_LEN, "%s%svnode%svnode%d%s%s%s", pdname, TD_DIRSEP, TD_DIRSEP, vgId, TD_DIRSEP, @@ -223,81 +98,13 @@ void tdGetVndDirName(int32_t vgId, const char *pdname, const char *dname, bool e } } -#if 0 -int32_t tdInitTFile(STFile *pTFile, const char *dname, const char *fname) { - TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK); - TD_TFILE_SET_CLOSED(pTFile); - - memset(&(pTFile->info), 0, sizeof(pTFile->info)); - pTFile->info.magic = TD_FILE_INIT_MAGIC; - - char tmpName[TSDB_FILENAME_LEN * 2 + 32] = {0}; - snprintf(tmpName, TSDB_FILENAME_LEN * 2 + 32, "%s%s%s", dname, TD_DIRSEP, fname); - int32_t tmpNameLen = strlen(tmpName) + 1; - pTFile->fname = taosMemoryMalloc(tmpNameLen); - if (!pTFile->fname) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - tstrncpy(pTFile->fname, tmpName, tmpNameLen); - - return 0; -} - -int32_t tdCreateTFile(STFile *pTFile, bool updateHeader, int8_t fType) { - ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pTFile->pFile == NULL) { - if (errno == ENOENT) { - // Try to create directory recursively - char *s = strdup(TD_TFILE_FULL_NAME(pTFile)); - if (taosMulMkDir(taosDirName(s)) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(s); - return -1; - } - taosMemoryFree(s); - pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pTFile->pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - } - } - - if (!updateHeader) { - return 0; - } - - pTFile->info.fsize += TD_FILE_HEAD_SIZE; - pTFile->info.fver = 0; - - if (tdUpdateTFileHeader(pTFile) < 0) { - tdCloseTFile(pTFile); - tdRemoveTFile(pTFile); - return -1; - } - - return 0; -} - -int32_t tdRemoveTFile(STFile *pTFile) { - if (taosRemoveFile(TD_TFILE_FULL_NAME(pTFile)) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - }; - return 0; -} - -#endif - // smaXXXUtil ================ void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) { void *pResult = taosAcquireRef(rsetId, refId); if (!pResult) { smaWarn("rsma acquire ref for rsetId:%d refId:%" PRIi64 " failed since %s", rsetId, refId, terrstr()); } else { - smaDebug("rsma acquire ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId); + smaTrace("rsma acquire ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId); } return pResult; } @@ -307,7 +114,7 @@ int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) { smaWarn("rsma release ref for rsetId:%d refId:%" PRIi64 " failed since %s", rsetId, refId, terrstr()); return TSDB_CODE_FAILED; } - smaDebug("rsma release ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId); + smaTrace("rsma release ref for rsetId:%d refId:%" PRIi64 " success", rsetId, refId); return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 426fceb7ede9e22841931ba5a9581fe66986e7ea..9a56df7cb0f535852c35fab8b6a5d56b5c7a865c 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -85,7 +85,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs uint64_t ts = 0; tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId); if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); return -1; } tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock); @@ -150,7 +150,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta uint64_t ts = 0; tqDebug("tmqsnap task start to execute"); if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqError("vgId:%d task exec error since %s", pTq->pVnode->config.vgId, terrstr()); + tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); return -1; } tqDebug("tmqsnap task execute end, get %p", pDataBlock); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 384b36589052053423a29230762bc3122ceff981..d726fbee5596271256cc5714ac5ded16062d08f5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1369,7 +1369,7 @@ _exit: taosMemoryFree(pWriter); } } else { - tsdbInfo("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__); + tsdbInfo("vgId:%d, %s done", TD_VID(pTsdb->pVnode), __func__); *ppWriter = pWriter; } return code; @@ -1390,7 +1390,7 @@ int32_t tsdbSnapWriterPrepareClose(STsdbSnapWriter* pWriter) { _exit: if (code) { - tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); + tsdbError("vgId:%d, %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); } return code; } @@ -1441,7 +1441,7 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { tFree(pWriter->aBuf[iBuf]); } - tsdbInfo("vgId:%d %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); + tsdbInfo("vgId:%d, %s done", TD_VID(pWriter->pTsdb->pVnode), __func__); taosMemoryFree(pWriter); *ppWriter = NULL; return code; diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index bfa4dff03c9e21964289c79f13f1ec948f232d60..2e0370a81cd128f5ba42122a417d7faa96525544 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -184,16 +184,21 @@ _err: return -1; } -void vnodePrepareCommit(SVnode *pVnode) { +static int32_t vnodePrepareCommit(SVnode *pVnode) { + int32_t code = 0; tsem_wait(&pVnode->canCommit); tsdbPrepareCommit(pVnode->pTsdb); metaPrepareAsyncCommit(pVnode->pMeta); - smaPrepareAsyncCommit(pVnode->pSma); + code = smaPrepareAsyncCommit(pVnode->pSma); + if (code) goto _exit; +_exit: vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; + return code; } + static int32_t vnodeCommitTask(void *arg) { int32_t code = 0; @@ -203,10 +208,9 @@ static int32_t vnodeCommitTask(void *arg) { code = vnodeCommitImpl(pInfo); if (code) goto _exit; +_exit: // end commit tsem_post(&pInfo->pVnode->canCommit); - -_exit: taosMemoryFree(pInfo); return code; } @@ -214,7 +218,8 @@ int vnodeAsyncCommit(SVnode *pVnode) { int32_t code = 0; // prepare to commit - vnodePrepareCommit(pVnode); + code = vnodePrepareCommit(pVnode); + if (code) goto _exit; // schedule the task pVnode->state.commitTerm = pVnode->state.applyTerm; @@ -230,14 +235,15 @@ int vnodeAsyncCommit(SVnode *pVnode) { pInfo->info.state.commitID = pVnode->state.commitID; pInfo->pVnode = pVnode; pInfo->txn = metaGetTxn(pVnode->pMeta); - vnodeScheduleTask(vnodeCommitTask, pInfo); + code = vnodeScheduleTask(vnodeCommitTask, pInfo); _exit: if (code) { - vError("vgId:%d %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), + tsem_post(&pVnode->canCommit); + vError("vgId:%d, %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code), pVnode->state.commitID); } else { - vDebug("vgId:%d %s done", TD_VID(pVnode), __func__); + vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__); } return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 1199127f6d42bf2d3f7488fb120e8fa9ab09c490..8d4e70cff987c69f7b57c505c8f557bc73b70609 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -15,13 +15,13 @@ #include "vnd.h" -#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ - do { \ - int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ - ASSERT(newVal >= 0); \ - if (newVal < 0) { \ - vWarn("vgId:%d %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \ - } \ +#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \ + do { \ + int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal)); \ + ASSERT(newVal >= 0); \ + if (newVal < 0) { \ + vWarn("vgId:%d, %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \ + } \ } while (0) int vnodeQueryOpen(SVnode *pVnode) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 19d9bc0cc6963210f7ad7ba35f3920654474c49b..22f7427a9505b89bdf4c397aac5146a21f55a864 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -325,7 +325,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp // commit if need if (vnodeShouldCommit(pVnode)) { vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); - vnodeAsyncCommit(pVnode); + if (vnodeAsyncCommit(pVnode) < 0) { + vError("vgId:%d, failed to vnode async commit since %s.", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } // start a new one if (vnodeBegin(pVnode) < 0) { diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 524abf3c2a889775ce67c9d3a377cb47936922fb..b83be2bebb41556d68ecd1596bb3065aacc20633 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -64,7 +64,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - sTrace("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", + sTrace("vgId:%d, received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); if (pMsg->success) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index a89c4bb546d548f2bc67bb4f18c3c9d63766e5b4..be536dec3e9c232dd9e1288de08ee71cfcabcd93 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -339,7 +339,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int64_t contLen; bool seeked = false; - wDebug("vgId:%d try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + wDebug("vgId:%d, try to fetch ver %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 ", applied ver:%" PRId64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); @@ -393,7 +393,7 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) { int64_t code; - wDebug("vgId:%d skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 ", applied ver:%" PRId64, pRead->pWal->cfg.vgId, pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); @@ -414,7 +414,7 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { SWalCont *pReadHead = &((*ppHead)->head); int64_t ver = pReadHead->version; - wDebug("vgId:%d fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 + wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 ", applied ver:%" PRId64, pRead->pWal->cfg.vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 4b9dde50592105520b28667900851bc7406c9578..e729a3bf7f186051cceb4f15584e8da2d013045f 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -585,6 +585,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_COMMIT, "Rsma fs commit error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REMOVE_EXISTS, "Rsma remove exists") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP, "Rsma fetch msg is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_EMPTY_INFO, "Rsma info is empty") @@ -592,6 +593,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_SCHEMA, "Rsma invalid schema TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_REGEX_MATCH, "Rsma regex match") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_OPEN, "Rsma stream state open") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state commit") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding")