diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 4e87589e600e94b3b147412774c8d6da30fb3ece..36a7615af1f4c26f5f60a480a249a8f085b6db9f 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -105,17 +105,16 @@ struct SRSmaFS { struct SRSmaStat { SSma *pSma; - int64_t commitAppliedVer; // vnode applied version for async commit - int64_t refId; // shared by fetch tasks - volatile int64_t nBufItems; // number of items in queue buffer - SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) - volatile int32_t nFetchAll; // active number of fetch all - volatile int8_t triggerStat; // shared by fetch tasks - volatile int8_t commitStat; // 0 not in committing, 1 in committing - volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo - SRSmaFS fs; // for recovery/snapshot r/w - SHashObj *infoHash; // key: suid, value: SRSmaInfo - tsem_t notEmpty; // has items in queue buffer + int64_t refId; // shared by fetch tasks + volatile int64_t nBufItems; // number of items in queue buffer + SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) + volatile int32_t nFetchAll; // active number of fetch all + volatile int8_t triggerStat; // shared by fetch tasks + volatile int8_t commitStat; // 0 not in committing, 1 in committing + volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo + SRSmaFS fs; // for recovery/snapshot r/w + SHashObj *infoHash; // key: suid, value: SRSmaInfo + tsem_t notEmpty; // has items in queue buffer }; struct SSmaStat { @@ -156,9 +155,9 @@ struct SRSmaInfo { int16_t padding; T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; - void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t - STaosQueue *queue; // buffer queue of SubmitReq - STaosQall *qall; // buffer qall of SubmitReq + void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t + STaosQueue *queue; // buffer queue of SubmitReq + STaosQall *qall; // buffer qall of SubmitReq }; #define RSMA_INFO_HEAD_LEN offsetof(SRSmaInfo, items) @@ -221,6 +220,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName); @@ -234,7 +234,6 @@ static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { smaTrace("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); } - void tdRSmaGetDirName(int32_t vgId, const char *pdname, const char *dname, bool endWithSep, char *outputName); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 16505b7dc7e4e38209d970aa55aa1826fbcd4a94..e770e8c22a5b615247b38ccb77b746fa0f4342a8 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -149,13 +149,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) { tdSmaLoopsCheck(&nLoops, 1000); } - - pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - if (ASSERTS(pRSmaStat->commitAppliedVer >= -1, "commit applied version %" PRIi64 " < -1", - pRSmaStat->commitAppliedVer)) { - code = TSDB_CODE_APP_ERROR; - TSDB_CHECK_CODE(code, lino, _exit); - } } // step 2: wait for all triggered fetch tasks to finish nLoops = 0; @@ -183,6 +176,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { if (!isCommit) goto _exit; + code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); + TSDB_CHECK_CODE(code, lino, _exit); + + smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); // all rsma results are written completely diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 53b27863217bbfb7d74391c7b681e88505d47fa6..000589d0fe76014b105cbb6b4be126199e0c2d11 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1052,6 +1052,47 @@ _err: return code; } +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { + int32_t code = 0; + int32_t lino = 0; + SSma *pSma = pRSmaStat->pSma; + SVnode *pVnode = pSma->pVnode; + SRSmaFS fs = {0}; + + if (taosHashGetSize(pInfoHash) <= 0) { + return TSDB_CODE_SUCCESS; + } + + void *infoHash = NULL; + while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + continue; + } + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); + if (pItem && pItem->pStreamState) { + if (streamStateCommit(pItem->pStreamState) < 0) { + code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; + TSDB_CHECK_CODE(code, lino, _exit); + } + smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode), + pRSmaInfo->suid, i + 1); + } + } + } + +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + + terrno = code; + return code; +} + /** * @brief trigger to get rsma result in async mode *