diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 956e451b58e21c7a5455cca639723188968472af..4b42ab526308f1aac34a5c9b4ad6de87ba60dd94 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -176,15 +176,18 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); +void *tdFreeRSmaInfo(SRSmaInfo *pInfo); -void *tdFreeRSmaInfo(SRSmaInfo *pInfo); - +int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); +int32_t tdProcessRSmaRestoreImpl(SSma *pSma); int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg); int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg); int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); // smaFileUtil ================ +#define TD_FILE_HEAD_SIZE 512 + typedef struct STFInfo STFInfo; typedef struct STFile STFile; @@ -220,12 +223,14 @@ int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte); int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence); int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte); int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset); +int64_t tdGetTFileSize(STFile *pTFile, int64_t *size); int32_t tdRemoveTFile(STFile *pTFile); int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo); int32_t tdUpdateTFileHeader(STFile *pTFile); void tdUpdateTFileMagic(STFile *pTFile, void *pCksm); void tdCloseTFile(STFile *pTFile); -void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName); + +void tdGetVndFileName(int32_t vid, const char *dname, const char *fname, char *outputName); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index fad31f92c069935c3d732f4ba18e51bdc967207a..88ed7426f7390d00d96d7559d1d7d3242142e00d 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -123,14 +123,15 @@ int32_t smaOpen(SVnode *pVnode) { } // restore the rsma +#if 0 if (rsmaRestore(pSma) < 0) { goto _err; } +#endif } return 0; _err: - taosMemoryFreeClear(pSma); return -1; } @@ -168,36 +169,5 @@ int32_t smaClose(SSma *pSma) { static int32_t rsmaRestore(SSma *pSma) { ASSERT(VND_IS_RSMA(pSma->pVnode)); - // iterate all stables to restore the rsma env - SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); - if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { - smaError("failed to restore rsma since get stb id list error: %s", terrstr()); - return TSDB_CODE_FAILED; - } - - SMetaReader mr = {0}; - metaReaderInit(&mr, SMA_META(pSma), 0); - for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) { - tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); - smaDebug("suid [%d] is %" PRIi64, i, suid); - if (metaGetTableEntryByUid(&mr, suid) < 0) { - metaReaderClear(&mr); - taosArrayDestroy(suidList); - smaError("failed to get table meta for %" PRIi64 " since %s", suid, terrstr()); - return TSDB_CODE_FAILED; - } - ASSERT(mr.me.type == TSDB_SUPER_TABLE); - if (TABLE_IS_ROLLUP(mr.me.flags)) { - SRSmaParam *param = &mr.me.stbEntry.rsmaParam; - for (int i = 0; i < 2; ++i) { - smaDebug("vgId: %d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, TD_VID(pSma->pVnode), - suid, i, param->maxdelay[i], i, param->watermark[i]); - } - } - } - - metaReaderClear(&mr); - taosArrayDestroy(suidList); - - return TSDB_CODE_SUCCESS; + return tdProcessRSmaRestoreImpl(pSma); } \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 26b9a34fa34b018d96ea0bd2487f57f6542a8332..4f593c33da147decb99c3dd6e31b6e35cf28799d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,10 +15,14 @@ #include "sma.h" -#define RSMA_QTASK_PERSIST_MS 7200000 +#define RSMA_QTASKINFO_PERSIST_MS 7200000 +#define RSMA_QTASKINFO_BUFSIZE 32768 typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T; static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"}; +typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; +typedef struct SRSmaQTaskFIter SRSmaQTaskFIter; + static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaInfo *pRSmaInfo, SReadHandle *handle, @@ -27,6 +31,13 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType tb_uid_t suid, int8_t level); static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaPersistTrigger(void *param, void *tmrId); +static void *tdRSmaPersistExec(void *param); +static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName); + +static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile); +static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish); +static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd); +static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); struct SRSmaInfoItem { SRSmaInfo *pRsmaInfo; @@ -45,6 +56,24 @@ struct SRSmaInfo { SRSmaInfoItem items[TSDB_RETENTION_L2]; }; +struct SRSmaQTaskInfoItem { + int32_t len; + int8_t type; + int64_t suid; + void *qTaskInfo; +}; + +struct SRSmaQTaskFIter { + STFile *pTFile; + int64_t offset; + int64_t fsize; + int32_t nBytes; + int32_t nAlloc; + char *buf; + // ------------ + int32_t nBufPos; +}; + static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); @@ -230,26 +259,21 @@ _err: } /** - * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam. + * @brief for rsam create or restore * - * @param pTsdb - * @param pMeta - * @param pReq + * @param pSma + * @param param + * @param suid + * @param tbName * @return int32_t */ -int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { - SSma *pSma = pVnode->pSma; - if (!pReq->rollup) { - smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); - return TSDB_CODE_SUCCESS; - } - - SMeta *pMeta = pVnode->pMeta; - SMsgCb *pMsgCb = &pVnode->msgCb; - SRSmaParam *param = &pReq->rsmaParam; +int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName) { + SVnode *pVnode = pSma->pVnode; + SMeta *pMeta = pVnode->pMeta; + SMsgCb *pMsgCb = &pVnode->msgCb; if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { - smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); + smaDebug("vgId:%d, no qmsg1/qmsg2 for rollup table %s %" PRIi64, SMA_VID(pSma), tbName, suid); return TSDB_CODE_SUCCESS; } @@ -262,10 +286,10 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL; - pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t)); + pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo) { ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally - smaWarn("vgId:%d, rsma info already exists for stb: %s, %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); + smaDebug("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid); return TSDB_CODE_SUCCESS; } @@ -289,14 +313,14 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { .vnode = pVnode, }; - STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), pReq->suid, -1); + STSchema *pTSchema = metaGetTbTSchema(SMA_META(pSma), suid, -1); if (!pTSchema) { terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; goto _err; } pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->pSma = pSma; - pRSmaInfo->suid = pReq->suid; + pRSmaInfo->suid = suid; if (tdSetRSmaInfoItemParams(pSma, param, pRSmaInfo, &handle, 0) < 0) { goto _err; @@ -306,16 +330,16 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { goto _err; } - if (taosHashPut(RSMA_INFO_HASH(pStat), &pReq->suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { + if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { goto _err; } else { - smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), pReq->suid); + smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid); } // start the persist timer if (TASK_TRIGGER_STAT_INIT == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) { - taosTmrStart(tdRSmaPersistTrigger, RSMA_QTASK_PERSIST_MS, pStat, RSMA_TMR_HANDLE(pStat)); + taosTmrStart(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pStat, RSMA_TMR_HANDLE(pStat)); } return TSDB_CODE_SUCCESS; @@ -325,6 +349,24 @@ _err: return TSDB_CODE_FAILED; } +/** + * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam. + * + * @param pTsdb + * @param pMeta + * @param pReq + * @return int32_t + */ +int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { + SSma *pSma = pVnode->pSma; + if (!pReq->rollup) { + smaTrace("vgId:%d, return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); + return TSDB_CODE_SUCCESS; + } + + return tdProcessRSmaCreateImpl(pSma, &pReq->rsmaParam, pReq->suid, pReq->name); +} + /** * @brief store suid/[uids], prefer to use array and then hash * @@ -647,7 +689,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; } -void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName) { +static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName) { tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName); } @@ -690,7 +732,7 @@ static void *tdRSmaPersistExec(void *param) { } char *pOutput = NULL; int32_t len = 0; - int8_t type = 0; + int8_t type = (int8_t)(i + 1); if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1, terrstr(terrno)); @@ -726,8 +768,9 @@ static void *tdRSmaPersistExec(void *param) { isFileCreated = true; } - len += (sizeof(len) + sizeof(pRSmaInfo->suid)); + len += (sizeof(len)+ sizeof(type) + sizeof(pRSmaInfo->suid)); tdAppendTFile(&tFile, &len, sizeof(len), &toffset); + tdAppendTFile(&tFile, &type, sizeof(type), &toffset); tdAppendTFile(&tFile, &pRSmaInfo->suid, sizeof(pRSmaInfo->suid), &toffset); tdAppendTFile(&tFile, pOutput, len, &toffset); @@ -824,8 +867,12 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { TASK_TRIGGER_STAT_CANCELLED, TASK_TRIGGER_STAT_FINISHED)) { smaDebug("rsma persistence start since active"); + + // start persist task tdRSmaPersistTask(pRSmaStat); - taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASK_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, &pRSmaStat->tmrId); + + taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, + &pRSmaStat->tmrId); } else { atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); } @@ -845,4 +892,250 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { ASSERT(0); } break; } -} \ No newline at end of file +} + +int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { + SVnode *pVnode = pSma->pVnode; + + // step 1: iterate all stables to restore the rsma env + + SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); + if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { + smaError("vgId:%d, failed to restore rsma since get stb id list error: %s", TD_VID(pVnode), terrstr()); + return TSDB_CODE_FAILED; + } + + if (taosArrayGetSize(suidList) == 0) { + smaDebug("vgId:%d no need to restore rsma since empty stb id list", TD_VID(pVnode)); + return TSDB_CODE_SUCCESS; + } + + SMetaReader mr = {0}; + metaReaderInit(&mr, SMA_META(pSma), 0); + for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) { + tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); + smaDebug("suid [%d] is %" PRIi64, i, suid); + if (metaGetTableEntryByUid(&mr, suid) < 0) { + smaError("vgId:%d failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); + goto _err; + } + ASSERT(mr.me.type == TSDB_SUPER_TABLE); + ASSERT(mr.me.uid == suid); + if (TABLE_IS_ROLLUP(mr.me.flags)) { + SRSmaParam *param = &mr.me.stbEntry.rsmaParam; + for (int i = 0; i < 2; ++i) { + smaDebug("vgId: %d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, TD_VID(pSma->pVnode), + suid, i, param->maxdelay[i], i, param->watermark[i]); + } + if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { + smaError("vgId:%d failed to retore rsma env for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); + goto _err; + } + } + } + + // step 2: retrieve qtaskinfo object from the rsma/qtaskinfo file and restore + STFile tFile = {0}; + char qTaskInfoFName[TSDB_FILENAME_LEN]; + + tdRSmaQTaskGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); + if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) { + goto _err; + } + if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { + goto _err; + } + SRSmaQTaskFIter fIter = {0}; + if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { + goto _err; + } + SRSmaQTaskInfoItem infoItem = {0}; + bool isEnd = false; + int32_t code = 0; + while ((code = tdRSmaQTaskInfoIterNext(&fIter, &infoItem, &isEnd)) == 0) { + if (isEnd) { + break; + } + if((code = tdRSmaQTaskInfoItemRestore(pSma, &infoItem)) < 0){ + break; + } + } + if (code < 0) goto _err; + + metaReaderClear(&mr); + taosArrayDestroy(suidList); + return TSDB_CODE_SUCCESS; +_err: + ASSERT(0); + metaReaderClear(&mr); + taosArrayDestroy(suidList); + smaError("failed to restore rsma info since %s", terrstr()); + return TSDB_CODE_FAILED; +} + +static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem) { + SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv); + SRSmaInfo *pRSmaInfo = NULL; + void *qTaskInfo = NULL; + + pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &infoItem->suid, sizeof(infoItem->suid)); + + if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + smaDebug("vgId:%d, no restore as no rsma info for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid); + return TSDB_CODE_SUCCESS; + } + + if (infoItem->type == 1) { + qTaskInfo = pRSmaInfo->items[0].taskInfo; + } else if (infoItem->type == 2) { + qTaskInfo = pRSmaInfo->items[1].taskInfo; + } else { + ASSERT(0); + } + + if (!qTaskInfo) { + smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid); + return TSDB_CODE_SUCCESS; + } + + if (qDeserializeTaskStatus(qTaskInfo, infoItem->qTaskInfo, infoItem->len) < 0) { + smaError("vgId:%d, restore rsma failed for suid:%" PRIi64 " level %d since %s", SMA_VID(pSma), infoItem->suid, + infoItem->type, terrstr(terrno)); + return TSDB_CODE_FAILED; + } + smaDebug("vgId:%d, restore rsma success for suid:%" PRIi64 " level %d", SMA_VID(pSma), infoItem->suid, + infoItem->type); + + return TSDB_CODE_SUCCESS; +} + +static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile) { + memset(pIter, 0, sizeof(*pIter)); + pIter->pTFile = pTFile; + pIter->offset = TD_FILE_HEAD_SIZE; + + if (tdGetTFileSize(pTFile, &pIter->fsize) < 0) { + return TSDB_CODE_FAILED; + } + + if ((pIter->fsize - TD_FILE_HEAD_SIZE) < RSMA_QTASKINFO_BUFSIZE) { + pIter->nAlloc = pIter->fsize - TD_FILE_HEAD_SIZE; + } else { + pIter->nAlloc = RSMA_QTASKINFO_BUFSIZE; + } + + if (pIter->nAlloc < TD_FILE_HEAD_SIZE) { + pIter->nAlloc = TD_FILE_HEAD_SIZE; + } + + pIter->buf = taosMemoryMalloc(pIter->nAlloc); + if (!pIter->buf) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish) { + STFile *pTFile = pIter->pTFile; + int64_t nBytes = RSMA_QTASKINFO_BUFSIZE; + + if (pIter->offset >= pIter->fsize) { + *isFinish = true; + return TSDB_CODE_SUCCESS; + } + + if ((pIter->fsize - pIter->offset) < RSMA_QTASKINFO_BUFSIZE) { + nBytes = pIter->fsize - pIter->offset; + } + + if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET) < 0) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + + if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + + int32_t infoLen = 0; + taosDecodeFixedI32(pIter->buf, &infoLen); + if (infoLen > nBytes) { + ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE); + pIter->nAlloc = infoLen; + void *pBuf = taosMemoryRealloc(pIter->buf, infoLen); + if (!pBuf) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + pIter->buf = pBuf; + nBytes = infoLen; + + if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + + if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + } + + pIter->offset += nBytes; + pIter->nBytes = nBytes; + pIter->nBufPos = 0; + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { + return lenWithHead - sizeof(int32_t) - sizeof(int8_t) - sizeof(int64_t); +} + +static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd) { + + while (1) { + // block iter + bool isFinish = false; + if (tdRSmaQTaskInfoIterNextBlock(pIter, &isFinish) < 0) { + ASSERT(0); + return TSDB_CODE_FAILED; + } + if (isFinish) { + *isEnd = true; + return TSDB_CODE_SUCCESS; + } + + // consume the block + int32_t qTaskInfoLenWithHead = 0; + pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead); + if(qTaskInfoLenWithHead < 0) { + terrno = TSDB_CODE_TDB_FILE_CORRUPTED; + return TSDB_CODE_FAILED; + } + while (1) { + if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) { + pIter->buf = taosDecodeFixedI8(pIter->buf, &pItem->type); + pIter->buf = taosDecodeFixedI64(pIter->buf, &pItem->suid); + pItem->qTaskInfo = pIter->buf; + pItem->len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead); + // do the restore job + printf("%s:%d ###### restore the qtask info offset:%" PRIi64 "\n", __func__, __LINE__, pIter->offset); + + pIter->buf = POINTER_SHIFT(pIter->buf, pItem->len); + pIter->nBufPos += qTaskInfoLenWithHead; + + pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead); + continue; + } + // prepare and load next block in the file + pIter->offset -= (pIter->nBytes - pIter->nBufPos); + break; + } + } + + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 85fddebf555cd812f6c31c5f2496abf76d97fffb..1f60da0b0a168c686347dcda8ebbba5fd643fef2 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -17,8 +17,6 @@ // smaFileUtil ================ -#define TD_FILE_HEAD_SIZE 512 - #define TD_FILE_STATE_OK 0 #define TD_FILE_STATE_BAD 1 @@ -71,6 +69,11 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { return loffset; } +int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) { + ASSERT(TD_FILE_OPENED(pTFile)); + return taosFStatFile(pTFile->pFile, size, NULL); +} + int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { ASSERT(TD_FILE_OPENED(pTFile));