未验证 提交 de3596c9 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #14320 from taosdata/feature/TD-11274-3.0

refactor: rsma code optimization
...@@ -615,6 +615,7 @@ int32_t* taosGetErrno(); ...@@ -615,6 +615,7 @@ int32_t* taosGetErrno();
//rsma //rsma
#define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150)
#define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151) #define TSDB_CODE_RSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3151)
#define TSDB_CODE_RSMA_QTASKINFO_CREATE TAOS_DEF_ERROR_CODE(0, 0x3152)
//index //index
#define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200)
......
...@@ -40,6 +40,10 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF ...@@ -40,6 +40,10 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma);
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma);
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
struct SRSmaInfoItem { struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo; SRSmaInfo *pRsmaInfo;
void *taskInfo; // qTaskInfo_t void *taskInfo; // qTaskInfo_t
...@@ -246,6 +250,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo ...@@ -246,6 +250,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo
pItem->pRsmaInfo = pRSmaInfo; pItem->pRsmaInfo = pRSmaInfo;
pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle); pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], pReadHandle);
if (!pItem->taskInfo) { if (!pItem->taskInfo) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
goto _err; goto _err;
} }
pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE; pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE;
...@@ -696,10 +701,9 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { ...@@ -696,10 +701,9 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) {
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
// step 1: iterate all stables to restore the rsma env
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
taosArrayDestroy(suidList); taosArrayDestroy(suidList);
...@@ -741,42 +745,88 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { ...@@ -741,42 +745,88 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
} }
} }
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore metaReaderClear(&mr);
STFile tFile = {0}; taosArrayDestroy(suidList);
char qTaskInfoFName[TSDB_FILENAME_LEN];
return TSDB_CODE_SUCCESS;
_err:
metaReaderClear(&mr);
taosArrayDestroy(suidList);
return TSDB_CODE_FAILED;
}
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName);
if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) { if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
goto _err; goto _err;
} }
if(!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
metaReaderClear(&mr);
taosArrayDestroy(suidList);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
goto _err; goto _err;
} }
SRSmaQTaskInfoIter fIter = {0}; SRSmaQTaskInfoIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter);
tdCloseTFile(&tFile);
goto _err; goto _err;
} }
SRSmaQTaskInfoItem infoItem = {0};
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) { if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
tdRSmaQTaskInfoIterDestroy(&fIter); tdRSmaQTaskInfoIterDestroy(&fIter);
tdCloseTFile(&tFile);
goto _err; goto _err;
} }
tdRSmaQTaskInfoIterDestroy(&fIter); tdRSmaQTaskInfoIterDestroy(&fIter);
metaReaderClear(&mr); tdCloseTFile(&tFile);
taosArrayDestroy(suidList); return TSDB_CODE_SUCCESS;
_err:
smaError("rsma restore, qtaskinfo reload failed since %s", terrstr());
return TSDB_CODE_FAILED;
}
/**
* @brief reload ts data from checkpoint
*
* @param pSma
* @return int32_t
*/
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
// TODO
return TSDB_CODE_SUCCESS;
_err:
smaError("rsma restore, ts data reload failed since %s", terrstr());
return TSDB_CODE_FAILED;
}
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
// step 1: iterate all stables to restore the rsma env
if (tdRSmaRestoreQTaskInfoInit(pSma) < 0) {
goto _err;
}
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) {
goto _err;
}
// step 3: reload ts data from checkpoint
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
goto _err;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_err: _err:
metaReaderClear(&mr);
taosArrayDestroy(suidList);
smaError("failed to restore rsma task since %s", terrstr()); smaError("failed to restore rsma task since %s", terrstr());
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
......
...@@ -581,10 +581,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PTR, "Invalid tsma pointe ...@@ -581,10 +581,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PTR, "Invalid tsma pointe
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PARA, "Invalid tsma parameters") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PARA, "Invalid tsma parameters")
TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in cache") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in cache")
//rsma //rsma
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state")
TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_QTASKINFO_CREATE, "Rsma qtaskinfo creation error")
//tq //tq
TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset") TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册