diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 9312d18b806350a38f5c7f829a83eca9d5f74c52..f2673ca433faf7f83142b1640a1c53480beac4ba 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -219,11 +219,12 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); void tdRSmaFSClose(SRSmaFS *fs); int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); int32_t tdRSmaFSCommit(SSma *pSma); +int32_t tdRSmaFSFinishCommit(SSma *pSma); int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFSOut); int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut); int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version); -int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile); +int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t size); int32_t tdRSmaFSRollback(SSma *pSma); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index d969aad12008e818e79edf480afa6e19573cf470..e7e1406238e3aa0847375dab28fd8a5db2dbb4df 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -97,18 +97,22 @@ _exit: int32_t smaFinishCommit(SSma *pSma) { int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pSma->pVnode; + code = tdRSmaFSFinishCommit(pSma); + TSDB_CHECK_CODE(code, lino, _exit); + if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { - smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } if (VND_RSMA2(pVnode) && (code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) { - smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } _exit: - terrno = code; + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } return code; } @@ -123,7 +127,7 @@ _exit: * @return int32_t */ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { - #if 0 +#if 0 SVnode *pVnode = pSma->pVnode; SRSmaFS *pFS = RSMA_FS(pStat); int64_t committed = pStat->commitAppliedVer; @@ -170,7 +174,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { } taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - #endif +#endif return TSDB_CODE_SUCCESS; } @@ -279,21 +283,22 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { */ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pSma->pVnode; + code = tdRSmaFSCommit(pSma); + TSDB_CHECK_CODE(code, lino, _exit); - + code = tsdbCommit(VND_RSMA1(pVnode), pInfo); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbCommit(VND_RSMA2(pVnode), pInfo); + TSDB_CHECK_CODE(code, lino, _exit); - if ((code = tsdbCommit(VND_RSMA1(pVnode), pInfo)) < 0) { - smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; - } - if ((code = tsdbCommit(VND_RSMA2(pVnode), pInfo)) < 0) { - smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); - goto _exit; - } _exit: - terrno = code; + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } return code; } diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 257769ba11dd81f2e6826df4477caccda3ab6842..32dca7f6c9b14931a921860387448607dd3c8e9d 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -17,9 +17,8 @@ // ================================================================================================= -static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); +// static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2); -static int32_t tdQTaskInfCmprFn2(const void *p1, const void *p2); static FORCE_INLINE int32_t tPutQTaskF(uint8_t *p, SQTaskFile *pFile) { int32_t n = 0; @@ -252,75 +251,49 @@ static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2) { static int32_t tdRSmaFSApplyChange(SSma *pSma, SRSmaFS *pFS) { int32_t code = 0; -#if 0 int32_t lino = 0; SVnode *pVnode = pSma->pVnode; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - - int32_t nRef = 0; - char fname[TSDB_FILENAME_LEN] = {0}; + SRSmaFS *pFSOld = RSMA_FS(pStat); + int64_t version = pStat->commitAppliedVer; + char fname[TSDB_FILENAME_LEN] = {0}; // SQTaskFile - int32_t iOld = 0; + int32_t nNew = taosArrayGetSize(pFS->aQTaskInf); int32_t iNew = 0; - while (true) { - int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); - int32_t nNew = taosArrayGetSize(pFS->aDFileSet); - SDFileSet fSet = {0}; - int8_t sameDisk = 0; - - if (iOld >= nOld && iNew >= nNew) break; - - SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL; - SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL; - - if (pSetOld && pSetNew) { - if (pSetOld->fid == pSetNew->fid) { - code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew); - TSDB_CHECK_CODE(code, lino, _exit); + while (iNew < nNew) { + SQTaskFile *pQTaskFNew = TARRAY_GET_ELEM(pFS->aQTaskInf, iNew); - iOld++; - iNew++; - } else if (pSetOld->fid < pSetNew->fid) { - code = tsdbRemoveFileSet(pTsdb, pSetOld); - TSDB_CHECK_CODE(code, lino, _exit); - taosArrayRemove(pTsdb->fs.aDFileSet, iOld); - } else { - code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); - TSDB_CHECK_CODE(code, lino, _exit) - - if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } + int32_t idx = taosArraySearchIdx(pFSOld->aQTaskInf, pQTaskFNew, tdQTaskInfCmprFn1, TD_GE); - iOld++; - iNew++; - } - } else if (pSetOld) { - code = tsdbRemoveFileSet(pTsdb, pSetOld); - TSDB_CHECK_CODE(code, lino, _exit); - taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + if (idx < 0) { + idx = taosArrayGetSize(pFSOld->aQTaskInf); + pQTaskFNew->nRef = 1; } else { - code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); - TSDB_CHECK_CODE(code, lino, _exit) - - if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); + SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFSOld->aQTaskInf, idx); + int32_t c = tdQTaskInfCmprFn1(pTaskF, pQTaskFNew); + if (c < 0) { + + } else if(c == 0) { + ++iNew; + continue; + } else { + ASSERT(0); } + } - iOld++; - iNew++; + if (taosArrayInsert(pFS->aQTaskInf, idx, qTaskF) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } + } _exit: if (code) { - smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } -#endif return code; } @@ -332,6 +305,7 @@ _exit: * @param output * @return int32_t */ +#if 0 static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) { SVnode *pVnode = pSma->pVnode; TdDirPtr pDir = NULL; @@ -419,7 +393,7 @@ _end: regfree(®ex); return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED; } - +#endif static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) { int32_t code = 0; @@ -468,7 +442,6 @@ _exit: return code; } - // EXPOSED APIS ==================================================================================== int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) { @@ -547,7 +520,9 @@ int32_t tdRSmaFSCommit(SSma *pSma) { char current_t[TSDB_FILENAME_LEN] = {0}; tdRSmaGetCurrentFName(pSma, current, current_t); - if (!taosCheckExistFile(current_t)) goto _exit; + if (!taosCheckExistFile(current_t)) { + goto _exit; + } // rename the file if (taosRenameFile(current_t, current) < 0) { @@ -556,7 +531,7 @@ int32_t tdRSmaFSCommit(SSma *pSma) { } // Load the new FS - code = tdRSmaFSCreate(&fs, 0); + code = tdRSmaFSCreate(&fs, 1); TSDB_CHECK_CODE(code, lino, _exit); code = tdRSmaLoadFSFromFile(current, &fs); @@ -574,6 +549,28 @@ _exit: return code; } +int32_t tdRSmaFSFinishCommit(SSma *pSma) { + int32_t code = 0; + int32_t lino = 0; + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); + + taosWLockLatch(RSMA_FS_LOCK(pStat)); + if ((code = tdRSmaFSCommit(pSma)) < 0) { + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + TSDB_CHECK_CODE(code, lino, _exit); + } + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + } else { + smaInfo("vgId:%d, rsmaFS finish commit", SMA_VID(pSma)); + } + return code; +} + int32_t tdRSmaFSRollback(SSma *pSma) { int32_t code = 0; int32_t lino = 0; @@ -603,9 +600,8 @@ int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t siz SQTaskFile *pTaskF = (SQTaskFile *)taosArrayGet(pFS->aQTaskInf, idx); int32_t c = tdQTaskInfCmprFn1(pTaskF, qTaskF); if (c == 0) { - ASSERT(0); - pTaskF->nRef = qTaskF->nRef; ASSERT(pTaskF->size == qTaskF->size); + ASSERT(0); goto _exit; } } @@ -634,7 +630,7 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, in ASSERT(oldVal > 0); } } else { - // ref all + // ref all int32_t size = taosArrayGetSize(aQTaskInf); for (int32_t i = 0; i < size; ++i) { pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); @@ -701,15 +697,18 @@ void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int } int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFSOut) { + int32_t code = 0; + int32_t lino = 0; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - int32_t code = 0; taosRLockLatch(RSMA_FS_LOCK(pStat)); code = tdRSmaFSCopy(pSma, pFSOut); + TSDB_CHECK_CODE(code, lino, _exit); taosWUnLockLatch(RSMA_FS_LOCK(pStat)); _exit: if (code) { + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pSma->pVnode), __func__, lino, tstrerror(code)); } return code; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index c3e7b367459696b4c419a5942a7709d0e6f1ae38..c58923ad2db994b2bd91907882e4fb5a7a1a5c33 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1271,9 +1271,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } } + // prepare code = tdRSmaFSTakeSnapshot(pSma, &fs); TSDB_CHECK_CODE(code, lino, _exit); - + code = tdRSmaFSUpsertQTaskFile(&fs, qTaskFArray->pData, taosArrayGetSize(qTaskFArray)); + TSDB_CHECK_CODE(code, lino, _exit); code = tdRSmaFSPrepareCommit(pSma, &fs); TSDB_CHECK_CODE(code, lino, _exit);