diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 4b42ab526308f1aac34a5c9b4ad6de87ba60dd94..63ac6397bd82df6dc7a76169e3eab8d768ac98cf 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -28,7 +28,8 @@ extern "C" { #define smaError(...) do { if (smaDebugFlag & DEBUG_ERROR) { taosPrintLog("SMA ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) #define smaWarn(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define smaInfo(...) do { if (smaDebugFlag & DEBUG_INFO) { taosPrintLog("SMA ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +// #define smaDebug(...) do { if (smaDebugFlag & DEBUG_DEBUG) { taosPrintLog("SMA ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define smaDebug(...) do { if (smaDebugFlag & DEBUG_WARN) { taosPrintLog("SMA WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on @@ -205,16 +206,16 @@ struct STFile { uint8_t state; }; -#define TD_FILE_F(tf) (&((tf)->f)) -#define TD_FILE_PFILE(tf) ((tf)->pFile) -#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) -#define TD_FILE_FULL_NAME(tf) (TD_FILE_F(tf)->aname) -#define TD_FILE_REL_NAME(tf) (TD_FILE_F(tf)->rname) -#define TD_FILE_OPENED(tf) (TD_FILE_PFILE(tf) != NULL) -#define TD_FILE_CLOSED(tf) (!TD_FILE_OPENED(tf)) -#define TD_FILE_SET_CLOSED(f) (TD_FILE_PFILE(f) = NULL) -#define TD_FILE_SET_STATE(tf, s) ((tf)->state = (s)) -#define TD_FILE_DID(tf) (TD_FILE_F(tf)->did) +#define TD_TFILE_F(tf) (&((tf)->f)) +#define TD_TFILE_PFILE(tf) ((tf)->pFile) +#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL) +#define TD_TFILE_FULL_NAME(tf) (TD_TFILE_F(tf)->aname) +#define TD_TFILE_REL_NAME(tf) (TD_TFILE_F(tf)->rname) +#define TD_TFILE_OPENED(tf) (TD_TFILE_PFILE(tf) != NULL) +#define TD_TFILE_CLOSED(tf) (!TD_TFILE_OPENED(tf)) +#define TD_TFILE_SET_CLOSED(f) (TD_TFILE_PFILE(f) = NULL) +#define TD_TFILE_SET_STATE(tf, s) ((tf)->state = (s)) +#define TD_TFILE_DID(tf) (TD_TFILE_F(tf)->did) int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname); int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 68ed6dde5142e67693924afd2cdc7fa57b4e984e..baead763ad46a48196ebf62c1e12f21d004e9511 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -64,6 +64,7 @@ typedef struct STsdbSnapshotReader STsdbSnapshotReader; #define VNODE_TQ_DIR "tq" #define VNODE_WAL_DIR "wal" #define VNODE_TSMA_DIR "tsma" +#define VNODE_RSMA_DIR "rsma" #define VNODE_RSMA0_DIR "tsdb" #define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA2_DIR "rsma2" @@ -161,7 +162,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool // sma int32_t smaOpen(SVnode* pVnode); -int32_t smaClose(SSma* pSma); int32_t smaCloseEnv(SSma* pSma); int32_t smaCloseEx(SSma* pSma); diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 88ed7426f7390d00d96d7559d1d7d3242142e00d..641b8c793433f683592a5031e0f17df0b56bb631 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -123,7 +123,7 @@ int32_t smaOpen(SVnode *pVnode) { } // restore the rsma -#if 0 +#if 1 if (rsmaRestore(pSma) < 0) { goto _err; } @@ -154,12 +154,6 @@ int32_t smaCloseEx(SSma *pSma) { return 0; } -int32_t smaClose(SSma *pSma) { - smaCloseEnv(pSma); - smaCloseEx(pSma); - return 0; -} - /** * @brief rsma env restore * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ed5b6f4055ac8c25ca0d2a821573857f4fbaf840..8423654eeee82a9e09662e12a192bb813ca33ad1 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,13 +15,14 @@ #include "sma.h" -#define RSMA_QTASKINFO_PERSIST_MS 7200000 +#define RSMA_QTASKINFO_PERSIST_MS 5000 // 7200000 #define RSMA_QTASKINFO_BUFSIZE 32768 +#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid typedef enum { TD_QTASK_TMP_FILE = 0, TD_QTASK_CUR_FILE } TD_QTASK_FILE_T; static const char *tdQTaskInfoFname[] = {"qtaskinfo.t", "qtaskinfo"}; typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; -typedef struct SRSmaQTaskFIter SRSmaQTaskFIter; +typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); @@ -32,11 +33,11 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaPersistTrigger(void *param, void *tmrId); static void *tdRSmaPersistExec(void *param); -static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName); +static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName); -static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile); -static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish); -static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd); +static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); +static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); +static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter); static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem); struct SRSmaInfoItem { @@ -63,22 +64,23 @@ struct SRSmaQTaskInfoItem { void *qTaskInfo; }; -struct SRSmaQTaskFIter { +struct SRSmaQTaskInfoIter { STFile *pTFile; int64_t offset; int64_t fsize; int32_t nBytes; int32_t nAlloc; - char *buf; + char *pBuf; // ------------ + char *qBuf; // for iterator int32_t nBufPos; }; static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { - return lenWithHead - sizeof(int32_t) - sizeof(int8_t) - sizeof(int64_t); + return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; } -static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskFIter *pIter) { taosMemoryFreeClear(pIter->buf); } +static FORCE_INLINE void tdRSmaQTaskInfoIterDestroy(SRSmaQTaskInfoIter *pIter) { taosMemoryFreeClear(pIter->pBuf); } static FORCE_INLINE void tdFreeTaskHandle(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level) { // Note: free/kill may in RC @@ -294,7 +296,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo) { - ASSERT(0); // TODO: free original pRSmaInfo is exists abnormally + ASSERT(0); // TODO: free original pRSmaInfo if exists abnormally smaDebug("vgId:%d, rsma info already exists for table %s, %" PRIi64, SMA_VID(pSma), tbName, suid); return TSDB_CODE_SUCCESS; } @@ -338,10 +340,10 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { goto _err; - } else { - smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid); } + smaDebug("vgId:%d, register rsma info succeed for suid:%" PRIi64, SMA_VID(pSma), suid); + // start the persist timer if (TASK_TRIGGER_STAT_INIT == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_INIT, TASK_TRIGGER_STAT_ACTIVE)) { @@ -356,10 +358,9 @@ _err: } /** - * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam. + * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam currently * - * @param pTsdb - * @param pMeta + * @param pVnode * @param pReq * @return int32_t */ @@ -695,331 +696,119 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { return TSDB_CODE_SUCCESS; } -static void tdRSmaQTaskGetFName(int32_t vid, int8_t ftype, char *outputName) { - tdGetVndFileName(vid, "rsma", tdQTaskInfoFname[ftype], outputName); -} - -static void *tdRSmaPersistExec(void *param) { - setThreadName("rsma-task-persist"); - SRSmaStat *pRSmaStat = param; - SSma *pSma = pRSmaStat->pSma; - STfs *pTfs = pSma->pVnode->pTfs; - int64_t toffset = 0; - bool isFileCreated = false; - - if (TASK_TRIGGER_STAT_CANCELLED == atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))) { - goto _end; - } - - void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); - if (!infoHash) { - goto _end; - } - - STFile tFile = {0}; - int32_t vid = SMA_VID(pSma); - - while (infoHash) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; - -#if 0 - smaDebug("table %" PRIi64 " sleep 15s start ...", pRSmaInfo->items[0].pRsmaInfo->suid); - for (int32_t i = 15; i > 0; --i) { - taosSsleep(1); - smaDebug("table %" PRIi64 " countdown %d", pRSmaInfo->items[0].pRsmaInfo->suid, i); - } - smaDebug("table %" PRIi64 " sleep 15s end ...", pRSmaInfo->items[0].pRsmaInfo->suid); -#endif - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo; - if (!taskInfo) { - smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); - continue; - } - char *pOutput = NULL; - int32_t len = 0; - int8_t type = (int8_t)(i + 1); - if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { - smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1, - terrstr(terrno)); - goto _err; - } else { - if (!pOutput) { - smaDebug("vgId:%d, table %" PRIi64 - " level %d serialize rsma task success but no output(len %d) and no need to persist", - vid, pRSmaInfo->suid, i + 1, len); - continue; - } else if (len <= 0) { - smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d and no need to persist", - vid, pRSmaInfo->suid, i + 1, len); - taosMemoryFree(pOutput); - } - smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d and need persist", vid, - pRSmaInfo->suid, i + 1, len); -#if 1 - if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) { - smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid, - i + 1, terrstr(terrno)); - } else { - smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1); - } -#endif - } - - if (!isFileCreated) { - char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName); - tdInitTFile(&tFile, pTfs, qTaskInfoFName); - tdCreateTFile(&tFile, pTfs, true, -1); - - isFileCreated = true; - } - len += (sizeof(len) + sizeof(type) + sizeof(pRSmaInfo->suid)); - tdAppendTFile(&tFile, &len, sizeof(len), &toffset); - tdAppendTFile(&tFile, &type, sizeof(type), &toffset); - tdAppendTFile(&tFile, &pRSmaInfo->suid, sizeof(pRSmaInfo->suid), &toffset); - tdAppendTFile(&tFile, pOutput, len, &toffset); - - taosMemoryFree(pOutput); - } - infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash); - } -_normal: - if (isFileCreated) { - if (tdUpdateTFileHeader(&tFile) < 0) { - smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_FILE_FULL_NAME(&tFile), tstrerror(terrno)); - tdCloseTFile(&tFile); - tdRemoveTFile(&tFile); - goto _err; - } else { - smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_FILE_FULL_NAME(&tFile)); - } - - tdCloseTFile(&tFile); - - char newFName[TSDB_FILENAME_LEN]; - strncpy(newFName, TD_FILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN); - char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]); - strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); - if (taosRenameFile(TD_FILE_FULL_NAME(&tFile), newFName) != 0) { - smaError("vgId:%d, failed to rename %s to %s", vid, TD_FILE_FULL_NAME(&tFile), newFName); - goto _err; - } else { - smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_FILE_FULL_NAME(&tFile), newFName); - } - } - goto _end; -_err: - if (isFileCreated) { - tdRemoveTFile(&tFile); - } -_end: - if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_INACTIVE, - TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("vgId:%d, persist task is active again", vid); - } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("vgId:%d, persist task is cancelled", vid); - } else { - smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); - ASSERT(0); - } - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - taosThreadExit(NULL); - return NULL; -} - -static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { - TdThreadAttr thAttr; - taosThreadAttrInit(&thAttr); - taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED); - TdThread tid; - - if (taosThreadCreate(&tid, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) { - if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_INACTIVE, - TASK_TRIGGER_STAT_ACTIVE)) { - smaDebug("persist task is active again"); - } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug(" persist task is cancelled and set finished"); - } else { - smaWarn("persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); - ASSERT(0); - } - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - } - - taosThreadAttrDestroy(&thAttr); -} - -/** - * @brief trigger to persist rsma qTaskInfo - * - * @param param - * @param tmrId - */ -static void tdRSmaPersistTrigger(void *param, void *tmrId) { - SRSmaStat *pRSmaStat = param; - int8_t tmrStat = - atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); - switch (tmrStat) { - case TASK_TRIGGER_STAT_ACTIVE: { - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 1); - if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), - TASK_TRIGGER_STAT_CANCELLED, - TASK_TRIGGER_STAT_FINISHED)) { - smaDebug("rsma persistence start since active"); - - // start persist task - tdRSmaPersistTask(pRSmaStat); - - taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, - &pRSmaStat->tmrId); - } else { - atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - } - } break; - case TASK_TRIGGER_STAT_CANCELLED: { - atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED); - smaDebug("rsma persistence not start since cancelled and finished"); - } break; - case TASK_TRIGGER_STAT_INACTIVE: { - smaDebug("rsma persistence not start since inactive"); - } break; - case TASK_TRIGGER_STAT_INIT: { - smaDebug("rsma persistence not start since init"); - } break; - default: { - smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); - ASSERT(0); - } break; - } -} - int32_t tdProcessRSmaRestoreImpl(SSma *pSma) { SVnode *pVnode = pSma->pVnode; // step 1: iterate all stables to restore the rsma env - SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { - smaError("vgId:%d, failed to restore rsma since get stb id list error: %s", TD_VID(pVnode), terrstr()); + taosArrayDestroy(suidList); + smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr()); return TSDB_CODE_FAILED; } - if (taosArrayGetSize(suidList) == 0) { - smaDebug("vgId:%d no need to restore rsma since empty stb id list", TD_VID(pVnode)); + int32_t arrSize = taosArrayGetSize(suidList); + if (arrSize == 0) { + taosArrayDestroy(suidList); + smaDebug("vgId:%d, no need to restore rsma env since empty stb id list", TD_VID(pVnode)); return TSDB_CODE_SUCCESS; } SMetaReader mr = {0}; metaReaderInit(&mr, SMA_META(pSma), 0); - for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) { + for (int32_t i = 0; i < arrSize; ++i) { tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); - smaDebug("suid [%d] is %" PRIi64, i, suid); + smaDebug("vgId:%d, rsma restore, suid[%d] is %" PRIi64, TD_VID(pVnode), i, suid); if (metaGetTableEntryByUid(&mr, suid) < 0) { - smaError("vgId:%d failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); + smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, + terrstr()); goto _err; } ASSERT(mr.me.type == TSDB_SUPER_TABLE); ASSERT(mr.me.uid == suid); if (TABLE_IS_ROLLUP(mr.me.flags)) { SRSmaParam *param = &mr.me.stbEntry.rsmaParam; - for (int i = 0; i < 2; ++i) { - smaDebug("vgId: %d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, TD_VID(pSma->pVnode), - suid, i, param->maxdelay[i], i, param->watermark[i]); + for (int i = 0; i < TSDB_RETENTION_L2; ++i) { + smaDebug("vgId:%d, rsma restore, table:%" PRIi64 " level:%d, maxdelay:%" PRIi64 " watermark:%" PRIi64 + " qmsgLen:%" PRIi32, + TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]); } if (tdProcessRSmaCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { - smaError("vgId:%d failed to retore rsma env for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); + smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); goto _err; } + smaDebug("vgId:%d, rsma restore env success for %" PRIi64, TD_VID(pVnode), suid); } } - // step 2: retrieve qtaskinfo object from the rsma/qtaskinfo file and restore + // step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore STFile tFile = {0}; char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); + tdRSmaQTaskInfoGetFName(TD_VID(pVnode), TD_QTASK_CUR_FILE, qTaskInfoFName); if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) { goto _err; } if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) { goto _err; } - SRSmaQTaskFIter fIter = {0}; + SRSmaQTaskInfoIter fIter = {0}; if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { goto _err; } SRSmaQTaskInfoItem infoItem = {0}; - bool isEnd = false; - int32_t code = 0; - while ((code = tdRSmaQTaskInfoIterNext(&fIter, &infoItem, &isEnd)) == 0) { - if (isEnd) { - break; - } - if ((code = tdRSmaQTaskInfoItemRestore(pSma, &infoItem)) < 0) { - break; - } - } - tdRSmaQTaskInfoIterDestroy(&fIter); - - if (code < 0) { + if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) { + tdRSmaQTaskInfoIterDestroy(&fIter); goto _err; } + tdRSmaQTaskInfoIterDestroy(&fIter); metaReaderClear(&mr); taosArrayDestroy(suidList); return TSDB_CODE_SUCCESS; _err: - ASSERT(0); metaReaderClear(&mr); taosArrayDestroy(suidList); - smaError("failed to restore rsma info since %s", terrstr()); + smaError("failed to restore rsma task since %s", terrstr()); return TSDB_CODE_FAILED; } -static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem) { +static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *pItem) { SRSmaStat *pStat = (SRSmaStat *)SMA_ENV_STAT((SSmaEnv *)pSma->pRSmaEnv); SRSmaInfo *pRSmaInfo = NULL; void *qTaskInfo = NULL; - pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &infoItem->suid, sizeof(infoItem->suid)); + pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &pItem->suid, sizeof(pItem->suid)); if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { - smaDebug("vgId:%d, no restore as no rsma info for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid); + smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid); return TSDB_CODE_SUCCESS; } - if (infoItem->type == 1) { + if (pItem->type == 1) { qTaskInfo = pRSmaInfo->items[0].taskInfo; - } else if (infoItem->type == 2) { + } else if (pItem->type == 2) { qTaskInfo = pRSmaInfo->items[1].taskInfo; } else { ASSERT(0); } if (!qTaskInfo) { - smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), infoItem->suid); + smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid); return TSDB_CODE_SUCCESS; } - if (qDeserializeTaskStatus(qTaskInfo, infoItem->qTaskInfo, infoItem->len) < 0) { - smaError("vgId:%d, restore rsma failed for suid:%" PRIi64 " level %d since %s", SMA_VID(pSma), infoItem->suid, - infoItem->type, terrstr(terrno)); + if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) { + smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid, + pItem->type, terrstr(terrno)); return TSDB_CODE_FAILED; } - smaDebug("vgId:%d, restore rsma success for suid:%" PRIi64 " level %d", SMA_VID(pSma), infoItem->suid, - infoItem->type); + smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid, pItem->type); return TSDB_CODE_SUCCESS; } -static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile) { +static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile) { memset(pIter, 0, sizeof(*pIter)); pIter->pTFile = pTFile; pIter->offset = TD_FILE_HEAD_SIZE; @@ -1038,16 +827,17 @@ static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskFIter *pIter, STFile *pTFile) { pIter->nAlloc = TD_FILE_HEAD_SIZE; } - pIter->buf = taosMemoryMalloc(pIter->nAlloc); - if (!pIter->buf) { + pIter->pBuf = taosMemoryMalloc(pIter->nAlloc); + if (!pIter->pBuf) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } + pIter->qBuf = pIter->pBuf; return TSDB_CODE_SUCCESS; } -static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFinish) { +static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish) { STFile *pTFile = pIter->pTFile; int64_t nBytes = RSMA_QTASKINFO_BUFSIZE; @@ -1065,22 +855,23 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini return TSDB_CODE_FAILED; } - if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) { + if (tdReadTFile(pTFile, pIter->qBuf, nBytes) != nBytes) { ASSERT(0); return TSDB_CODE_FAILED; } int32_t infoLen = 0; - taosDecodeFixedI32(pIter->buf, &infoLen); + taosDecodeFixedI32(pIter->qBuf, &infoLen); if (infoLen > nBytes) { ASSERT(infoLen > RSMA_QTASKINFO_BUFSIZE); pIter->nAlloc = infoLen; - void *pBuf = taosMemoryRealloc(pIter->buf, infoLen); + void *pBuf = taosMemoryRealloc(pIter->pBuf, infoLen); if (!pBuf) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } - pIter->buf = pBuf; + pIter->pBuf = pBuf; + pIter->qBuf = pIter->pBuf; nBytes = infoLen; if (tdSeekTFile(pTFile, pIter->offset, SEEK_SET)) { @@ -1088,7 +879,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini return TSDB_CODE_FAILED; } - if (tdReadTFile(pTFile, pIter->buf, nBytes) != nBytes) { + if (tdReadTFile(pTFile, pIter->pBuf, nBytes) != nBytes) { ASSERT(0); return TSDB_CODE_FAILED; } @@ -1101,7 +892,7 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskFIter *pIter, bool *isFini return TSDB_CODE_SUCCESS; } -static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoItem *pItem, bool *isEnd) { +static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter) { while (1) { // block iter bool isFinish = false; @@ -1110,30 +901,39 @@ static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoIte return TSDB_CODE_FAILED; } if (isFinish) { - *isEnd = true; return TSDB_CODE_SUCCESS; } // consume the block int32_t qTaskInfoLenWithHead = 0; - pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead); - if (qTaskInfoLenWithHead < 0) { + pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); + if (qTaskInfoLenWithHead < RSMA_QTASKINFO_HEAD_LEN) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return TSDB_CODE_FAILED; } + while (1) { if ((pIter->nBufPos + qTaskInfoLenWithHead) <= pIter->nBytes) { - pIter->buf = taosDecodeFixedI8(pIter->buf, &pItem->type); - pIter->buf = taosDecodeFixedI64(pIter->buf, &pItem->suid); - pItem->qTaskInfo = pIter->buf; - pItem->len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead); + SRSmaQTaskInfoItem infoItem = {0}; + pIter->qBuf = taosDecodeFixedI8(pIter->qBuf, &infoItem.type); + pIter->qBuf = taosDecodeFixedI64(pIter->qBuf, &infoItem.suid); + infoItem.qTaskInfo = pIter->qBuf; + infoItem.len = tdRSmaQTaskInfoContLen(qTaskInfoLenWithHead); // do the restore job - printf("%s:%d ###### restore the qtask info offset:%" PRIi64 "\n", __func__, __LINE__, pIter->offset); + smaDebug("vgId:%d, restore the qtask info %s offset:%" PRIi64 "\n", SMA_VID(pSma), + TD_TFILE_FULL_NAME(pIter->pTFile), pIter->offset - pIter->nBytes + pIter->nBufPos); + tdRSmaQTaskInfoItemRestore(pSma, &infoItem); - pIter->buf = POINTER_SHIFT(pIter->buf, pItem->len); + pIter->qBuf = POINTER_SHIFT(pIter->qBuf, infoItem.len); pIter->nBufPos += qTaskInfoLenWithHead; - pIter->buf = taosDecodeFixedI32(pIter->buf, &qTaskInfoLenWithHead); + if ((pIter->nBufPos + RSMA_QTASKINFO_HEAD_LEN) >= pIter->nBytes) { + // prepare and load next block in the file + pIter->offset -= (pIter->nBytes - pIter->nBufPos); + break; + } + + pIter->qBuf = taosDecodeFixedI32(pIter->qBuf, &qTaskInfoLenWithHead); continue; } // prepare and load next block in the file @@ -1144,3 +944,208 @@ static int32_t tdRSmaQTaskInfoIterNext(SRSmaQTaskFIter *pIter, SRSmaQTaskInfoIte return TSDB_CODE_SUCCESS; } + +static void tdRSmaQTaskInfoGetFName(int32_t vid, int8_t ftype, char *outputName) { + tdGetVndFileName(vid, VNODE_RSMA_DIR, tdQTaskInfoFname[ftype], outputName); +} + +static void *tdRSmaPersistExec(void *param) { + setThreadName("rsma-task-persist"); + SRSmaStat *pRSmaStat = param; + SSma *pSma = pRSmaStat->pSma; + STfs *pTfs = pSma->pVnode->pTfs; + int32_t vid = SMA_VID(pSma); + int64_t toffset = 0; + bool isFileCreated = false; + + if (TASK_TRIGGER_STAT_CANCELLED == atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))) { + goto _end; + } + + void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); + if (!infoHash) { + goto _end; + } + + STFile tFile = {0}; + while (infoHash) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo; + if (!taskInfo) { + smaDebug("vgId:%d, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); + continue; + } + + char *pOutput = NULL; + int32_t len = 0; + int8_t type = (int8_t)(i + 1); + if (qSerializeTaskStatus(taskInfo, &pOutput, &len) < 0) { + smaError("vgId:%d, table %" PRIi64 " level %d serialize rsma task failed since %s", vid, pRSmaInfo->suid, i + 1, + terrstr(terrno)); + goto _err; + } + if (!pOutput || len <= 0) { + smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success but no output(len %d), not persist", + vid, pRSmaInfo->suid, i + 1, len); + taosMemoryFreeClear(pOutput); + continue; + } + + smaDebug("vgId:%d, table %" PRIi64 " level %d serialize rsma task success with len %d, need persist", vid, + pRSmaInfo->suid, i + 1, len); +#if 0 + if (qDeserializeTaskStatus(taskInfo, pOutput, len) < 0) { + smaError("vgId:%d, table %" PRIi64 "level %d deserialize rsma task failed since %s", vid, pRSmaInfo->suid, + i + 1, terrstr(terrno)); + } else { + smaDebug("vgId:%d, table %" PRIi64 " level %d deserialize rsma task success", vid, pRSmaInfo->suid, i + 1); + } +#endif + + if (!isFileCreated) { + char qTaskInfoFName[TSDB_FILENAME_LEN]; + tdRSmaQTaskInfoGetFName(vid, TD_QTASK_TMP_FILE, qTaskInfoFName); + tdInitTFile(&tFile, pTfs, qTaskInfoFName); + tdCreateTFile(&tFile, pTfs, true, -1); + + isFileCreated = true; + } + + char tmpBuf[RSMA_QTASKINFO_HEAD_LEN] = {0}; + void *pTmpBuf = &tmpBuf; + int32_t headLen = 0; + headLen += taosEncodeFixedI32(&pTmpBuf, len + RSMA_QTASKINFO_HEAD_LEN); + headLen += taosEncodeFixedI8(&pTmpBuf, type); + headLen += taosEncodeFixedI64(&pTmpBuf, pRSmaInfo->suid); + + ASSERT(headLen <= RSMA_QTASKINFO_HEAD_LEN); + tdAppendTFile(&tFile, (void *)&tmpBuf, headLen, &toffset); + smaDebug("vgId:%d, table %" PRIi64 " level %d head part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, + i + 1, headLen, toffset); + tdAppendTFile(&tFile, pOutput, len, &toffset); + smaDebug("vgId:%d, table %" PRIi64 " level %d body part len:%d appended to offset:%" PRIi64, vid, pRSmaInfo->suid, + i + 1, len, toffset); + + taosMemoryFree(pOutput); + } + + infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), infoHash); + } +_normal: + if (isFileCreated) { + if (tdUpdateTFileHeader(&tFile) < 0) { + smaError("vgId:%d, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), + tstrerror(terrno)); + tdCloseTFile(&tFile); + tdRemoveTFile(&tFile); + goto _err; + } else { + smaDebug("vgId:%d, succeed to update tfile %s header", vid, TD_TFILE_FULL_NAME(&tFile)); + } + + tdCloseTFile(&tFile); + + char newFName[TSDB_FILENAME_LEN]; + strncpy(newFName, TD_TFILE_FULL_NAME(&tFile), TSDB_FILENAME_LEN); + char *pos = strstr(newFName, tdQTaskInfoFname[TD_QTASK_TMP_FILE]); + strncpy(pos, tdQTaskInfoFname[TD_QTASK_CUR_FILE], TSDB_FILENAME_LEN - POINTER_DISTANCE(pos, newFName)); + if (taosRenameFile(TD_TFILE_FULL_NAME(&tFile), newFName) != 0) { + smaError("vgId:%d, failed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); + goto _err; + } else { + smaDebug("vgId:%d, succeed to rename %s to %s", vid, TD_TFILE_FULL_NAME(&tFile), newFName); + } + } + goto _end; +_err: + if (isFileCreated) { + tdRemoveTFile(&tFile); + } +_end: + if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), + TASK_TRIGGER_STAT_INACTIVE, + TASK_TRIGGER_STAT_ACTIVE)) { + smaDebug("vgId:%d, persist task is active again", vid); + } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), + TASK_TRIGGER_STAT_CANCELLED, + TASK_TRIGGER_STAT_FINISHED)) { + smaDebug("vgId:%d, persist task is cancelled", vid); + } else { + smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, vid, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); + ASSERT(0); + } + atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); + taosThreadExit(NULL); + return NULL; +} + +static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_DETACHED); + TdThread tid; + + if (taosThreadCreate(&tid, &thAttr, tdRSmaPersistExec, pRSmaStat) != 0) { + if (TASK_TRIGGER_STAT_INACTIVE == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), + TASK_TRIGGER_STAT_INACTIVE, + TASK_TRIGGER_STAT_ACTIVE)) { + smaDebug("vgId:%d, persist task is active again", SMA_VID(pRSmaStat->pSma)); + } else if (TASK_TRIGGER_STAT_CANCELLED == atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), + TASK_TRIGGER_STAT_CANCELLED, + TASK_TRIGGER_STAT_FINISHED)) { + smaDebug("vgId:%d, persist task is cancelled and set finished", SMA_VID(pRSmaStat->pSma)); + } else { + smaWarn("vgId:%d, persist task in abnormal stat %" PRIi8, atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)), + SMA_VID(pRSmaStat->pSma)); + ASSERT(0); + } + atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); + } + + taosThreadAttrDestroy(&thAttr); +} + +/** + * @brief trigger to persist rsma qTaskInfo + * + * @param param + * @param tmrId + */ +static void tdRSmaPersistTrigger(void *param, void *tmrId) { + SRSmaStat *pRSmaStat = param; + + int8_t tmrStat = + atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); + switch (tmrStat) { + case TASK_TRIGGER_STAT_ACTIVE: { + atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 1); + if (TASK_TRIGGER_STAT_CANCELLED != atomic_val_compare_exchange_8(RSMA_TRIGGER_STAT(pRSmaStat), + TASK_TRIGGER_STAT_CANCELLED, + TASK_TRIGGER_STAT_FINISHED)) { + smaDebug("vgId:%d, rsma persistence start since active", SMA_VID(pRSmaStat->pSma)); + + // start persist task + tdRSmaPersistTask(pRSmaStat); + + taosTmrReset(tdRSmaPersistTrigger, RSMA_QTASKINFO_PERSIST_MS, pRSmaStat, pRSmaStat->tmrHandle, + &pRSmaStat->tmrId); + } else { + atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); + } + } break; + case TASK_TRIGGER_STAT_CANCELLED: { + atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_FINISHED); + smaDebug("rsma persistence not start since cancelled and finished"); + } break; + case TASK_TRIGGER_STAT_INACTIVE: { + smaDebug("rsma persistence not start since inactive"); + } break; + case TASK_TRIGGER_STAT_INIT: { + smaDebug("rsma persistence not start since init"); + } break; + default: { + smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); + } break; + } +} diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 1f60da0b0a168c686347dcda8ebbba5fd643fef2..5f78aadddf6fdbb98d0efab356d02090a1b381f5 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -22,7 +22,6 @@ #define TD_FILE_INIT_MAGIC 0xFFFFFFFF - static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo); static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo); @@ -46,7 +45,7 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { } int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); int64_t nwrite = taosWriteFile(pTFile->pFile, buf, nbyte); if (nwrite < nbyte) { @@ -58,9 +57,9 @@ int64_t tdWriteTFile(STFile *pTFile, void *buf, int64_t nbyte) { } int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); - int64_t loffset = taosLSeekFile(TD_FILE_PFILE(pTFile), offset, whence); + int64_t loffset = taosLSeekFile(TD_TFILE_PFILE(pTFile), offset, whence); if (loffset < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -70,12 +69,12 @@ int64_t tdSeekTFile(STFile *pTFile, int64_t offset, int whence) { } int64_t tdGetTFileSize(STFile *pTFile, int64_t *size) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); return taosFStatFile(pTFile->pFile, size, NULL); } int64_t tdReadTFile(STFile *pTFile, void *buf, int64_t nbyte) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); int64_t nread = taosReadFile(pTFile->pFile, buf, nbyte); if (nread < 0) { @@ -108,7 +107,7 @@ int32_t tdLoadTFileHeader(STFile *pTFile, STFInfo *pInfo) { char buf[TD_FILE_HEAD_SIZE] = "\0"; uint32_t _version; - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); if (tdSeekTFile(pTFile, 0, SEEK_SET) < 0) { return -1; @@ -133,7 +132,7 @@ void tdUpdateTFileMagic(STFile *pTFile, void *pCksm) { } int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) { - ASSERT(TD_FILE_OPENED(pTFile)); + ASSERT(TD_TFILE_OPENED(pTFile)); int64_t toffset; @@ -141,6 +140,11 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) return -1; } +#if 1 + smaDebug("append to file %s, offset:%" PRIi64 " + nbyte:%" PRIi64 " =%" PRIi64, TD_TFILE_FULL_NAME(pTFile), toffset, + nbyte, toffset + nbyte); +#endif + ASSERT(pTFile->info.fsize == toffset); if (offset) { @@ -157,9 +161,9 @@ int64_t tdAppendTFile(STFile *pTFile, void *buf, int64_t nbyte, int64_t *offset) } int32_t tdOpenTFile(STFile *pTFile, int flags) { - ASSERT(!TD_FILE_OPENED(pTFile)); + ASSERT(!TD_TFILE_OPENED(pTFile)); - pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), flags); + pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), flags); if (pTFile->pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -169,9 +173,9 @@ int32_t tdOpenTFile(STFile *pTFile, int flags) { } void tdCloseTFile(STFile *pTFile) { - if (TD_FILE_OPENED(pTFile)) { + if (TD_TFILE_OPENED(pTFile)) { taosCloseFile(&pTFile->pFile); - TD_FILE_SET_CLOSED(pTFile); + TD_TFILE_SET_CLOSED(pTFile); } } @@ -183,8 +187,8 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { char fullname[TSDB_FILENAME_LEN]; SDiskID did = {0}; - TD_FILE_SET_STATE(pTFile, TD_FILE_STATE_OK); - TD_FILE_SET_CLOSED(pTFile); + TD_TFILE_SET_STATE(pTFile, TD_FILE_STATE_OK); + TD_TFILE_SET_CLOSED(pTFile); memset(&(pTFile->info), 0, sizeof(pTFile->info)); pTFile->info.magic = TD_FILE_INIT_MAGIC; @@ -202,18 +206,18 @@ int32_t tdInitTFile(STFile *pTFile, STfs *pTfs, const char *fname) { int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fType) { ASSERT(pTFile->info.fsize == 0 && pTFile->info.magic == TD_FILE_INIT_MAGIC); - pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pTFile->pFile == NULL) { if (errno == ENOENT) { // Try to create directory recursively - char *s = strdup(TD_FILE_REL_NAME(pTFile)); - if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_FILE_DID(pTFile)) < 0) { + char *s = strdup(TD_TFILE_REL_NAME(pTFile)); + if (tfsMkdirRecurAt(pTfs, taosDirName(s), TD_TFILE_DID(pTFile)) < 0) { taosMemoryFreeClear(s); return -1; } taosMemoryFreeClear(s); - pTFile->pFile = taosOpenFile(TD_FILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pTFile->pFile = taosOpenFile(TD_TFILE_FULL_NAME(pTFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pTFile->pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -240,7 +244,7 @@ int32_t tdCreateTFile(STFile *pTFile, STfs *pTfs, bool updateHeader, int8_t fTyp return 0; } -int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_FILE_F(pTFile)); } +int32_t tdRemoveTFile(STFile *pTFile) { return tfsRemoveFile(TD_TFILE_F(pTFile)); } // smaXXXUtil ================ // ... \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 4d73dbc4062288e807a5c081a59b3c308dbc1d93..57d7386667a60939a8a199bb98a0f5119cda3987 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -152,14 +152,14 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { return pVnode; _err: - if (pVnode->pSma) smaClose(pVnode->pSma); + if (pVnode->pSma) smaCloseEnv(pVnode->pSma); if (pVnode->pQuery) vnodeQueryClose(pVnode); if (pVnode->pTq) tqClose(pVnode->pTq); if (pVnode->pWal) walClose(pVnode->pWal); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); + if (pVnode->pSma) smaCloseEx(pVnode->pSma); if (pVnode->pMeta) metaClose(pVnode->pMeta); - tsem_destroy(&(pVnode->canCommit)); taosMemoryFree(pVnode); return NULL;