diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8565eea63bd310d9538e49c7f4214c3f243cbf40..967b5796ec374284e5799f142771c7cc6c337246 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -615,6 +615,7 @@ int32_t* taosGetErrno(); //rsma #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151) +#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 34c3ef8fca210888cb8e9f31c37c65c74b2d408d..4402f37c34d24be6176590ae14d93507837dd9c6 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -40,6 +40,10 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); +static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma); +static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma); +static int32_t tdRSmaRestoreTSDataReload(SSma *pSma); + struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; void *taskInfo; // qTaskInfo_t @@ -246,6 +250,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo pItem->pRsmaInfo = pRSmaInfo; pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle); if (!pItem->taskInfo) { + terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; goto _err; } pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE; @@ -696,10 +701,9 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; } -int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { +static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) { SVnode *pVnode = pSma->pVnode; - // step 1: iterate all stables to restore the rsma env SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { taosArrayDestroy(suidList); @@ -741,42 +745,88 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { } } - // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore - STFile tFile = {0}; - char qTaskInfoFName[TSDB_FILENAME_LEN]; + metaReaderClear(&mr); + taosArrayDestroy(suidList); + + return TSDB_CODE_SUCCESS; +_err: + metaReaderClear(&mr); + taosArrayDestroy(suidList); + + return TSDB_CODE_FAILED; +} + +static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) { + SVnode *pVnode = pSma->pVnode; + STFile tFile = {0}; + char qTaskInfoFName[TSDB_FILENAME_LEN]; tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) { goto _err; } - - if(!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { - metaReaderClear(&mr); - taosArrayDestroy(suidList); + + if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { return TSDB_CODE_SUCCESS; } if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { goto _err; } - + SRSmaQTaskInfoIter fIter = {0}; if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { + tdRSmaQTaskInfoIterDestroy(&fIter); + tdCloseTFile(&tFile); goto _err; } - SRSmaQTaskInfoItem infoItem = {0}; + if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) { tdRSmaQTaskInfoIterDestroy(&fIter); + tdCloseTFile(&tFile); goto _err; } tdRSmaQTaskInfoIterDestroy(&fIter); - metaReaderClear(&mr); - taosArrayDestroy(suidList); + tdCloseTFile(&tFile); + return TSDB_CODE_SUCCESS; +_err: + smaError("rsma restore, qtaskinfo reload failed since %s", terrstr()); + return TSDB_CODE_FAILED; +} + +/** + * @brief reload ts data from checkpoint + * + * @param pSma + * @return int32_t + */ +static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) { + // TODO + return TSDB_CODE_SUCCESS; +_err: + smaError("rsma restore, ts data reload failed since %s", terrstr()); + return TSDB_CODE_FAILED; +} + +int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { + // step 1: iterate all stables to restore the rsma env + if (tdRSmaRestoreQTaskInfoInit(pSma) < 0) { + goto _err; + } + + // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore + if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) { + goto _err; + } + + // step 3: reload ts data from checkpoint + if (tdRSmaRestoreTSDataReload(pSma) < 0) { + goto _err; + } + return TSDB_CODE_SUCCESS; _err: - metaReaderClear(&mr); - taosArrayDestroy(suidList); smaError("failed to restore rsma task since %s", terrstr()); return TSDB_CODE_FAILED; } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index fa10fc26dd7cf3817e70cb00fb76ceb4522dadf3..651fd90bcfb702ba43f61ad1416382fa8d40a350 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -581,10 +581,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PTR, "Invalid tsma pointe TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PARA, "Invalid tsma parameters") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in cache") - //rsma TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error") //tq TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset")