提交 23a9c8f3 编写于 作者: C Cary Xu

refactor: rsma restore and code optimization

上级 8281176b
...@@ -176,15 +176,18 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { ...@@ -176,15 +176,18 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) {
static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType);
void *tdFreeRSmaInfo(SRSmaInfo *pInfo);
void *tdFreeRSmaInfo(SRSmaInfo *pInfo); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdProcessRSmaRestoreImpl(SSma *pSma);
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg);
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg);
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// smaFileUtil ================ // smaFileUtil ================
#define TD_FILE_HEAD_SIZE 512
typedef struct STFInfo STFInfo; typedef struct STFInfo STFInfo;
typedef struct STFile STFile; typedef struct STFile STFile;
...@@ -220,12 +223,14 @@ int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte); ...@@ -220,12 +223,14 @@ int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence); int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence);
int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte); int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte);
int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset); int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset);
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size);
int32_t tdRemoveTFile(STFile *pTFile); int32_t tdRemoveTFile(STFile *pTFile);
int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo); int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo);
int32_t tdUpdateTFileHeader(STFile *pTFile); int32_t tdUpdateTFileHeader(STFile *pTFile);
void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); void tdUpdateTFileMagic(STFile *pTFile, void *pCksm);
void tdCloseTFile(STFile *pTFile); void tdCloseTFile(STFile *pTFile);
void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName);
void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -123,14 +123,15 @@ int32_t smaOpen(SVnode *pVnode) { ...@@ -123,14 +123,15 @@ int32_t smaOpen(SVnode *pVnode) {
} }
// restore the rsma // restore the rsma
#if 0
if (rsmaRestore(pSma) < 0) { if (rsmaRestore(pSma) < 0) {
goto _err; goto _err;
} }
#endif
} }
return 0; return 0;
_err: _err:
taosMemoryFreeClear(pSma);
return -1; return -1;
} }
...@@ -168,36 +169,5 @@ int32_t smaClose(SSma *pSma) { ...@@ -168,36 +169,5 @@ int32_t smaClose(SSma *pSma) {
static int32_t rsmaRestore(SSma *pSma) { static int32_t rsmaRestore(SSma *pSma) {
ASSERT(VND_IS_RSMA(pSma->pVnode)); ASSERT(VND_IS_RSMA(pSma->pVnode));
// iterate all stables to restore the rsma env return tdProcessRSmaRestoreImpl(pSma);
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
smaError("failed to restore rsma since get stb id list error: %s", terrstr());
return TSDB_CODE_FAILED;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("suid [%d] is %" PRIi64, i, suid);
if (metaGetTableEntryByUid(&mr, suid) < 0) {
metaReaderClear(&mr);
taosArrayDestroy(suidList);
smaError("failed to get table meta for %" PRIi64 " since %s", suid, terrstr());
return TSDB_CODE_FAILED;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < 2; ++i) {
smaDebug("vgId: %d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, TD_VID(pSma->pVnode),
suid, i, param->maxdelay[i], i, param->watermark[i]);
}
}
}
metaReaderClear(&mr);
taosArrayDestroy(suidList);
return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -15,10 +15,14 @@ ...@@ -15,10 +15,14 @@
#include "sma.h" #include "sma.h"
#define RSMA_QTASK_PERSIST_MS 7200000 #define RSMA_QTASKINFO_PERSIST_MS 7200000
#define RSMA_QTASKINFO_BUFSIZE 32768
typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T; typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T;
static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"}; static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"};
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
typedef struct SRSmaQTaskFIter SRSmaQTaskFIter;
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle,
...@@ -27,6 +31,13 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType ...@@ -27,6 +31,13 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType
tb_uid_t suid, int8_t level); tb_uid_t suid, int8_t level);
static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaFetchTrigger(void *param, void *tmrId);
static void tdRSmaPersistTrigger(void *param, void *tmrId); static void tdRSmaPersistTrigger(void *param, void *tmrId);
static void *tdRSmaPersistExec(void *param);
static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName);
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile);
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish);
static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd);
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
struct SRSmaInfoItem { struct SRSmaInfoItem {
SRSmaInfo *pRsmaInfo; SRSmaInfo *pRsmaInfo;
...@@ -45,6 +56,24 @@ struct SRSmaInfo { ...@@ -45,6 +56,24 @@ struct SRSmaInfo {
SRSmaInfoItem items[TSDB_RETENTION_L2]; SRSmaInfoItem items[TSDB_RETENTION_L2];
}; };
struct SRSmaQTaskInfoItem {
int32_t len;
int8_t type;
int64_t suid;
void *qTaskInfo;
};
struct SRSmaQTaskFIter {
STFile *pTFile;
int64_t offset;
int64_t fsize;
int32_t nBytes;
int32_t nAlloc;
char *buf;
// ------------
int32_t nBufPos;
};
static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) {
// Note: free/kill may in RC // Note: free/kill may in RC
qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle);
...@@ -230,26 +259,21 @@ _err: ...@@ -230,26 +259,21 @@ _err:
} }
/** /**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam. * @brief for rsam create or restore
* *
* @param pTsdb * @param pSma
* @param pMeta * @param param
* @param pReq * @param suid
* @param tbName
* @return int32_t * @return int32_t
*/ */
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) {
SSma *pSma = pVnode->pSma; SVnode *pVnode = pSma->pVnode;
if (!pReq->rollup) { SMeta *pMeta = pVnode->pMeta;
smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); SMsgCb *pMsgCb = &pVnode->msgCb;
return TSDB_CODE_SUCCESS;
}
SMeta *pMeta = pVnode->pMeta;
SMsgCb *pMsgCb = &pVnode->msgCb;
SRSmaParam *param = &pReq->rsmaParam;
if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) {
smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -262,10 +286,10 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { ...@@ -262,10 +286,10 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t));
if (pRSmaInfo) { if (pRSmaInfo) {
ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally
smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); smaDebug("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -289,14 +313,14 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { ...@@ -289,14 +313,14 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
.vnode = pVnode, .vnode = pVnode,
}; };
STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), pReq->suid, -1); STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1);
if (!pTSchema) { if (!pTSchema) {
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
goto _err; goto _err;
} }
pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pTSchema = pTSchema;
pRSmaInfo->pSma = pSma; pRSmaInfo->pSma = pSma;
pRSmaInfo->suid = pReq->suid; pRSmaInfo->suid = suid;
if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) { if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) {
goto _err; goto _err;
...@@ -306,16 +330,16 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { ...@@ -306,16 +330,16 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
goto _err; goto _err;
} }
if (taosHashPut(RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) {
goto _err; goto _err;
} else { } else {
smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid); smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid);
} }
// start the persist timer // start the persist timer
if (TASK_TRIGGER_STAT_INIT == if (TASK_TRIGGER_STAT_INIT ==
atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) { atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) {
taosTmrStart(tdRSmaPersistTrigger, RSMA_QTASK_PERSIST_MS, pStat, RSMA_TMR_HANDLE(pStat)); taosTmrStart(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pStat, RSMA_TMR_HANDLE(pStat));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -325,6 +349,24 @@ _err: ...@@ -325,6 +349,24 @@ _err:
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
/**
* @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
*
* @param pTsdb
* @param pMeta
* @param pReq
* @return int32_t
*/
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
SSma *pSma = pVnode->pSma;
if (!pReq->rollup) {
smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
}
return tdProcessRSmaCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name);
}
/** /**
* @brief store suid/[uids], prefer to use array and then hash * @brief store suid/[uids], prefer to use array and then hash
* *
...@@ -647,7 +689,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { ...@@ -647,7 +689,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName) { static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName) {
tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName); tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName);
} }
...@@ -690,7 +732,7 @@ static void *tdRSmaPersistExec(void *param) { ...@@ -690,7 +732,7 @@ static void *tdRSmaPersistExec(void *param) {
} }
char *pOutput = NULL; char *pOutput = NULL;
int32_t len = 0; int32_t len = 0;
int8_t type = 0; int8_t type = (int8_t)(i + 1);
if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) {
smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1, smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1,
terrstr(terrno)); terrstr(terrno));
...@@ -726,8 +768,9 @@ static void *tdRSmaPersistExec(void *param) { ...@@ -726,8 +768,9 @@ static void *tdRSmaPersistExec(void *param) {
isFileCreated = true; isFileCreated = true;
} }
len += (sizeof(len) + sizeof(pRSmaInfo->suid)); len += (sizeof(len)+ sizeof(type) + sizeof(pRSmaInfo->suid));
tdAppendTFile(&tFile, &len, sizeof(len), &toffset); tdAppendTFile(&tFile, &len, sizeof(len), &toffset);
tdAppendTFile(&tFile, &type, sizeof(type), &toffset);
tdAppendTFile(&tFile, &pRSmaInfo->suid, sizeof(pRSmaInfo->suid), &toffset); tdAppendTFile(&tFile, &pRSmaInfo->suid, sizeof(pRSmaInfo->suid), &toffset);
tdAppendTFile(&tFile, pOutput, len, &toffset); tdAppendTFile(&tFile, pOutput, len, &toffset);
...@@ -824,8 +867,12 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { ...@@ -824,8 +867,12 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
TASK_TRIGGER_STAT_CANCELLED, TASK_TRIGGER_STAT_CANCELLED,
TASK_TRIGGER_STAT_FINISHED)) { TASK_TRIGGER_STAT_FINISHED)) {
smaDebug("rsma persistence start since active"); smaDebug("rsma persistence start since active");
// start persist task
tdRSmaPersistTask(pRSmaStat); tdRSmaPersistTask(pRSmaStat);
taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASK_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId);
taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle,
&pRSmaStat->tmrId);
} else { } else {
atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0);
} }
...@@ -845,4 +892,250 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { ...@@ -845,4 +892,250 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) {
ASSERT(0); ASSERT(0);
} break; } break;
} }
} }
\ No newline at end of file
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
SVnode *pVnode = pSma->pVnode;
// step 1: iterate all stables to restore the rsma env
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
smaError("vgId:%d, failed to restore rsma since get stb id list error: %s", TD_VID(pVnode), terrstr());
return TSDB_CODE_FAILED;
}
if (taosArrayGetSize(suidList) == 0) {
smaDebug("vgId:%d no need to restore rsma since empty stb id list", TD_VID(pVnode));
return TSDB_CODE_SUCCESS;
}
SMetaReader mr = {0};
metaReaderInit(&mr, SMA_META(pSma), 0);
for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) {
tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
smaDebug("suid [%d] is %" PRIi64, i, suid);
if (metaGetTableEntryByUid(&mr, suid) < 0) {
smaError("vgId:%d failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err;
}
ASSERT(mr.me.type == TSDB_SUPER_TABLE);
ASSERT(mr.me.uid == suid);
if (TABLE_IS_ROLLUP(mr.me.flags)) {
SRSmaParam *param = &mr.me.stbEntry.rsmaParam;
for (int i = 0; i < 2; ++i) {
smaDebug("vgId: %d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, TD_VID(pSma->pVnode),
suid, i, param->maxdelay[i], i, param->watermark[i]);
}
if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) {
smaError("vgId:%d failed to retore rsma env for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr());
goto _err;
}
}
}
// step 2: retrieve qtaskinfo object from the rsma/qtaskinfo file and restore
STFile tFile = {0};
char qTaskInfoFName[TSDB_FILENAME_LEN];
tdRSmaQTaskGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName);
if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
goto _err;
}
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
goto _err;
}
SRSmaQTaskFIter fIter = {0};
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
goto _err;
}
SRSmaQTaskInfoItem infoItem = {0};
bool isEnd = false;
int32_t code = 0;
while ((code = tdRSmaQTaskInfoIterNext(&fIter, &infoItem, &isEnd)) == 0) {
if (isEnd) {
break;
}
if((code = tdRSmaQTaskInfoItemRestore(pSma, &infoItem)) < 0){
break;
}
}
if (code < 0) goto _err;
metaReaderClear(&mr);
taosArrayDestroy(suidList);
return TSDB_CODE_SUCCESS;
_err:
ASSERT(0);
metaReaderClear(&mr);
taosArrayDestroy(suidList);
smaError("failed to restore rsma info since %s", terrstr());
return TSDB_CODE_FAILED;
}
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem) {
SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv);
SRSmaInfo *pRSmaInfo = NULL;
void *qTaskInfo = NULL;
pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &infoItem->suid, sizeof(infoItem->suid));
if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) {
smaDebug("vgId:%d, no restore as no rsma info for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid);
return TSDB_CODE_SUCCESS;
}
if (infoItem->type == 1) {
qTaskInfo = pRSmaInfo->items[0].taskInfo;
} else if (infoItem->type == 2) {
qTaskInfo = pRSmaInfo->items[1].taskInfo;
} else {
ASSERT(0);
}
if (!qTaskInfo) {
smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid);
return TSDB_CODE_SUCCESS;
}
if (qDeserializeTaskStatus(qTaskInfo, infoItem->qTaskInfo, infoItem->len) < 0) {
smaError("vgId:%d, restore rsma failed for suid:%" PRIi64 " level %d since %s", SMA_VID(pSma), infoItem->suid,
infoItem->type, terrstr(terrno));
return TSDB_CODE_FAILED;
}
smaDebug("vgId:%d, restore rsma success for suid:%" PRIi64 " level %d", SMA_VID(pSma), infoItem->suid,
infoItem->type);
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile) {
memset(pIter, 0, sizeof(*pIter));
pIter->pTFile = pTFile;
pIter->offset = TD_FILE_HEAD_SIZE;
if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) {
return TSDB_CODE_FAILED;
}
if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) {
pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE;
} else {
pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE;
}
if (pIter->nAlloc < TD_FILE_HEAD_SIZE) {
pIter->nAlloc = TD_FILE_HEAD_SIZE;
}
pIter->buf = taosMemoryMalloc(pIter->nAlloc);
if (!pIter->buf) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish) {
STFile *pTFile = pIter->pTFile;
int64_t nBytes = RSMA_QTASKINFO_BUFSIZE;
if (pIter->offset >= pIter->fsize) {
*isFinish = true;
return TSDB_CODE_SUCCESS;
}
if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) {
nBytes = pIter->fsize - pIter->offset;
}
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
int32_t infoLen = 0;
taosDecodeFixedI32(pIter->buf, &infoLen);
if (infoLen > nBytes) {
ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE);
pIter->nAlloc = infoLen;
void *pBuf = taosMemoryRealloc(pIter->buf, infoLen);
if (!pBuf) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
pIter->buf = pBuf;
nBytes = infoLen;
if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
}
pIter->offset += nBytes;
pIter->nBytes = nBytes;
pIter->nBufPos = 0;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) {
return lenWithHead - sizeof(int32_t) - sizeof(int8_t) - sizeof(int64_t);
}
static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd) {
while (1) {
// block iter
bool isFinish = false;
if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) {
ASSERT(0);
return TSDB_CODE_FAILED;
}
if (isFinish) {
*isEnd = true;
return TSDB_CODE_SUCCESS;
}
// consume the block
int32_t qTaskInfoLenWithHead = 0;
pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead);
if(qTaskInfoLenWithHead < 0) {
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
return TSDB_CODE_FAILED;
}
while (1) {
if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) {
pIter->buf = taosDecodeFixedI8(pIter->buf, &pItem->type);
pIter->buf = taosDecodeFixedI64(pIter->buf, &pItem->suid);
pItem->qTaskInfo = pIter->buf;
pItem->len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead);
// do the restore job
printf("%s:%d ###### restore the qtask info offset:%" PRIi64 "\n", __func__, __LINE__, pIter->offset);
pIter->buf = POINTER_SHIFT(pIter->buf, pItem->len);
pIter->nBufPos += qTaskInfoLenWithHead;
pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead);
continue;
}
// prepare and load next block in the file
pIter->offset -= (pIter->nBytes - pIter->nBufPos);
break;
}
}
return TSDB_CODE_SUCCESS;
}
...@@ -17,8 +17,6 @@ ...@@ -17,8 +17,6 @@
// smaFileUtil ================ // smaFileUtil ================
#define TD_FILE_HEAD_SIZE 512
#define TD_FILE_STATE_OK 0 #define TD_FILE_STATE_OK 0
#define TD_FILE_STATE_BAD 1 #define TD_FILE_STATE_BAD 1
...@@ -71,6 +69,11 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { ...@@ -71,6 +69,11 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) {
return loffset; return loffset;
} }
int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) {
ASSERT(TD_FILE_OPENED(pTFile));
return taosFStatFile(pTFile->pFile, size, NULL);
}
int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) {
ASSERT(TD_FILE_OPENED(pTFile)); ASSERT(TD_FILE_OPENED(pTFile));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册